cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jbel...@apache.org
Subject [1/2] git commit: redesign KEYS indexes to avoid read-before-write patch by Sam Tunnicliffe, jbellis, and Philip Jenvey for CASSANDRA-2897
Date Thu, 30 Aug 2012 14:08:03 GMT
Updated Branches:
  refs/heads/trunk 69cedbfca -> 8a1b93d79


redesign KEYS indexes to avoid read-before-write
patch by Sam Tunnicliffe, jbellis, and Philip Jenvey for CASSANDRA-2897


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/8a1b93d7
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/8a1b93d7
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/8a1b93d7

Branch: refs/heads/trunk
Commit: 8a1b93d79f39b80979f5403de1fac4f7e8d7cb02
Parents: 69cedbf
Author: Jonathan Ellis <jbellis@apache.org>
Authored: Thu Aug 30 09:07:05 2012 -0500
Committer: Jonathan Ellis <jbellis@apache.org>
Committed: Thu Aug 30 09:07:41 2012 -0500

----------------------------------------------------------------------
 CHANGES.txt                                        |    1 +
 .../cassandra/db/AbstractColumnContainer.java      |    5 +-
 .../db/AbstractThreadUnsafeSortedColumns.java      |    9 +-
 .../cassandra/db/ArrayBackedSortedColumns.java     |    6 +
 .../apache/cassandra/db/AtomicSortedColumns.java   |   24 +-
 src/java/org/apache/cassandra/db/ColumnFamily.java |    1 +
 .../org/apache/cassandra/db/ColumnFamilyStore.java |   27 +-
 src/java/org/apache/cassandra/db/DeletionInfo.java |   38 --
 .../org/apache/cassandra/db/ISortedColumns.java    |    3 +-
 src/java/org/apache/cassandra/db/Memtable.java     |    9 +-
 src/java/org/apache/cassandra/db/Table.java        |  150 +--------
 .../cassandra/db/ThreadSafeSortedColumns.java      |    5 +-
 .../cassandra/db/TreeMapBackedSortedColumns.java   |   61 +++-
 .../db/compaction/LazilyCompactedRow.java          |   19 +-
 .../db/compaction/ParallelCompactionIterable.java  |    7 +-
 .../cassandra/db/compaction/PrecompactedRow.java   |   21 +-
 .../AbstractSimplePerColumnSecondaryIndex.java     |   22 +-
 .../db/index/PerColumnSecondaryIndex.java          |    9 +-
 .../cassandra/db/index/PerRowSecondaryIndex.java   |   21 +-
 .../cassandra/db/index/SecondaryIndexManager.java  |  264 ++++++++++-----
 .../cassandra/db/index/SecondaryIndexSearcher.java |   10 +
 .../db/index/composites/CompositesSearcher.java    |   12 +
 .../cassandra/db/index/keys/KeysSearcher.java      |   20 ++
 .../io/sstable/SSTableIdentityIterator.java        |    4 +-
 test/unit/org/apache/cassandra/SchemaLoader.java   |   21 ++-
 .../apache/cassandra/config/CFMetaDataTest.java    |   14 +
 .../apache/cassandra/db/ColumnFamilyStoreTest.java |  138 ++++++++
 .../cassandra/db/SecondaryIndexColumnSizeTest.java |   22 +-
 28 files changed, 578 insertions(+), 365 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/8a1b93d7/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 9d09d57..ce7ec50 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,5 +1,6 @@
 1.2-dev
  * new metrics (CASSANDRA-4009)
+ * redesign KEYS indexes to avoid read-before-write (CASSANDRA-2897)
  * debug tracing (CASSANDRA-1123)
  * parallelize row cache loading (CASSANDRA-4282)
  * Make compaction, flush JBOD-aware (CASSANDRA-4292)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8a1b93d7/src/java/org/apache/cassandra/db/AbstractColumnContainer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/AbstractColumnContainer.java b/src/java/org/apache/cassandra/db/AbstractColumnContainer.java
index 9ad0fda..ab93c54 100644
--- a/src/java/org/apache/cassandra/db/AbstractColumnContainer.java
+++ b/src/java/org/apache/cassandra/db/AbstractColumnContainer.java
@@ -26,6 +26,7 @@ import com.google.common.base.Function;
 import com.google.common.base.Functions;
 
 import org.apache.cassandra.db.filter.ColumnSlice;
+import org.apache.cassandra.db.index.SecondaryIndexManager;
 import org.apache.cassandra.db.marshal.AbstractType;
 import org.apache.cassandra.io.util.IIterableColumns;
 import org.apache.cassandra.utils.Allocator;
@@ -83,9 +84,9 @@ public abstract class AbstractColumnContainer implements IColumnContainer, IIter
         columns.maybeResetDeletionTimes(gcBefore);
     }
 
-    public long addAllWithSizeDelta(AbstractColumnContainer cc, Allocator allocator, Function<IColumn, IColumn> transformation)
+    public long addAllWithSizeDelta(AbstractColumnContainer cc, Allocator allocator, Function<IColumn, IColumn> transformation, SecondaryIndexManager.Updater indexer)
     {
-        return columns.addAllWithSizeDelta(cc.columns, allocator, transformation);
+        return columns.addAllWithSizeDelta(cc.columns, allocator, transformation, indexer);
     }
 
     public void addAll(AbstractColumnContainer cc, Allocator allocator, Function<IColumn, IColumn> transformation)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8a1b93d7/src/java/org/apache/cassandra/db/AbstractThreadUnsafeSortedColumns.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/AbstractThreadUnsafeSortedColumns.java b/src/java/org/apache/cassandra/db/AbstractThreadUnsafeSortedColumns.java
index 0e71cb3..31f6877 100644
--- a/src/java/org/apache/cassandra/db/AbstractThreadUnsafeSortedColumns.java
+++ b/src/java/org/apache/cassandra/db/AbstractThreadUnsafeSortedColumns.java
@@ -21,6 +21,7 @@ import java.util.Iterator;
 
 import com.google.common.base.Function;
 
+import org.apache.cassandra.db.index.SecondaryIndexManager;
 import org.apache.cassandra.db.marshal.AbstractType;
 import org.apache.cassandra.utils.Allocator;
 
@@ -90,14 +91,6 @@ public abstract class AbstractThreadUnsafeSortedColumns implements ISortedColumn
         }
     }
 
-    public long addAllWithSizeDelta(ISortedColumns cm, Allocator allocator, Function<IColumn, IColumn> transformation)
-    {
-        // sizeDelta is only needed by memtable updates which should not be using thread-unsafe containers
-        throw new UnsupportedOperationException();
-    }
-
-    public abstract void addAll(ISortedColumns columns, Allocator allocator, Function<IColumn, IColumn> transformation);
-
     public boolean isEmpty()
     {
         return size() == 0;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8a1b93d7/src/java/org/apache/cassandra/db/ArrayBackedSortedColumns.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ArrayBackedSortedColumns.java b/src/java/org/apache/cassandra/db/ArrayBackedSortedColumns.java
index 8af667f..8d813a3 100644
--- a/src/java/org/apache/cassandra/db/ArrayBackedSortedColumns.java
+++ b/src/java/org/apache/cassandra/db/ArrayBackedSortedColumns.java
@@ -25,6 +25,7 @@ import com.google.common.collect.AbstractIterator;
 import com.google.common.collect.Lists;
 
 import org.apache.cassandra.db.filter.ColumnSlice;
+import org.apache.cassandra.db.index.SecondaryIndexManager;
 import org.apache.cassandra.db.marshal.AbstractType;
 import org.apache.cassandra.utils.Allocator;
 
@@ -206,6 +207,11 @@ public class ArrayBackedSortedColumns extends AbstractThreadUnsafeSortedColumns
         return -mid - (result < 0 ? 1 : 2);
     }
 
+    public long addAllWithSizeDelta(ISortedColumns cm, Allocator allocator, Function<IColumn, IColumn> transformation, SecondaryIndexManager.Updater indexer)
+    {
+        throw new UnsupportedOperationException();
+    }
+
     public void addAll(ISortedColumns cm, Allocator allocator, Function<IColumn, IColumn> transformation)
     {
         delete(cm.getDeletionInfo());

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8a1b93d7/src/java/org/apache/cassandra/db/AtomicSortedColumns.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/AtomicSortedColumns.java b/src/java/org/apache/cassandra/db/AtomicSortedColumns.java
index f1629ab..83aabea 100644
--- a/src/java/org/apache/cassandra/db/AtomicSortedColumns.java
+++ b/src/java/org/apache/cassandra/db/AtomicSortedColumns.java
@@ -25,6 +25,7 @@ import com.google.common.base.Function;
 import edu.stanford.ppl.concurrent.SnapTreeMap;
 
 import org.apache.cassandra.db.filter.ColumnSlice;
+import org.apache.cassandra.db.index.SecondaryIndexManager;
 import org.apache.cassandra.db.marshal.AbstractType;
 import org.apache.cassandra.utils.Allocator;
 
@@ -150,17 +151,17 @@ public class AtomicSortedColumns implements ISortedColumns
         {
             current = ref.get();
             modified = current.cloneMe();
-            modified.addColumn(column, allocator);
+            modified.addColumn(column, allocator, SecondaryIndexManager.nullUpdater);
         }
         while (!ref.compareAndSet(current, modified));
     }
 
     public void addAll(ISortedColumns cm, Allocator allocator, Function<IColumn, IColumn> transformation)
     {
-        addAllWithSizeDelta(cm, allocator, transformation);
+        addAllWithSizeDelta(cm, allocator, transformation, SecondaryIndexManager.nullUpdater);
     }
 
-    public long addAllWithSizeDelta(ISortedColumns cm, Allocator allocator, Function<IColumn, IColumn> transformation)
+    public long addAllWithSizeDelta(ISortedColumns cm, Allocator allocator, Function<IColumn, IColumn> transformation, SecondaryIndexManager.Updater indexer)
     {
         /*
          * This operation needs to atomicity and isolation. To that end, we
@@ -186,7 +187,7 @@ public class AtomicSortedColumns implements ISortedColumns
 
             for (IColumn column : cm.getSortedColumns())
             {
-                sizeDelta += modified.addColumn(transformation.apply(column), allocator);
+                sizeDelta += modified.addColumn(transformation.apply(column), allocator, indexer);
                 // bail early if we know we've been beaten
                 if (ref.get() != current)
                     continue main_loop;
@@ -335,14 +336,17 @@ public class AtomicSortedColumns implements ISortedColumns
             return new Holder(new SnapTreeMap<ByteBuffer, IColumn>(map.comparator()), deletionInfo);
         }
 
-        long addColumn(IColumn column, Allocator allocator)
+        long addColumn(IColumn column, Allocator allocator, SecondaryIndexManager.Updater indexer)
         {
             ByteBuffer name = column.name();
             while (true)
             {
                 IColumn oldColumn = map.putIfAbsent(name, column);
                 if (oldColumn == null)
+                {
+                    indexer.insert(column);
                     return column.dataSize();
+                }
 
                 if (oldColumn instanceof SuperColumn)
                 {
@@ -353,11 +357,17 @@ public class AtomicSortedColumns implements ISortedColumns
                 }
                 else
                 {
-                    // calculate reconciled col from old (existing) col and new col
                     IColumn reconciledColumn = column.reconcile(oldColumn, allocator);
                     if (map.replace(name, oldColumn, reconciledColumn))
+                    {
+                        // for memtable updates we only care about oldcolumn, reconciledcolumn, but when compacting
+                        // we need to make sure we update indexes no matter the order we merge
+                        if (reconciledColumn == column)
+                            indexer.update(oldColumn, reconciledColumn);
+                        else
+                            indexer.update(column, reconciledColumn);
                         return reconciledColumn.dataSize() - oldColumn.dataSize();
-
+                    }
                     // We failed to replace column due to a concurrent update or a concurrent removal. Keep trying.
                     // (Currently, concurrent removal should not happen (only updates), but let us support that anyway.)
                 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8a1b93d7/src/java/org/apache/cassandra/db/ColumnFamily.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnFamily.java b/src/java/org/apache/cassandra/db/ColumnFamily.java
index c2dd118..d70ee49 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamily.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamily.java
@@ -21,6 +21,7 @@ import java.nio.ByteBuffer;
 import java.security.MessageDigest;
 import java.util.UUID;
 
+import org.apache.cassandra.db.index.SecondaryIndexManager;
 import org.apache.cassandra.io.sstable.SSTable;
 import org.apache.cassandra.utils.*;
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8a1b93d7/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index 464bac6..65e3fd4 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@ -33,6 +33,7 @@ import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Sets;
 import com.google.common.util.concurrent.Futures;
+
 import org.cliffc.high_scale_lib.NonBlockingHashMap;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -712,12 +713,12 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
      * param @ key - key for update/insert
      * param @ columnFamily - columnFamily changes
      */
-    public void apply(DecoratedKey key, ColumnFamily columnFamily)
+    public void apply(DecoratedKey key, ColumnFamily columnFamily, SecondaryIndexManager.Updater indexer)
     {
         long start = System.nanoTime();
 
         Memtable mt = getMemtableThreadSafe();
-        mt.put(key, columnFamily);
+        mt.put(key, columnFamily, indexer);
         updateRowCache(key, columnFamily);
         metric.writeLatency.addNano(System.nanoTime() - start);
 
@@ -742,44 +743,54 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
         return cf.getColumnCount() == 0 && !cf.isMarkedForDelete() ? null : cf;
     }
 
+    public static ColumnFamily removeDeleted(ColumnFamily cf, int gcBefore)
+    {
+        return removeDeleted(cf, gcBefore, SecondaryIndexManager.nullUpdater);
+    }
+
     /*
      This is complicated because we need to preserve deleted columns, supercolumns, and columnfamilies
      until they have been deleted for at least GC_GRACE_IN_SECONDS.  But, we do not need to preserve
      their contents; just the object itself as a "tombstone" that can be used to repair other
      replicas that do not know about the deletion.
      */
-    public static ColumnFamily removeDeleted(ColumnFamily cf, int gcBefore)
+    public static ColumnFamily removeDeleted(ColumnFamily cf, int gcBefore, SecondaryIndexManager.Updater indexer)
     {
         if (cf == null)
         {
             return null;
         }
 
-        removeDeletedColumnsOnly(cf, gcBefore);
+        removeDeletedColumnsOnly(cf, gcBefore, indexer);
         return removeDeletedCF(cf, gcBefore);
     }
 
-    public static void removeDeletedColumnsOnly(ColumnFamily cf, int gcBefore)
+    private static void removeDeletedColumnsOnly(ColumnFamily cf, int gcBefore, SecondaryIndexManager.Updater indexer)
     {
         if (cf.isSuper())
             removeDeletedSuper(cf, gcBefore);
         else
-            removeDeletedStandard(cf, gcBefore);
+            removeDeletedStandard(cf, gcBefore, indexer);
+    }
+
+    public static void removeDeletedColumnsOnly(ColumnFamily cf, int gcBefore)
+    {
+        removeDeletedColumnsOnly(cf, gcBefore, SecondaryIndexManager.nullUpdater);
     }
 
-    private static void removeDeletedStandard(ColumnFamily cf, int gcBefore)
+    private static void removeDeletedStandard(ColumnFamily cf, int gcBefore, SecondaryIndexManager.Updater indexer)
     {
         Iterator<IColumn> iter = cf.iterator();
         while (iter.hasNext())
         {
             IColumn c = iter.next();
-            ByteBuffer cname = c.name();
             // remove columns if
             // (a) the column itself is gcable or
             // (b) the column is shadowed by a CF tombstone
             if (c.getLocalDeletionTime() < gcBefore || cf.deletionInfo().isDeleted(c))
             {
                 iter.remove();
+                indexer.remove(c);
             }
         }
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8a1b93d7/src/java/org/apache/cassandra/db/DeletionInfo.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/DeletionInfo.java b/src/java/org/apache/cassandra/db/DeletionInfo.java
index 480d50a..b63686f 100644
--- a/src/java/org/apache/cassandra/db/DeletionInfo.java
+++ b/src/java/org/apache/cassandra/db/DeletionInfo.java
@@ -27,7 +27,6 @@ import java.util.*;
 import com.google.common.base.Objects;
 import com.google.common.collect.Iterables;
 
-import org.apache.cassandra.db.filter.ColumnSlice;
 import org.apache.cassandra.db.marshal.AbstractType;
 import org.apache.cassandra.io.ISerializer;
 import org.apache.cassandra.io.ISSTableSerializer;
@@ -116,43 +115,6 @@ public class DeletionInfo
     }
 
     /**
-     * Return the slice range covered by this deletion info or null is nothing is deleted.
-     */
-    public ColumnSlice[] coveredSlices()
-    {
-        if (isLive())
-            return null;
-
-        if (!topLevel.equals(DeletionTime.LIVE))
-            return ColumnSlice.ALL_COLUMNS_ARRAY;
-
-        List<ColumnSlice> slices = new ArrayList<ColumnSlice>();
-        ColumnSlice current = null;
-        for (RangeTombstone tombstone : ranges)
-        {
-            if (current == null)
-            {
-                current = new ColumnSlice(tombstone.min, tombstone.max);
-            }
-            else if (ranges.comparator().compare(current.finish, tombstone.min) < 0)
-            {
-                // If next if strictly after current, we've finish current slice
-                slices.add(current);
-                current = new ColumnSlice(tombstone.min, tombstone.max);
-            }
-            else if (ranges.comparator().compare(current.finish, tombstone.max) < 0)
-            {
-                // if tombstone end if after current end, extend current
-                current = new ColumnSlice(current.start, tombstone.max);
-            }
-            // otherwise, tombstone is fully included in current already, skip it
-        }
-        if (current != null)
-            slices.add(current);
-        return slices.isEmpty() ? null : slices.toArray(new ColumnSlice[slices.size()]);
-    }
-
-    /**
      * Return a new DeletionInfo correspond to purging every tombstones that
      * are older than {@code gcbefore}.
      *

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8a1b93d7/src/java/org/apache/cassandra/db/ISortedColumns.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ISortedColumns.java b/src/java/org/apache/cassandra/db/ISortedColumns.java
index f80a249..5ccba19 100644
--- a/src/java/org/apache/cassandra/db/ISortedColumns.java
+++ b/src/java/org/apache/cassandra/db/ISortedColumns.java
@@ -26,6 +26,7 @@ import java.util.SortedSet;
 import com.google.common.base.Function;
 
 import org.apache.cassandra.db.filter.ColumnSlice;
+import org.apache.cassandra.db.index.SecondaryIndexManager;
 import org.apache.cassandra.db.marshal.AbstractType;
 import org.apache.cassandra.io.util.IIterableColumns;
 import org.apache.cassandra.utils.Allocator;
@@ -74,7 +75,7 @@ public interface ISortedColumns extends IIterableColumns
      *
      *  @return the difference in size seen after merging the given columns
      */
-    public long addAllWithSizeDelta(ISortedColumns cm, Allocator allocator, Function<IColumn, IColumn> transformation);
+    public long addAllWithSizeDelta(ISortedColumns cm, Allocator allocator, Function<IColumn, IColumn> transformation, SecondaryIndexManager.Updater indexer);
 
     /**
      * Adds the columns without necessarily computing the size delta

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8a1b93d7/src/java/org/apache/cassandra/db/Memtable.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Memtable.java b/src/java/org/apache/cassandra/db/Memtable.java
index aa6789d..4424811 100644
--- a/src/java/org/apache/cassandra/db/Memtable.java
+++ b/src/java/org/apache/cassandra/db/Memtable.java
@@ -26,6 +26,7 @@ import java.util.concurrent.atomic.AtomicLong;
 import com.google.common.base.Function;
 import com.google.common.base.Throwables;
 
+import org.apache.cassandra.db.index.SecondaryIndexManager;
 import org.apache.cassandra.io.util.DiskAwareRunnable;
 import org.cliffc.high_scale_lib.NonBlockingHashSet;
 import org.github.jamm.MemoryMeter;
@@ -148,10 +149,10 @@ public class Memtable
      * (CFS handles locking to avoid submitting an op
      *  to a flushing memtable.  Any other way is unsafe.)
     */
-    void put(DecoratedKey key, ColumnFamily columnFamily)
+    void put(DecoratedKey key, ColumnFamily columnFamily, SecondaryIndexManager.Updater indexer)
     {
         assert !isFrozen; // not 100% foolproof but hell, it's an assert
-        resolve(key, columnFamily);
+        resolve(key, columnFamily, indexer);
     }
 
     public void updateLiveRatio() throws RuntimeException
@@ -222,7 +223,7 @@ public class Memtable
         meterExecutor.submit(runnable);
     }
 
-    private void resolve(DecoratedKey key, ColumnFamily cf)
+    private void resolve(DecoratedKey key, ColumnFamily cf, SecondaryIndexManager.Updater indexer)
     {
         ColumnFamily previous = columnFamilies.get(key);
 
@@ -236,7 +237,7 @@ public class Memtable
                 previous = empty;
         }
 
-        long sizeDelta = previous.addAllWithSizeDelta(cf, allocator, localCopyFunction);
+        long sizeDelta = previous.addAllWithSizeDelta(cf, allocator, localCopyFunction, indexer);
         currentSize.addAndGet(sizeDelta);
         currentOperations.addAndGet((cf.getColumnCount() == 0)
                                     ? cf.isMarkedForDelete() ? 1 : 0

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8a1b93d7/src/java/org/apache/cassandra/db/Table.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Table.java b/src/java/org/apache/cassandra/db/Table.java
index 5b4520d..66fab5b 100644
--- a/src/java/org/apache/cassandra/db/Table.java
+++ b/src/java/org/apache/cassandra/db/Table.java
@@ -35,12 +35,12 @@ import org.apache.cassandra.config.*;
 import org.apache.cassandra.db.commitlog.CommitLog;
 import org.apache.cassandra.db.filter.ColumnSlice;
 import org.apache.cassandra.db.filter.QueryFilter;
-import org.apache.cassandra.db.filter.QueryPath;
+import org.apache.cassandra.db.index.SecondaryIndex;
+import org.apache.cassandra.db.index.SecondaryIndexManager;
 import org.apache.cassandra.io.sstable.SSTableReader;
 import org.apache.cassandra.locator.AbstractReplicationStrategy;
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.tracing.Tracing;
-import org.apache.cassandra.utils.ByteBufferUtil;
 
 /**
  * It represents a Keyspace.
@@ -376,68 +376,7 @@ public class Table
                     continue;
                 }
 
-                ColumnSlice[] deletionSlices = null;
-                SortedSet<ByteBuffer> mutatedIndexedColumns = null;
-                if (updateIndexes)
-                {
-                    // If cf has some range deletion, we need to fetch those ranges to know if something indexed was updated
-                    // Note: we could "optimize" that for Keys index, because we know that the columnDef name is directly
-                    // the indexed column name.
-                    deletionSlices = cf.deletionInfo().coveredSlices();
-
-                    for (IColumn updated : cf)
-                    {
-                        if (cfs.indexManager.indexes(updated))
-                        {
-                            if (mutatedIndexedColumns == null)
-                                mutatedIndexedColumns = new TreeSet<ByteBuffer>(cf.getComparator());
-                            mutatedIndexedColumns.add(updated.name());
-                            if (logger.isDebugEnabled())
-                            {
-                                logger.debug(String.format("Mutated indexed column %s value %s",
-                                                           cf.getComparator().getString(updated.name()),
-                                                           ByteBufferUtil.bytesToHex(updated.value())));
-                            }
-                        }
-                    }
-                }
-
-                // Sharding the lock is insufficient to avoid contention when there is a "hot" row, e.g., for
-                // hint writes when a node is down (keyed by target IP).  So it is worth special-casing the
-                // no-index case to avoid the synchronization.
-                if (mutatedIndexedColumns == null && deletionSlices == null)
-                {
-                    cfs.apply(key, cf);
-                    continue;
-                }
-                // else mutatedIndexedColumns != null
-                synchronized (indexLockFor(mutation.key()))
-                {
-                    if (mutatedIndexedColumns == null)
-                        mutatedIndexedColumns = new TreeSet<ByteBuffer>(cf.getComparator());
-
-                    // with the raw data CF, we can just apply every update in any order and let
-                    // read-time resolution throw out obsolete versions, thus avoiding read-before-write.
-                    // but for indexed data we need to make sure that we're not creating index entries
-                    // for obsolete writes.
-                    ColumnFamily oldIndexedColumns = readCurrentIndexedColumns(key, cfs, mutatedIndexedColumns, deletionSlices);
-
-                    // We might still have no mutated columns in case it is a deletion but the row had
-                    // no indexed columns
-                    if (mutatedIndexedColumns.isEmpty())
-                    {
-                        cfs.apply(key, cf);
-                        continue;
-                    }
-
-                    logger.debug("Pre-mutation index row is {}", oldIndexedColumns);
-                    ignoreObsoleteMutations(cf, mutatedIndexedColumns, oldIndexedColumns);
-
-                    cfs.apply(key, cf);
-
-                    // ignore full index memtables -- we flush those when the "master" one is full
-                    cfs.indexManager.applyIndexUpdates(mutation.key(), cf, mutatedIndexedColumns, oldIndexedColumns);
-                }
+                cfs.apply(key, cf, updateIndexes ? cfs.indexManager.updaterFor(key, true) : SecondaryIndexManager.nullUpdater);
             }
         }
         finally
@@ -446,86 +385,14 @@ public class Table
         }
     }
 
-    private static void ignoreObsoleteMutations(ColumnFamily cf, SortedSet<ByteBuffer> mutatedIndexedColumns, ColumnFamily oldIndexedColumns)
-    {
-        // DO NOT modify the cf object here, it can race w/ the CL write (see https://issues.apache.org/jira/browse/CASSANDRA-2604)
-
-        if (oldIndexedColumns == null)
-            return;
-
-        for (Iterator<ByteBuffer> iter = mutatedIndexedColumns.iterator(); iter.hasNext(); )
-        {
-            ByteBuffer name = iter.next();
-            IColumn newColumn = cf.getColumn(name); // null == row delete or it wouldn't be marked Mutated
-            if (newColumn != null && cf.isMarkedForDelete())
-            {
-                // row is marked for delete, but column was also updated.  if column is timestamped less than
-                // the row tombstone, treat it as if it didn't exist.  Otherwise we don't care about row
-                // tombstone for the purpose of the index update and we can proceed as usual.
-                if (cf.deletionInfo().isDeleted(newColumn))
-                {
-                    // don't remove from the cf object; that can race w/ CommitLog write.  Leaving it is harmless.
-                    newColumn = null;
-                }
-            }
-            IColumn oldColumn = oldIndexedColumns.getColumn(name);
-
-            // deletions are irrelevant to the index unless we're changing state from live -> deleted, i.e.,
-            // just updating w/ a newer tombstone doesn't matter
-            boolean bothDeleted = (newColumn == null || newColumn.isMarkedForDelete())
-                                  && (oldColumn == null || oldColumn.isMarkedForDelete());
-            // obsolete means either the row or the column timestamp we're applying is older than existing data
-            boolean obsoleteRowTombstone = newColumn == null && oldColumn != null && !cf.deletionInfo().isDeleted(oldColumn);
-            boolean obsoleteColumn = newColumn != null && (oldIndexedColumns.deletionInfo().isDeleted(newColumn)
-                                                           || (oldColumn != null && oldColumn.reconcile(newColumn) == oldColumn));
-
-            if (bothDeleted || obsoleteRowTombstone || obsoleteColumn)
-            {
-                if (logger.isDebugEnabled())
-                    logger.debug("skipping index update for obsolete mutation of " + cf.getComparator().getString(name));
-                iter.remove();
-                oldIndexedColumns.remove(name);
-            }
-        }
-    }
-
-    private static ColumnFamily readCurrentIndexedColumns(DecoratedKey key, ColumnFamilyStore cfs, SortedSet<ByteBuffer> mutatedIndexedColumns, ColumnSlice[] deletionSlices)
-    {
-        // Note: we could only query names not covered by the slices
-        QueryPath path = new QueryPath(cfs.getColumnFamilyName());
-        ColumnFamily cf = ColumnFamily.create(cfs.metadata);
-
-        if (mutatedIndexedColumns != null)
-            cf.resolve(cfs.getColumnFamily(QueryFilter.getNamesFilter(key, path, mutatedIndexedColumns)));
-
-        if (deletionSlices != null)
-        {
-            SliceQueryPager pager = new SliceQueryPager(cfs, key, deletionSlices);
-            while (pager.hasNext())
-            {
-                ColumnFamily cf2 = pager.next();
-                cf.delete(cf2);
-                for (IColumn column : cf2)
-                {
-                    if (cfs.indexManager.indexes(column))
-                    {
-                        cf.addColumn(column);
-                        mutatedIndexedColumns.add(column.name());
-                    }
-                }
-            }
-        }
-        return cf;
-    }
-
     public AbstractReplicationStrategy getReplicationStrategy()
     {
         return replicationStrategy;
     }
 
     /**
-     * @param key      row to index
-     * @param cfs      ColumnFamily to index row in
+     * @param key row to index
+     * @param cfs ColumnFamily to index row in
      * @param idxNames columns to index, in comparator order
      */
     public static void indexRow(DecoratedKey key, ColumnFamilyStore cfs, Set<String> idxNames)
@@ -533,13 +400,14 @@ public class Table
         if (logger.isDebugEnabled())
             logger.debug("Indexing row {} ", cfs.metadata.getKeyValidator().getString(key.key));
 
+        Collection<SecondaryIndex> indexes = cfs.indexManager.getIndexesByNames(idxNames);
+
         switchLock.readLock().lock();
         try
         {
             // Our index lock is per-row, but we don't want to hold writes for too long, so for large rows
             // we release the lock between pages
             SliceQueryPager pager = new SliceQueryPager(cfs, key, ColumnSlice.ALL_COLUMNS_ARRAY);
-
             while (pager.hasNext())
             {
                 synchronized (cfs.table.indexLockFor(key.key))
@@ -548,10 +416,10 @@ public class Table
                     ColumnFamily cf2 = cf.cloneMeShallow();
                     for (IColumn column : cf)
                     {
-                        if (cfs.indexManager.indexes(column.name(), idxNames))
+                        if (cfs.indexManager.indexes(column.name(), indexes))
                             cf2.addColumn(column);
                     }
-                    cfs.indexManager.applyIndexUpdates(key.key, cf2, cf2.getColumnNames(), null);
+                    cfs.indexManager.indexRow(key.key, cf2);
                 }
             }
         }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8a1b93d7/src/java/org/apache/cassandra/db/ThreadSafeSortedColumns.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ThreadSafeSortedColumns.java b/src/java/org/apache/cassandra/db/ThreadSafeSortedColumns.java
index 0a32b1c..6aa19f0 100644
--- a/src/java/org/apache/cassandra/db/ThreadSafeSortedColumns.java
+++ b/src/java/org/apache/cassandra/db/ThreadSafeSortedColumns.java
@@ -27,6 +27,7 @@ import java.util.concurrent.ConcurrentSkipListMap;
 import com.google.common.base.Function;
 
 import org.apache.cassandra.db.filter.ColumnSlice;
+import org.apache.cassandra.db.index.SecondaryIndexManager;
 import org.apache.cassandra.db.marshal.AbstractType;
 import org.apache.cassandra.utils.Allocator;
 
@@ -125,11 +126,11 @@ public class ThreadSafeSortedColumns extends AbstractThreadUnsafeSortedColumns i
      */
     public void addAll(ISortedColumns cm, Allocator allocator, Function<IColumn, IColumn> transformation)
     {
-        addAllWithSizeDelta(cm, allocator, transformation);
+        addAllWithSizeDelta(cm, allocator, transformation, null);
     }
 
     @Override
-    public long addAllWithSizeDelta(ISortedColumns cm, Allocator allocator, Function<IColumn, IColumn> transformation)
+    public long addAllWithSizeDelta(ISortedColumns cm, Allocator allocator, Function<IColumn, IColumn> transformation, SecondaryIndexManager.Updater indexer)
     {
         delete(cm.getDeletionInfo());
         long sizeDelta = 0;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8a1b93d7/src/java/org/apache/cassandra/db/TreeMapBackedSortedColumns.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/TreeMapBackedSortedColumns.java b/src/java/org/apache/cassandra/db/TreeMapBackedSortedColumns.java
index 96e1e5d..c4ec52f 100644
--- a/src/java/org/apache/cassandra/db/TreeMapBackedSortedColumns.java
+++ b/src/java/org/apache/cassandra/db/TreeMapBackedSortedColumns.java
@@ -27,6 +27,7 @@ import java.util.TreeMap;
 import com.google.common.base.Function;
 
 import org.apache.cassandra.db.filter.ColumnSlice;
+import org.apache.cassandra.db.index.SecondaryIndexManager;
 import org.apache.cassandra.db.marshal.AbstractType;
 import org.apache.cassandra.utils.Allocator;
 
@@ -82,11 +83,16 @@ public class TreeMapBackedSortedColumns extends AbstractThreadUnsafeSortedColumn
         return false;
     }
 
+    public void addColumn(IColumn column, Allocator allocator)
+    {
+        addColumn(column, allocator, SecondaryIndexManager.nullUpdater);
+    }
+
     /*
      * If we find an old column that has the same name
      * the ask it to resolve itself else add the new column
     */
-    public void addColumn(IColumn column, Allocator allocator)
+    public long addColumn(IColumn column, Allocator allocator, SecondaryIndexManager.Updater indexer)
     {
         ByteBuffer name = column.name();
         // this is a slightly unusual way to structure this; a more natural way is shown in ThreadSafeSortedColumns,
@@ -94,34 +100,51 @@ public class TreeMapBackedSortedColumns extends AbstractThreadUnsafeSortedColumn
         // which saves the extra "get" in the no-conflict case [for both normal and super columns],
         // in exchange for a re-put in the SuperColumn case.
         IColumn oldColumn = map.put(name, column);
-        if (oldColumn != null)
+        if (oldColumn == null)
+            return column.dataSize();
+
+        if (oldColumn instanceof SuperColumn)
+        {
+            assert column instanceof SuperColumn;
+            long previousSize = oldColumn.dataSize();
+            // since oldColumn is where we've been accumulating results, it's usually going to be faster to
+            // add the new one to the old, then place old back in the Map, rather than copy the old contents
+            // into the new Map entry.
+            ((SuperColumn) oldColumn).putColumn((SuperColumn)column, allocator);
+            map.put(name, oldColumn);
+            return oldColumn.dataSize() - previousSize;
+        }
+        else
         {
-            if (oldColumn instanceof SuperColumn)
-            {
-                assert column instanceof SuperColumn;
-                // since oldColumn is where we've been accumulating results, it's usually going to be faster to
-                // add the new one to the old, then place old back in the Map, rather than copy the old contents
-                // into the new Map entry.
-                ((SuperColumn) oldColumn).putColumn((SuperColumn)column, allocator);
-                map.put(name,  oldColumn);
-            }
+            // calculate reconciled col from old (existing) col and new col
+            IColumn reconciledColumn = column.reconcile(oldColumn, allocator);
+            map.put(name, reconciledColumn);
+            // for memtable updates we only care about oldcolumn, reconciledcolumn, but when compacting
+            // we need to make sure we update indexes no matter the order we merge
+            if (reconciledColumn == column)
+                indexer.update(oldColumn, reconciledColumn);
             else
-            {
-                // calculate reconciled col from old (existing) col and new col
-                IColumn reconciledColumn = column.reconcile(oldColumn, allocator);
-                map.put(name, reconciledColumn);
-            }
+                indexer.update(column, reconciledColumn);
+            return reconciledColumn.dataSize() - oldColumn.dataSize();
         }
     }
 
+    public long addAllWithSizeDelta(ISortedColumns cm, Allocator allocator, Function<IColumn, IColumn> transformation, SecondaryIndexManager.Updater indexer)
+    {
+        delete(cm.getDeletionInfo());
+        for (IColumn column : cm.getSortedColumns())
+            addColumn(transformation.apply(column), allocator, indexer);
+
+        // we don't use this for memtables, so we don't bother computing size
+        return Long.MIN_VALUE;
+    }
+
     /**
      * We need to go through each column in the column container and resolve it before adding
      */
     public void addAll(ISortedColumns cm, Allocator allocator, Function<IColumn, IColumn> transformation)
     {
-        delete(cm.getDeletionInfo());
-        for (IColumn column : cm.getSortedColumns())
-            addColumn(transformation.apply(column), allocator);
+        addAllWithSizeDelta(cm, allocator, transformation, SecondaryIndexManager.nullUpdater);
     }
 
     public boolean replace(IColumn oldColumn, IColumn newColumn)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8a1b93d7/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java b/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java
index 2c1f02a..e7b99c4 100644
--- a/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java
+++ b/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java
@@ -23,6 +23,7 @@ import java.security.MessageDigest;
 import java.util.Iterator;
 import java.util.List;
 
+import com.google.common.base.Functions;
 import com.google.common.base.Predicates;
 import com.google.common.collect.Iterators;
 import org.slf4j.Logger;
@@ -31,10 +32,12 @@ import org.slf4j.LoggerFactory;
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.columniterator.ICountableColumnIterator;
 import org.apache.cassandra.db.columniterator.OnDiskAtomIterator;
+import org.apache.cassandra.db.index.SecondaryIndexManager;
 import org.apache.cassandra.db.marshal.AbstractType;
 import org.apache.cassandra.io.sstable.ColumnStats;
 import org.apache.cassandra.io.sstable.SSTable;
 import org.apache.cassandra.io.util.DataOutputBuffer;
+import org.apache.cassandra.utils.HeapAllocator;
 import org.apache.cassandra.utils.MergeIterator;
 import org.apache.cassandra.utils.StreamingHistogram;
 
@@ -63,6 +66,7 @@ public class LazilyCompactedRow extends AbstractCompactedRow implements Iterable
     private boolean closed;
     private ColumnIndex.Builder indexBuilder;
     private ColumnIndex columnsIndex;
+    private final SecondaryIndexManager.Updater indexer;
 
     public LazilyCompactedRow(CompactionController controller, List<? extends ICountableColumnIterator> rows)
     {
@@ -70,6 +74,7 @@ public class LazilyCompactedRow extends AbstractCompactedRow implements Iterable
         this.rows = rows;
         this.controller = controller;
         this.shouldPurge = controller.shouldPurge(key);
+        indexer = controller.cfs.indexManager.updaterFor(key, false);
 
         for (OnDiskAtomIterator row : rows)
         {
@@ -224,8 +229,13 @@ public class LazilyCompactedRow extends AbstractCompactedRow implements Iterable
 
     private class Reducer extends MergeIterator.Reducer<OnDiskAtom, OnDiskAtom>
     {
+        // all columns reduced together will have the same name, so there will only be one column
+        // in the container; we just want to leverage the conflict resolution code from CF
         ColumnFamily container = emptyColumnFamily.cloneMeShallow();
+
+        // tombstone reference; will be reconciled w/ column during getReduced
         RangeTombstone tombstone;
+
         long serializedSize = 4; // int for column count
         int columns = 0;
         long maxTimestampSeen = Long.MIN_VALUE;
@@ -234,9 +244,16 @@ public class LazilyCompactedRow extends AbstractCompactedRow implements Iterable
         public void reduce(OnDiskAtom current)
         {
             if (current instanceof RangeTombstone)
+            {
                 tombstone = (RangeTombstone)current;
+            }
             else
-                container.addColumn((IColumn)current);
+            {
+                IColumn column = (IColumn) current;
+                container.addColumn(column);
+                if (container.getColumn(column.name()) != column)
+                    indexer.remove(column);
+            }
         }
 
         protected OnDiskAtom getReduced()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8a1b93d7/src/java/org/apache/cassandra/db/compaction/ParallelCompactionIterable.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/ParallelCompactionIterable.java b/src/java/org/apache/cassandra/db/compaction/ParallelCompactionIterable.java
index f696ad4..56fce20 100644
--- a/src/java/org/apache/cassandra/db/compaction/ParallelCompactionIterable.java
+++ b/src/java/org/apache/cassandra/db/compaction/ParallelCompactionIterable.java
@@ -25,6 +25,7 @@ import java.util.List;
 import java.util.concurrent.*;
 import java.util.concurrent.locks.Condition;
 
+import com.google.common.base.Functions;
 import com.google.common.collect.AbstractIterator;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -34,6 +35,7 @@ import org.apache.cassandra.concurrent.NamedThreadFactory;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.columniterator.ICountableColumnIterator;
+import org.apache.cassandra.db.index.SecondaryIndexManager;
 import org.apache.cassandra.io.sstable.SSTableIdentityIterator;
 import org.apache.cassandra.utils.*;
 
@@ -215,7 +217,8 @@ public class ParallelCompactionIterable extends AbstractCompactionIterable
                     else
                     {
                         // addAll is ok even if cf is an ArrayBackedSortedColumns
-                        cf.addAll(thisCF, HeapAllocator.instance);
+                        SecondaryIndexManager.Updater indexer = controller.cfs.indexManager.updaterFor(row.key, false);
+                        cf.addAllWithSizeDelta(thisCF, HeapAllocator.instance, Functions.<IColumn>identity(), indexer);
                     }
                 }
 
@@ -308,7 +311,7 @@ public class ParallelCompactionIterable extends AbstractCompactionIterable
                         else
                         {
                             logger.debug("parallel eager deserialize from " + iter.getPath());
-                            queue.put(new RowContainer(new Row(iter.getKey(), iter.getColumnFamilyWithColumns())));
+                            queue.put(new RowContainer(new Row(iter.getKey(), iter.getColumnFamilyWithColumns(TreeMapBackedSortedColumns.factory()))));
                         }
                     }
                 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8a1b93d7/src/java/org/apache/cassandra/db/compaction/PrecompactedRow.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/PrecompactedRow.java b/src/java/org/apache/cassandra/db/compaction/PrecompactedRow.java
index ab52405..7d2c65f 100644
--- a/src/java/org/apache/cassandra/db/compaction/PrecompactedRow.java
+++ b/src/java/org/apache/cassandra/db/compaction/PrecompactedRow.java
@@ -22,7 +22,10 @@ import java.io.IOException;
 import java.security.MessageDigest;
 import java.util.List;
 
+import com.google.common.base.Functions;
+
 import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.index.SecondaryIndexManager;
 import org.apache.cassandra.io.sstable.ColumnStats;
 import org.apache.cassandra.io.sstable.SSTableIdentityIterator;
 import org.apache.cassandra.io.util.DataOutputBuffer;
@@ -43,6 +46,8 @@ public class PrecompactedRow extends AbstractCompactedRow
         compactedCf = cf;
     }
 
+
+
     public static ColumnFamily removeDeletedAndOldShards(DecoratedKey key, CompactionController controller, ColumnFamily cf)
     {
         assert key != null;
@@ -77,7 +82,9 @@ public class PrecompactedRow extends AbstractCompactedRow
     public static ColumnFamily removeDeletedAndOldShards(DecoratedKey key, boolean shouldPurge, CompactionController controller, ColumnFamily cf)
     {
         // See comment in preceding method
-        ColumnFamily compacted = ColumnFamilyStore.removeDeleted(cf, shouldPurge ? controller.gcBefore : Integer.MIN_VALUE);
+        ColumnFamily compacted = ColumnFamilyStore.removeDeleted(cf,
+                                                                 shouldPurge ? controller.gcBefore : Integer.MIN_VALUE,
+                                                                 controller.cfs.indexManager.updaterFor(key, false));
         if (shouldPurge && compacted != null && compacted.metadata().getDefaultValidator().isCommutative())
             CounterColumn.mergeAndRemoveOldShards(key, compacted, controller.gcBefore, controller.mergeShardBefore);
         return compacted;
@@ -86,19 +93,22 @@ public class PrecompactedRow extends AbstractCompactedRow
     public PrecompactedRow(CompactionController controller, List<SSTableIdentityIterator> rows)
     {
         this(rows.get(0).getKey(),
-             removeDeletedAndOldShards(rows.get(0).getKey(), controller, merge(rows)));
+             removeDeletedAndOldShards(rows.get(0).getKey(), controller, merge(rows, controller)));
     }
 
-    private static ColumnFamily merge(List<SSTableIdentityIterator> rows)
+    private static ColumnFamily merge(List<SSTableIdentityIterator> rows, CompactionController controller)
     {
         assert !rows.isEmpty();
         ColumnFamily cf = null;
+        SecondaryIndexManager.Updater indexer = null;
         for (SSTableIdentityIterator row : rows)
         {
             ColumnFamily thisCF;
             try
             {
-                thisCF = row.getColumnFamilyWithColumns();
+                // use a map for the first once since that will be the one we merge into
+                ISortedColumns.Factory factory = cf == null ? TreeMapBackedSortedColumns.factory() : ArrayBackedSortedColumns.factory();
+                thisCF = row.getColumnFamilyWithColumns(factory);
             }
             catch (IOException e)
             {
@@ -108,11 +118,12 @@ public class PrecompactedRow extends AbstractCompactedRow
             if (cf == null)
             {
                 cf = thisCF;
+                indexer = controller.cfs.indexManager.updaterFor(row.getKey(), false); // only init indexer once
             }
             else
             {
                 // addAll is ok even if cf is an ArrayBackedSortedColumns
-                cf.addAll(thisCF, HeapAllocator.instance);
+                cf.addAllWithSizeDelta(thisCF, HeapAllocator.instance, Functions.<IColumn>identity(), indexer);
             }
         }
         return cf;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8a1b93d7/src/java/org/apache/cassandra/db/index/AbstractSimplePerColumnSecondaryIndex.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/index/AbstractSimplePerColumnSecondaryIndex.java b/src/java/org/apache/cassandra/db/index/AbstractSimplePerColumnSecondaryIndex.java
index 24f09ab..3a012f6 100644
--- a/src/java/org/apache/cassandra/db/index/AbstractSimplePerColumnSecondaryIndex.java
+++ b/src/java/org/apache/cassandra/db/index/AbstractSimplePerColumnSecondaryIndex.java
@@ -17,22 +17,14 @@
  */
 package org.apache.cassandra.db.index;
 
-import java.io.IOException;
 import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Set;
 import java.util.concurrent.ExecutionException;
 
 import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.config.ColumnDefinition;
-import org.apache.cassandra.config.ConfigurationException;
 import org.apache.cassandra.db.*;
-import org.apache.cassandra.db.index.PerColumnSecondaryIndex;
-import org.apache.cassandra.db.index.SecondaryIndexSearcher;
 import org.apache.cassandra.db.marshal.*;
 import org.apache.cassandra.dht.*;
-import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.utils.ByteBufferUtil;
 
 /**
@@ -81,21 +73,23 @@ public abstract class AbstractSimplePerColumnSecondaryIndex extends PerColumnSec
 
     protected abstract ByteBuffer makeIndexColumnName(ByteBuffer rowKey, IColumn column);
 
-    public void deleteColumn(DecoratedKey valueKey, ByteBuffer rowKey, IColumn column)
+    public void delete(ByteBuffer rowKey, IColumn column)
     {
         if (column.isMarkedForDelete())
             return;
 
+        DecoratedKey valueKey = getIndexKeyFor(column.value());
         int localDeletionTime = (int) (System.currentTimeMillis() / 1000);
         ColumnFamily cfi = ColumnFamily.create(indexCfs.metadata);
         cfi.addTombstone(makeIndexColumnName(rowKey, column), localDeletionTime, column.timestamp());
-        indexCfs.apply(valueKey, cfi);
+        indexCfs.apply(valueKey, cfi, SecondaryIndexManager.nullUpdater);
         if (logger.isDebugEnabled())
             logger.debug("removed index entry for cleaned-up value {}:{}", valueKey, cfi);
     }
 
-    public void insertColumn(DecoratedKey valueKey, ByteBuffer rowKey, IColumn column)
+    public void insert(ByteBuffer rowKey, IColumn column)
     {
+        DecoratedKey valueKey = getIndexKeyFor(column.value());
         ColumnFamily cfi = ColumnFamily.create(indexCfs.metadata);
         ByteBuffer name = makeIndexColumnName(rowKey, column);
         if (column instanceof ExpiringColumn)
@@ -110,12 +104,12 @@ public abstract class AbstractSimplePerColumnSecondaryIndex extends PerColumnSec
         if (logger.isDebugEnabled())
             logger.debug("applying index row {} in {}", indexCfs.metadata.getKeyValidator().getString(valueKey.key), cfi);
 
-        indexCfs.apply(valueKey, cfi);
+        indexCfs.apply(valueKey, cfi, SecondaryIndexManager.nullUpdater);
     }
 
-    public void updateColumn(DecoratedKey valueKey, ByteBuffer rowKey, IColumn col)
+    public void update(ByteBuffer rowKey, IColumn col)
     {
-        insertColumn(valueKey, rowKey, col);
+        insert(rowKey, col);
     }
 
     public void removeIndex(ByteBuffer columnName)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8a1b93d7/src/java/org/apache/cassandra/db/index/PerColumnSecondaryIndex.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/index/PerColumnSecondaryIndex.java b/src/java/org/apache/cassandra/db/index/PerColumnSecondaryIndex.java
index 2d8286e..01c6cd7 100644
--- a/src/java/org/apache/cassandra/db/index/PerColumnSecondaryIndex.java
+++ b/src/java/org/apache/cassandra/db/index/PerColumnSecondaryIndex.java
@@ -33,29 +33,26 @@ public abstract class PerColumnSecondaryIndex extends SecondaryIndex
     /**
      * Delete a column from the index
      *
-     * @param valueKey the column value which is used as the index key
      * @param rowKey the underlying row key which is indexed
      * @param col all the column info
      */
-    public abstract void deleteColumn(DecoratedKey valueKey, ByteBuffer rowKey, IColumn col);
+    public abstract void delete(ByteBuffer rowKey, IColumn col);
 
     /**
      * insert a column to the index
      *
-     * @param valueKey the column value which is used as the index key
      * @param rowKey the underlying row key which is indexed
      * @param col all the column info
      */
-    public abstract void insertColumn(DecoratedKey valueKey, ByteBuffer rowKey, IColumn col);
+    public abstract void insert(ByteBuffer rowKey, IColumn col);
 
     /**
      * update a column from the index
      *
-     * @param valueKey the column value which is used as the index key
      * @param rowKey the underlying row key which is indexed
      * @param col all the column info
      */
-    public abstract void updateColumn(DecoratedKey valueKey, ByteBuffer rowKey, IColumn col);
+    public abstract void update(ByteBuffer rowKey, IColumn col);
 
     public String getNameForSystemTable(ByteBuffer column)
     {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8a1b93d7/src/java/org/apache/cassandra/db/index/PerRowSecondaryIndex.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/index/PerRowSecondaryIndex.java b/src/java/org/apache/cassandra/db/index/PerRowSecondaryIndex.java
index f2a0f60..96252c0 100644
--- a/src/java/org/apache/cassandra/db/index/PerRowSecondaryIndex.java
+++ b/src/java/org/apache/cassandra/db/index/PerRowSecondaryIndex.java
@@ -34,27 +34,26 @@ import org.apache.cassandra.utils.ByteBufferUtil;
 public abstract class PerRowSecondaryIndex extends SecondaryIndex
 {
     /**
-     * Removes obsolete index entries and creates new ones for the given row key
-     * and mutated columns.
+     * Index the given row for new index creation.  @param cf will represent the entire row.
      *
      * @param rowKey the row key
      * @param cf the current rows data
-     * @param mutatedIndexedColumns the set of columns that were changed or added
-     * @param oldIndexedColumns the columns which were deleted
      */
-    public abstract void applyIndexUpdates(ByteBuffer rowKey,
-                                           ColumnFamily cf,
-                                           SortedSet<ByteBuffer> mutatedIndexedColumns,
-                                           ColumnFamily oldIndexedColumns);
+    public abstract void index(ByteBuffer rowKey, ColumnFamily cf);
+
+    /**
+     * Index the given row
+     *
+     * @param rowKey the row key
+     */
+    public abstract void index(ByteBuffer rowKey);
 
     /**
      * cleans up deleted columns from cassandra cleanup compaction
      *
      * @param key
-     * @param indexedColumnsInRow
      */
-    public abstract void deleteFromIndex(DecoratedKey key, List<IColumn> indexedColumnsInRow);
-
+    public abstract void delete(DecoratedKey key);
 
     @Override
     public String getNameForSystemTable(ByteBuffer columnName)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8a1b93d7/src/java/org/apache/cassandra/db/index/SecondaryIndexManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/index/SecondaryIndexManager.java b/src/java/org/apache/cassandra/db/index/SecondaryIndexManager.java
index 7c87941..70a9164 100644
--- a/src/java/org/apache/cassandra/db/index/SecondaryIndexManager.java
+++ b/src/java/org/apache/cassandra/db/index/SecondaryIndexManager.java
@@ -34,12 +34,10 @@ import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.compaction.CompactionManager;
 import org.apache.cassandra.db.filter.IFilter;
 import org.apache.cassandra.dht.AbstractBounds;
-import org.apache.cassandra.dht.LocalToken;
 import org.apache.cassandra.io.sstable.ReducingKeyIterator;
 import org.apache.cassandra.io.sstable.SSTableReader;
 import org.apache.cassandra.thrift.Column;
 import org.apache.cassandra.thrift.IndexExpression;
-import org.apache.cassandra.utils.ByteBufferUtil;
 
 /**
  * Manages all the indexes associated with a given CFS
@@ -49,6 +47,15 @@ public class SecondaryIndexManager
 {
     private static final Logger logger = LoggerFactory.getLogger(SecondaryIndexManager.class);
 
+    public static final Updater nullUpdater = new Updater()
+    {
+        public void insert(IColumn column) { }
+
+        public void update(IColumn oldColumn, IColumn column) { }
+
+        public void remove(IColumn current) { }
+    };
+
     /**
      * Organizes the indexes by column name
      */
@@ -119,7 +126,7 @@ public class SecondaryIndexManager
      * Caller must acquire and release references to the sstables used here.
      *
      * @param sstables the data to build from
-     * @param columns the list of columns to index, ordered by comparator
+     * @param idxNames the list of columns to index, ordered by comparator
      */
     public void maybeBuildSecondaryIndexes(Collection<SSTableReader> sstables, Set<String> idxNames)
     {
@@ -149,14 +156,19 @@ public class SecondaryIndexManager
         logger.info("Index build of " + idxNames + " complete");
     }
 
-    public boolean indexes(ByteBuffer name, Set<String> idxNames)
+    public boolean indexes(ByteBuffer name, Collection<SecondaryIndex> indexes)
     {
-        for (SecondaryIndex index : getIndexesByNames(idxNames))
+        return indexFor(name, indexes) != null;
+    }
+
+    public SecondaryIndex indexFor(ByteBuffer name, Collection<SecondaryIndex> indexes)
+    {
+        for (SecondaryIndex index : indexes)
         {
             if (index.indexes(name))
-                return true;
+                return index;
         }
-        return false;
+        return null;
     }
 
     public boolean indexes(IColumn column)
@@ -166,7 +178,12 @@ public class SecondaryIndexManager
 
     public boolean indexes(ByteBuffer name)
     {
-        return indexes(name, allIndexesNames());
+        return indexes(name, indexesByColumn.values());
+    }
+
+    public SecondaryIndex indexFor(ByteBuffer name)
+    {
+        return indexFor(name, indexesByColumn.values());
     }
 
     /**
@@ -394,128 +411,83 @@ public class SecondaryIndexManager
     }
 
     /**
-     * Removes obsolete index entries and creates new ones for the given row key
-     * and mutated columns.
-     *
-     * For columns whos underlying index type has the isRowLevelIndex() flag set to true this function will
-     * call the
+     * When building an index against existing data, add the given row to the index
      *
-     * @param rowKey the row key
+     * @param key the row key
      * @param cf the current rows data
-     * @param mutatedIndexedColumns the set of columns that were changed or added
-     * @param oldIndexedColumns the columns what were deleted
      */
-    public void applyIndexUpdates(ByteBuffer rowKey,
-                                  ColumnFamily cf,
-                                  SortedSet<ByteBuffer> mutatedIndexedColumns,
-                                  ColumnFamily oldIndexedColumns)
+    public void indexRow(ByteBuffer key, ColumnFamily cf)
     {
-
-        // Identify the columns with PerRowSecondaryIndexes
-        // we need to make sure this is only called once
+        // Update entire row only once per row level index
         Set<Class<? extends SecondaryIndex>> appliedRowLevelIndexes = null;
 
-        // remove the old index entries
-        if (oldIndexedColumns != null)
-        {
-            for (IColumn column : oldIndexedColumns)
-            {
-                //this was previously deleted so should not be in index
-                if (column.isMarkedForDelete())
-                    continue;
-
-                SecondaryIndex index = getIndexForFullColumnName(column.name());
-                if (index == null)
-                {
-                    logger.debug("Looks like index got dropped mid-update.  Skipping");
-                    continue;
-                }
-
-                // Update entire row if we encounter a row level index
-                if (index instanceof PerRowSecondaryIndex)
-                {
-                    if (appliedRowLevelIndexes == null)
-                        appliedRowLevelIndexes = new HashSet<Class<? extends SecondaryIndex>>();
-
-                    if (appliedRowLevelIndexes.add(index.getClass()))
-                        ((PerRowSecondaryIndex)index).applyIndexUpdates(rowKey, cf, mutatedIndexedColumns, oldIndexedColumns);
-                }
-                else
-                {
-                    DecoratedKey valueKey = index.getIndexKeyFor(column.value());
-                    ((PerColumnSecondaryIndex)index).deleteColumn(valueKey, rowKey, column);
-                }
-            }
-        }
-
-        //insert new columns
-        for (ByteBuffer columnName : mutatedIndexedColumns)
+        for (Map.Entry<ByteBuffer, SecondaryIndex> entry : indexesByColumn.entrySet())
         {
-            IColumn column = cf.getColumn(columnName);
-            if (column == null || column.isMarkedForDelete())
-                continue; // null column == row deletion
-
-            SecondaryIndex index = getIndexForFullColumnName(columnName);
-            if (index == null)
-            {
-                logger.debug("index on {} removed; skipping remove-old for {}", columnName, ByteBufferUtil.bytesToHex(rowKey));
-                continue;
-            }
+            SecondaryIndex index = entry.getValue();
 
-            // Update entire row if we encounter a row level index
             if (index instanceof PerRowSecondaryIndex)
             {
                 if (appliedRowLevelIndexes == null)
                     appliedRowLevelIndexes = new HashSet<Class<? extends SecondaryIndex>>();
 
                 if (appliedRowLevelIndexes.add(index.getClass()))
-                    ((PerRowSecondaryIndex)index).applyIndexUpdates(rowKey, cf, mutatedIndexedColumns, oldIndexedColumns);
+                    ((PerRowSecondaryIndex)index).index(key, cf);
             }
             else
             {
-                DecoratedKey valueKey = index.getIndexKeyFor(column.value());
+                IColumn column = cf.getColumn(entry.getKey());
+                if (column == null)
+                    continue;
 
-                ((PerColumnSecondaryIndex)index).insertColumn(valueKey, rowKey, column);
+                ((PerColumnSecondaryIndex) index).insert(key, column);
             }
         }
     }
 
     /**
-     * Delete all columns from all indexes for this row
+     * Delete all columns from all indexes for this row.  For when cleanup rips a row out entirely.
+     *
      * @param key the row key
      * @param indexedColumnsInRow all column names in row
      */
     public void deleteFromIndexes(DecoratedKey key, List<IColumn> indexedColumnsInRow)
     {
-
-        // Identify the columns with isRowLevelIndex == true
-        // we need to make sure this is only called once
+        // Update entire row only once per row level index
         Set<Class<? extends SecondaryIndex>> cleanedRowLevelIndexes = null;
 
         for (IColumn column : indexedColumnsInRow)
         {
             SecondaryIndex index = indexesByColumn.get(column.name());
-
             if (index == null)
                 continue;
 
-            //Update entire row if we encounter a row level index
             if (index instanceof PerRowSecondaryIndex)
             {
                 if (cleanedRowLevelIndexes == null)
                     cleanedRowLevelIndexes = new HashSet<Class<? extends SecondaryIndex>>();
 
                 if (cleanedRowLevelIndexes.add(index.getClass()))
-                    ((PerRowSecondaryIndex)index).deleteFromIndex(key, indexedColumnsInRow);
+                    ((PerRowSecondaryIndex)index).delete(key);
             }
             else
             {
-                DecoratedKey valueKey = index.getIndexKeyFor(column.value());
-                ((PerColumnSecondaryIndex) index).deleteColumn(valueKey, key.key, column);
+                ((PerColumnSecondaryIndex) index).delete(key.key, column);
             }
         }
     }
 
+    /**
+     * This helper acts as a closure around the indexManager
+     * and row key to ensure that down in Memtable's ColumnFamily implementation, the index
+     * can get updated. Note: only a CF backed by AtomicSortedColumns implements this behaviour
+     * fully, other types simply ignore the index updater.
+     */
+    public Updater updaterFor(final DecoratedKey key, boolean includeRowIndexes)
+    {
+        return (includeRowIndexes && !rowLevelIndexMap.isEmpty())
+               ? new MixedIndexUpdater(key)
+               : indexesByColumn.isEmpty() ? nullUpdater : new PerColumnIndexUpdater(key);
+    }
 
     /**
      * Get a list of IndexSearchers from the union of expression index types
@@ -578,7 +550,7 @@ public class SecondaryIndexManager
         return indexSearchers.get(0).search(clause, range, maxResults, dataFilter, maxIsColumns);
     }
 
-    private Collection<SecondaryIndex> getIndexesByNames(Set<String> idxNames)
+    public Collection<SecondaryIndex> getIndexesByNames(Set<String> idxNames)
     {
         List<SecondaryIndex> result = new ArrayList<SecondaryIndex>();
         for (SecondaryIndex index : indexesByColumn.values())
@@ -606,4 +578,132 @@ public class SecondaryIndexManager
         SecondaryIndex index = getIndexForColumn(column.name);
         return index != null ? index.validate(column) : true;
     }
+
+    public static interface Updater
+    {
+        public void insert(IColumn column);
+
+        public void update(IColumn oldColumn, IColumn column);
+
+        public void remove(IColumn current);
+    }
+
+    private class PerColumnIndexUpdater implements Updater
+    {
+        private final DecoratedKey key;
+
+        public PerColumnIndexUpdater(DecoratedKey key)
+        {
+            this.key = key;
+        }
+
+        public void insert(IColumn column)
+        {
+            if (column.isMarkedForDelete())
+                return;
+
+            SecondaryIndex index = indexFor(column.name());
+            if (index == null)
+                return;
+
+            ((PerColumnSecondaryIndex) index).insert(key.key, column);
+        }
+
+        public void update(IColumn oldColumn, IColumn column)
+        {
+            if (column.isMarkedForDelete())
+                return;
+
+            SecondaryIndex index = indexFor(column.name());
+            if (index == null)
+                return;
+
+            ((PerColumnSecondaryIndex) index).delete(key.key, oldColumn);
+            ((PerColumnSecondaryIndex) index).insert(key.key, column);
+        }
+
+        public void remove(IColumn column)
+        {
+            if (column.isMarkedForDelete())
+                return;
+
+            SecondaryIndex index = indexFor(column.name());
+            if (index == null)
+                return;
+
+            ((PerColumnSecondaryIndex) index).delete(key.key, column);
+        }
+    }
+
+    private class MixedIndexUpdater implements Updater
+    {
+        private final DecoratedKey key;
+        Set<Class<? extends SecondaryIndex>> appliedRowLevelIndexes = new HashSet<Class<? extends SecondaryIndex>>();
+
+        public MixedIndexUpdater(DecoratedKey key)
+        {
+            this.key = key;
+        }
+
+        public void insert(IColumn column)
+        {
+            if (column.isMarkedForDelete())
+                return;
+
+            SecondaryIndex index = indexFor(column.name());
+            if (index == null)
+                return;
+
+            if (index instanceof  PerColumnSecondaryIndex)
+            {
+                ((PerColumnSecondaryIndex) index).insert(key.key, column);
+            }
+            else
+            {
+                if (appliedRowLevelIndexes.add(index.getClass()))
+                    ((PerRowSecondaryIndex) index).index(key.key);
+            }
+        }
+
+        public void update(IColumn oldColumn, IColumn column)
+        {
+            if (column.isMarkedForDelete())
+                return;
+
+            SecondaryIndex index = indexFor(column.name());
+            if (index == null)
+                return;
+
+            if (index instanceof  PerColumnSecondaryIndex)
+            {
+                ((PerColumnSecondaryIndex) index).delete(key.key, oldColumn);
+                ((PerColumnSecondaryIndex) index).insert(key.key, column);
+            }
+            else
+            {
+                if (appliedRowLevelIndexes.add(index.getClass()))
+                    ((PerRowSecondaryIndex) index).index(key.key);
+            }
+        }
+
+        public void remove(IColumn column)
+        {
+            if (column.isMarkedForDelete())
+                return;
+
+            SecondaryIndex index = indexFor(column.name());
+            if (index == null)
+                return;
+
+            if (index instanceof  PerColumnSecondaryIndex)
+            {
+                ((PerColumnSecondaryIndex) index).delete(key.key, column);
+            }
+            else
+            {
+                if (appliedRowLevelIndexes.add(index.getClass()))
+                    ((PerRowSecondaryIndex) index).index(key.key);
+            }
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8a1b93d7/src/java/org/apache/cassandra/db/index/SecondaryIndexSearcher.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/index/SecondaryIndexSearcher.java b/src/java/org/apache/cassandra/db/index/SecondaryIndexSearcher.java
index bbd1c83..1938773 100644
--- a/src/java/org/apache/cassandra/db/index/SecondaryIndexSearcher.java
+++ b/src/java/org/apache/cassandra/db/index/SecondaryIndexSearcher.java
@@ -44,4 +44,14 @@ public abstract class SecondaryIndexSearcher
      * @return true this index is able to handle given clauses.
      */
     public abstract boolean isIndexing(List<IndexExpression> clause);
+    
+    protected boolean isIndexValueStale(ColumnFamily liveData, ByteBuffer indexedColumnName, ByteBuffer indexedValue)
+    {
+        IColumn liveColumn = liveData.getColumn(indexedColumnName);
+        if (null == liveColumn || liveColumn.isMarkedForDelete())
+            return true;
+        
+        ByteBuffer liveValue = liveColumn.value();
+        return 0 != liveData.metadata().getValueValidator(indexedColumnName).compare(indexedValue, liveValue);
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8a1b93d7/src/java/org/apache/cassandra/db/index/composites/CompositesSearcher.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/index/composites/CompositesSearcher.java b/src/java/org/apache/cassandra/db/index/composites/CompositesSearcher.java
index 6ffb63e..9b37ae3 100644
--- a/src/java/org/apache/cassandra/db/index/composites/CompositesSearcher.java
+++ b/src/java/org/apache/cassandra/db/index/composites/CompositesSearcher.java
@@ -23,6 +23,7 @@ import java.util.*;
 
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.filter.*;
+import org.apache.cassandra.db.index.PerColumnSecondaryIndex;
 import org.apache.cassandra.db.index.SecondaryIndex;
 import org.apache.cassandra.db.index.SecondaryIndexManager;
 import org.apache.cassandra.db.index.SecondaryIndexSearcher;
@@ -300,6 +301,17 @@ public class CompositesSearcher extends SecondaryIndexSearcher
                         ColumnFamily newData = baseCfs.getColumnFamily(new QueryFilter(dk, path, dataFilter));
                         if (newData != null)
                         {
+                            ByteBuffer baseColumnName = builder.copy().add(primary.column_name).build();
+                            ByteBuffer indexedValue = indexKey.key;
+
+                            if (isIndexValueStale(newData, baseColumnName, indexedValue))
+                            {
+                                // delete the index entry w/ its own timestamp
+                                IColumn dummyColumn = new Column(baseColumnName, indexedValue, column.timestamp());
+                                ((PerColumnSecondaryIndex) index).delete(dk.key, dummyColumn);
+                                continue;
+                            }
+
                             if (!filter.isSatisfiedBy(newData, builder))
                                 continue;
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8a1b93d7/src/java/org/apache/cassandra/db/index/keys/KeysSearcher.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/index/keys/KeysSearcher.java b/src/java/org/apache/cassandra/db/index/keys/KeysSearcher.java
index f18e041..4bd2b39 100644
--- a/src/java/org/apache/cassandra/db/index/keys/KeysSearcher.java
+++ b/src/java/org/apache/cassandra/db/index/keys/KeysSearcher.java
@@ -23,6 +23,7 @@ import java.util.*;
 
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.filter.*;
+import org.apache.cassandra.db.index.PerColumnSecondaryIndex;
 import org.apache.cassandra.db.index.SecondaryIndex;
 import org.apache.cassandra.db.index.SecondaryIndexManager;
 import org.apache.cassandra.db.index.SecondaryIndexSearcher;
@@ -31,6 +32,7 @@ import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.thrift.IndexExpression;
 import org.apache.cassandra.thrift.IndexOperator;
 import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.HeapAllocator;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -196,6 +198,24 @@ public class KeysSearcher extends SecondaryIndexSearcher
                         // While the column family we'll get in the end should contains the primary clause column, the initialFilter may not have found it and can thus be null
                         if (data == null)
                             data = ColumnFamily.create(baseCfs.metadata);
+
+                        // as in CFS.filter - extend the filter to ensure we include the columns 
+                        // from the index expressions, just in case they weren't included in the initialFilter
+                        IFilter extraFilter = filter.getExtraFilter(data);
+                        if (extraFilter != null)
+                        {
+                            ColumnFamily cf = baseCfs.getColumnFamily(new QueryFilter(dk, path, extraFilter));
+                            if (cf != null)
+                                data.addAll(cf, HeapAllocator.instance);
+                        }
+                        
+                        if (isIndexValueStale(data, primary.column_name, indexKey.key))
+                        {
+                            // delete the index entry w/ its own timestamp
+                            IColumn dummyColumn = new Column(primary.column_name, indexKey.key, column.timestamp());
+                            ((PerColumnSecondaryIndex)index).delete(dk.key, dummyColumn);
+                            continue;
+                        }
                         return new Row(dk, data);
                     }
                  }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8a1b93d7/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java b/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java
index 4710689..d9aabfb 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java
@@ -227,10 +227,10 @@ public class SSTableIdentityIterator implements Comparable<SSTableIdentityIterat
         }
     }
 
-    public ColumnFamily getColumnFamilyWithColumns() throws IOException
+    public ColumnFamily getColumnFamilyWithColumns(ISortedColumns.Factory containerFactory) throws IOException
     {
         assert inputWithTracker.getBytesRead() == headerSize();
-        ColumnFamily cf = columnFamily.cloneMeShallow(ArrayBackedSortedColumns.factory(), false);
+        ColumnFamily cf = columnFamily.cloneMeShallow(containerFactory, false);
         // since we already read column count, just pass that value and continue deserialization
         columnFamily.serializer.deserializeColumnsFromSSTable(inputWithTracker, cf, columnCount, flag, expireBefore, dataVersion);
         if (validateColumns)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8a1b93d7/test/unit/org/apache/cassandra/SchemaLoader.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/SchemaLoader.java b/test/unit/org/apache/cassandra/SchemaLoader.java
index 8c425a4..5189835 100644
--- a/test/unit/org/apache/cassandra/SchemaLoader.java
+++ b/test/unit/org/apache/cassandra/SchemaLoader.java
@@ -34,6 +34,7 @@ import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.commitlog.CommitLog;
 import org.apache.cassandra.db.compaction.LeveledCompactionStrategy;
 import org.apache.cassandra.db.filter.QueryPath;
+import org.apache.cassandra.db.index.composites.CompositesIndex;
 import org.apache.cassandra.db.marshal.*;
 import org.apache.cassandra.gms.Gossiper;
 import org.apache.cassandra.io.compress.CompressionParameters;
@@ -224,7 +225,8 @@ public class SchemaLoader
                                            standardCFMD(ks2, "Standard3", withOldCfIds),
                                            superCFMD(ks2, "Super3", bytes, withOldCfIds),
                                            superCFMD(ks2, "Super4", TimeUUIDType.instance, withOldCfIds),
-                                           indexCFMD(ks2, "Indexed1", true, withOldCfIds)));
+                                           indexCFMD(ks2, "Indexed1", true, withOldCfIds),
+                                           compositeIndexCFMD(ks2, "Indexed2", true, withOldCfIds)));
 
         // Keyspace 3
         schema.add(KSMetaData.testMetadata(ks3,
@@ -336,6 +338,23 @@ public class SchemaLoader
                         put(cName, new ColumnDefinition(cName, LongType.instance, keys, null, withIdxType ? ByteBufferUtil.bytesToHex(cName) : null, null));
                     }});
     }
+    private static CFMetaData compositeIndexCFMD(String ksName, String cfName, final Boolean withIdxType, boolean withOldCfIds) throws ConfigurationException
+    {
+        final Map<String, String> idxOpts = Collections.singletonMap(CompositesIndex.PREFIX_SIZE_OPTION, "1");
+        final CompositeType composite = CompositeType.getInstance(Arrays.asList(new AbstractType<?>[]{UTF8Type.instance, UTF8Type.instance})); 
+        return new CFMetaData(ksName,
+                cfName,
+                ColumnFamilyType.Standard,
+                composite,
+                null)
+               .columnMetadata(new HashMap<ByteBuffer, ColumnDefinition>()
+                {{
+                   ByteBuffer cName = ByteBuffer.wrap("col1".getBytes(Charsets.UTF_8));
+                   IndexType idxType = withIdxType ? IndexType.COMPOSITES : null;
+                   put(cName, new ColumnDefinition(cName, UTF8Type.instance, idxType, idxOpts, withIdxType ? "col1_idx" : null, 1));
+                }});
+    }
+    
     private static CFMetaData jdbcCFMD(String ksName, String cfName, AbstractType comp, boolean withOldCfIds)
     {
         CFMetaData cfmd = new CFMetaData(ksName, cfName, ColumnFamilyType.Standard, comp, null).defaultValidator(comp);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8a1b93d7/test/unit/org/apache/cassandra/config/CFMetaDataTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/config/CFMetaDataTest.java b/test/unit/org/apache/cassandra/config/CFMetaDataTest.java
index 4a93775..8ca83ce 100644
--- a/test/unit/org/apache/cassandra/config/CFMetaDataTest.java
+++ b/test/unit/org/apache/cassandra/config/CFMetaDataTest.java
@@ -18,9 +18,11 @@
  */
 package org.apache.cassandra.config;
 
+import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.HashMap;
+import java.util.Map;
 
 import org.apache.cassandra.SchemaLoader;
 import org.apache.cassandra.cql3.QueryProcessor;
@@ -123,6 +125,18 @@ public class CFMetaDataTest extends SchemaLoader
     {
         DecoratedKey k = StorageService.getPartitioner().decorateKey(ByteBufferUtil.bytes(cfm.ksName));
 
+        // This is a nasty hack to work around the fact that non-null componentIndex 
+        // are only used by CQL (so far) so we don't expose them through thrift
+        // There is a CFM with componentIndex defined in Keyspace2 which is used by 
+        // ColumnFamilyStoreTest to verify index repair (CASSANDRA-2897)
+        for (Map.Entry<ByteBuffer, ColumnDefinition> cMeta: cfm.column_metadata.entrySet())
+        {
+            // Non-null componentIndex are only used by CQL (so far) so we don't expose
+            // them through thrift
+            if (cMeta.getValue().componentIndex != null)
+                cfm.column_metadata.remove(cMeta.getKey());
+        }
+
         // Test thrift conversion
         assert cfm.equals(CFMetaData.fromThrift(cfm.toThrift())) : String.format("\n%s\n!=\n%s", cfm, CFMetaData.fromThrift(cfm.toThrift()));
 


Mime
View raw message