cassandra-commits mailing list archives

From sn...@apache.org
Subject [9/9] cassandra git commit: Merge branch 'cassandra-2.2' into cassandra-3.0
Date Wed, 16 Sep 2015 20:05:02 GMT
Merge branch 'cassandra-2.2' into cassandra-3.0


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/9218d745
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/9218d745
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/9218d745

Branch: refs/heads/cassandra-3.0
Commit: 9218d7456b36d20ebe78bab23594e67d2f0c4a20
Parents: fde97c3 e63dacf
Author: Robert Stupp <snazy@snazy.de>
Authored: Wed Sep 16 22:00:32 2015 +0200
Committer: Robert Stupp <snazy@snazy.de>
Committed: Wed Sep 16 22:00:32 2015 +0200

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../apache/cassandra/cache/AutoSavingCache.java | 195 +++++++++++--------
 .../org/apache/cassandra/cache/CacheKey.java    |  14 +-
 .../apache/cassandra/cache/CounterCacheKey.java |  26 +--
 .../org/apache/cassandra/cache/KeyCacheKey.java |  19 +-
 .../org/apache/cassandra/cache/OHCProvider.java |  39 +++-
 .../org/apache/cassandra/cache/RowCacheKey.java |  34 ++--
 .../org/apache/cassandra/config/CFMetaData.java |  10 +
 .../cassandra/config/DatabaseDescriptor.java    |  19 +-
 .../org/apache/cassandra/config/Schema.java     |  58 +++++-
 .../apache/cassandra/db/ColumnFamilyStore.java  |  83 +++-----
 src/java/org/apache/cassandra/db/Keyspace.java  |   6 -
 .../org/apache/cassandra/db/RowIndexEntry.java  |   2 +-
 .../db/SinglePartitionReadCommand.java          |   3 +-
 .../io/sstable/format/SSTableReader.java        |  10 +-
 .../io/sstable/format/big/BigTableReader.java   |   2 +-
 .../apache/cassandra/service/CacheService.java  |  54 +++--
 .../cassandra/service/CassandraDaemon.java      |  39 +++-
 .../cassandra/service/StorageService.java       |  29 +--
 .../org/apache/cassandra/utils/FBUtilities.java |  16 ++
 .../cassandra/cache/AutoSavingCacheTest.java    |   5 +-
 .../cassandra/cache/CacheProviderTest.java      |  24 ++-
 .../apache/cassandra/cql3/KeyCacheCqlTest.java  | 101 ++++++++++
 .../apache/cassandra/db/CounterCacheTest.java   |  68 ++++++-
 .../org/apache/cassandra/db/KeyCacheTest.java   |   2 +-
 .../org/apache/cassandra/db/RowCacheTest.java   |  41 +++-
 26 files changed, 612 insertions(+), 288 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/9218d745/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index b213260,96ec0fa..8e3e947
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,18 -1,15 +1,19 @@@
 -2.2.2
 +3.0.0-rc1
 + * Improve MV schema representation (CASSANDRA-9921)
 + * Add flag to enable/disable coordinator batchlog for MV writes (CASSANDRA-10230)
 + * Update cqlsh COPY for new internal driver serialization interface (CASSANDRA-10318)
 + * Give index implementations more control over rebuild operations (CASSANDRA-10312)
 + * Update index file format (CASSANDRA-10314)
 + * Add "shadowable" row tombstones to deal with mv timestamp issues (CASSANDRA-10261)
 + * CFS.loadNewSSTables() broken for pre-3.0 sstables
 + * Cache selected index in read command to reduce lookups (CASSANDRA-10215)
 + * Small optimizations of sstable index serialization (CASSANDRA-10232)
 + * Support for both encrypted and unencrypted native transport connections (CASSANDRA-9590)
 +Merged from 2.2:
   * Defer default role manager setup until all nodes are on 2.2+ (CASSANDRA-9761)
 - * Cancel transaction for sstables we wont redistribute index summary
 -   for (CASSANDRA-10270)
 - * Handle missing RoleManager in config after upgrade to 2.2 (CASSANDRA-10209) 
 - * Retry snapshot deletion after compaction and gc on Windows (CASSANDRA-10222)
 - * Fix failure to start with space in directory path on Windows (CASSANDRA-10239)
 - * Fix repair hang when snapshot failed (CASSANDRA-10057)
 - * Fall back to 1/4 commitlog volume for commitlog_total_space on small disks
 -   (CASSANDRA-10199)
 + * Handle missing RoleManager in config after upgrade to 2.2 (CASSANDRA-10209)
  Merged from 2.1:
+  * Fix cache handling of 2i and base tables (CASSANDRA-10155)
   * Fix NPE in nodetool compactionhistory (CASSANDRA-9758)
   * (Pig) support BulkOutputFormat as a URL parameter (CASSANDRA-7410)
   * BATCH statement is broken in cqlsh (CASSANDRA-10272)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9218d745/src/java/org/apache/cassandra/cache/AutoSavingCache.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/cache/AutoSavingCache.java
index ebd2830,3ec9d4e..4558bb7
--- a/src/java/org/apache/cassandra/cache/AutoSavingCache.java
+++ b/src/java/org/apache/cassandra/cache/AutoSavingCache.java
@@@ -35,7 -42,7 +42,6 @@@ import org.apache.cassandra.db.SystemKe
  import org.apache.cassandra.db.compaction.CompactionInfo;
  import org.apache.cassandra.db.compaction.CompactionManager;
  import org.apache.cassandra.db.compaction.OperationType;
--import org.apache.cassandra.db.marshal.BytesType;
  import org.apache.cassandra.io.FSWriteError;
  import org.apache.cassandra.io.util.*;
  import org.apache.cassandra.io.util.ChecksummedRandomAccessReader.CorruptFileException;
@@@ -62,7 -68,15 +68,17 @@@ public class AutoSavingCache<K extends 
      protected final CacheService.CacheType cacheType;
  
      private final CacheSerializer<K, V> cacheLoader;
+ 
+     /*
+      * CASSANDRA-10155 required a format change to fix cache handling of 2i (secondary index) and base tables.
+      * 2.2 is already at version "c" and 3.0 is at "d".
+      *
+      * Since cache versions must match exactly and there is no partial fallback, just adding
+      * a minor version letter (e.g. "ca" for 2.2) is sufficient.
++     *
++     * Sticking with "d" is fine for 3.0 since that version string has never shipped in a release or been used by another version
+      */
 -    private static final String CURRENT_VERSION = "ca";
 +    private static final String CURRENT_VERSION = "d";
  
      private static volatile IStreamFactory streamFactory = new IStreamFactory()
      {
@@@ -130,33 -142,90 +144,90 @@@
          }
      }
  
-     public int loadSaved(ColumnFamilyStore cfs)
+     public ListenableFuture<Integer> loadSavedAsync()
+     {
+         final ListeningExecutorService es = MoreExecutors.listeningDecorator(Executors.newSingleThreadExecutor());
+         final long start = System.nanoTime();
+ 
+         ListenableFuture<Integer> cacheLoad = es.submit(new Callable<Integer>()
+         {
+             @Override
+             public Integer call() throws Exception
+             {
+                 return loadSaved();
+             }
+         });
+         cacheLoad.addListener(new Runnable() {
+             @Override
+             public void run()
+             {
+                 if (size() > 0)
+                     logger.info("Completed loading ({} ms; {} keys) {} cache",
+                             TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start),
+                             size(),
+                             cacheType);
+                 es.shutdown();
+             }
 -        }, MoreExecutors.sameThreadExecutor());
++        }, MoreExecutors.directExecutor());
+ 
+         return cacheLoad;
+     }
+ 
+     public int loadSaved()
      {
          int count = 0;
          long start = System.nanoTime();
  
          // modern format, allows both key and value (so key cache load can be purely sequential)
-         File dataPath = getCacheDataPath(cfs.metadata.cfId, CURRENT_VERSION);
-         File crcPath = getCacheCrcPath(cfs.metadata.cfId, CURRENT_VERSION);
+         File dataPath = getCacheDataPath(CURRENT_VERSION);
+         File crcPath = getCacheCrcPath(CURRENT_VERSION);
          if (dataPath.exists() && crcPath.exists())
          {
 -            DataInputStream in = null;
 +            DataInputStreamPlus in = null;
              try
              {
                  logger.info(String.format("reading saved cache %s", dataPath));
 -                in = new DataInputStream(new LengthAvailableInputStream(new BufferedInputStream(streamFactory.getInputStream(dataPath, crcPath)), dataPath.length()));
 +                in = new DataInputStreamPlus(new LengthAvailableInputStream(new BufferedInputStream(streamFactory.getInputStream(dataPath, crcPath)), dataPath.length()));
-                 List<Future<Pair<K, V>>> futures = new ArrayList<Future<Pair<K, V>>>();
+                 ArrayDeque<Future<Pair<K, V>>> futures = new ArrayDeque<Future<Pair<K, V>>>();
                  while (in.available() > 0)
                  {
-                     Future<Pair<K, V>> entry = cacheLoader.deserialize(in, cfs);
+                     //ksname and cfname are serialized by the serializers in CacheService
+                     //That is delegated there because there are serializer-specific conditions
+                     //under which a cache key is skipped and not written
+                     String ksname = in.readUTF();
+                     String cfname = in.readUTF();
+ 
+                     ColumnFamilyStore cfs = Schema.instance.getColumnFamilyStoreIncludingIndexes(Pair.create(ksname, cfname));
+ 
+                     Future<Pair<K, V>> entryFuture = cacheLoader.deserialize(in, cfs);
                      // Key cache entry can return null, if the SSTable doesn't exist.
-                     if (entry == null)
+                     if (entryFuture == null)
                          continue;
-                     futures.add(entry);
+ 
+                     futures.offer(entryFuture);
                      count++;
+ 
+                     /*
+                      * Accruing an unbounded number of pending futures is unwise,
+                      * so this loop drains completed entries and keeps the number pending bounded.
+                      */
+                     do
+                     {
+                         while (futures.peek() != null && futures.peek().isDone())
+                         {
+                             Future<Pair<K, V>> future = futures.poll();
+                             Pair<K, V> entry = future.get();
+                             if (entry != null && entry.right != null)
+                                 put(entry.left, entry.right);
+                         }
+ 
+                         if (futures.size() > 1000)
+                             Thread.yield();
+                     } while(futures.size() > 1000);
                  }
  
-                 for (Future<Pair<K, V>> future : futures)
+                 Future<Pair<K, V>> future = null;
+                 while ((future = futures.poll()) != null)
                  {
                      Pair<K, V> entry = future.get();
                      if (entry != null && entry.right != null)
@@@ -377,8 -416,8 +418,8 @@@
  
      public interface CacheSerializer<K extends CacheKey, V>
      {
-         void serialize(K key, DataOutputPlus out) throws IOException;
+         void serialize(K key, DataOutputPlus out, ColumnFamilyStore cfs) throws IOException;
  
 -        Future<Pair<K, V>> deserialize(DataInputStream in, ColumnFamilyStore cfs) throws IOException;
 +        Future<Pair<K, V>> deserialize(DataInputPlus in, ColumnFamilyStore cfs) throws IOException;
      }
  }
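
The bounded-futures loop in the new loadSaved() above is the interesting part: instead of
collecting every deserialization future and waiting on them all at the end, it drains
completed futures as it goes and yields once more than 1000 are outstanding. A minimal,
self-contained Java sketch of that pattern (class and variable names here are
illustrative, not Cassandra APIs):

    import java.util.ArrayDeque;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Future;

    public class BoundedFutureDrain
    {
        private static final int MAX_PENDING = 1000;

        public static void main(String[] args) throws Exception
        {
            ExecutorService pool = Executors.newFixedThreadPool(4);
            ArrayDeque<Future<Integer>> pending = new ArrayDeque<>();

            for (int i = 0; i < 10_000; i++)
            {
                final int task = i;
                // Stand-in for cacheLoader.deserialize(in, cfs)
                pending.offer(pool.submit(() -> task * 2));

                // Drain whatever has already completed, then yield while over the cap.
                do
                {
                    while (pending.peek() != null && pending.peek().isDone())
                        pending.poll().get(); // stand-in for put(entry.left, entry.right)
                    if (pending.size() > MAX_PENDING)
                        Thread.yield();
                } while (pending.size() > MAX_PENDING);
            }

            // Wait out the tail, exactly like the final loop in loadSaved().
            Future<Integer> f;
            while ((f = pending.poll()) != null)
                f.get();
            pool.shutdown();
        }
    }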

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9218d745/src/java/org/apache/cassandra/cache/CounterCacheKey.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/cache/CounterCacheKey.java
index 00766ee,68856eb..8b173bf
--- a/src/java/org/apache/cassandra/cache/CounterCacheKey.java
+++ b/src/java/org/apache/cassandra/cache/CounterCacheKey.java
@@@ -19,53 -19,30 +19,45 @@@ package org.apache.cassandra.cache
  
  import java.nio.ByteBuffer;
  import java.util.Arrays;
- import java.util.UUID;
  
 -import org.apache.cassandra.db.composites.CellName;
 -import org.apache.cassandra.db.composites.CellNames;
 +import org.apache.cassandra.config.ColumnDefinition;
 +import org.apache.cassandra.db.*;
 +import org.apache.cassandra.db.rows.CellPath;
 +import org.apache.cassandra.db.marshal.CompositeType;
  import org.apache.cassandra.utils.*;
  
- public class CounterCacheKey implements CacheKey
+ public final class CounterCacheKey extends CacheKey
  {
-     private static final long EMPTY_SIZE = ObjectSizes.measure(new CounterCacheKey(null, ByteBufferUtil.EMPTY_BYTE_BUFFER, ByteBuffer.allocate(1)))
-                                            + ObjectSizes.measure(new UUID(0, 0));
 -    private static final long EMPTY_SIZE = ObjectSizes.measure(new CounterCacheKey(null, ByteBufferUtil.EMPTY_BYTE_BUFFER, CellNames.simpleDense(ByteBuffer.allocate(1))));
++    private static final long EMPTY_SIZE = ObjectSizes.measure(new CounterCacheKey(null, ByteBufferUtil.EMPTY_BYTE_BUFFER, ByteBuffer.allocate(1)));
  
-     public final UUID cfId;
      public final byte[] partitionKey;
      public final byte[] cellName;
  
-     public CounterCacheKey(UUID cfId, ByteBuffer partitionKey, ByteBuffer cellName)
 -    private CounterCacheKey(Pair<String, String> ksAndCFName, ByteBuffer partitionKey, CellName cellName)
++    public CounterCacheKey(Pair<String, String> ksAndCFName, ByteBuffer partitionKey, ByteBuffer cellName)
      {
-         this.cfId = cfId;
+         super(ksAndCFName);
          this.partitionKey = ByteBufferUtil.getArray(partitionKey);
 -        this.cellName = ByteBufferUtil.getArray(cellName.toByteBuffer());
 +        this.cellName = ByteBufferUtil.getArray(cellName);
      }
  
-     public static CounterCacheKey create(UUID cfId, ByteBuffer partitionKey, Clustering clustering, ColumnDefinition c, CellPath path)
 -    public static CounterCacheKey create(Pair<String, String> ksAndCFName, ByteBuffer partitionKey, CellName cellName)
++    public static CounterCacheKey create(Pair<String, String> ksAndCFName, ByteBuffer partitionKey, Clustering clustering, ColumnDefinition c, CellPath path)
      {
-         return new CounterCacheKey(cfId, partitionKey, makeCellName(clustering, c, path));
 -        return new CounterCacheKey(ksAndCFName, partitionKey, cellName);
++        return new CounterCacheKey(ksAndCFName, partitionKey, makeCellName(clustering, c, path));
 +    }
 +
 +    private static ByteBuffer makeCellName(Clustering clustering, ColumnDefinition c, CellPath path)
 +    {
 +        int cs = clustering.size();
 +        ByteBuffer[] values = new ByteBuffer[cs + 1 + (path == null ? 0 : path.size())];
 +        for (int i = 0; i < cs; i++)
 +            values[i] = clustering.get(i);
 +        values[cs] = c.name.bytes;
 +        if (path != null)
 +            for (int i = 0; i < path.size(); i++)
 +                values[cs + 1 + i] = path.get(i);
 +        return CompositeType.build(values);
      }
  
-     public UUID getCFId()
-     {
-         return cfId;
-     }
- 
      public long unsharedHeapSize()
      {
          return EMPTY_SIZE
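
makeCellName above flattens the clustering values, the column name, and any cell path into
one composite buffer, which is what lets the 3.0 counter cache key keep the flat byte[]
cellName representation the cache format expects. A hedged sketch of the same flattening
using a simple length-prefixed encoding (Cassandra's CompositeType uses a different wire
format, with unsigned-short lengths and end-of-component bytes; this is only illustrative):

    import java.nio.ByteBuffer;
    import java.nio.charset.StandardCharsets;

    public class CompositeSketch
    {
        // Concatenate components as [int length][bytes]... into one flat buffer.
        static ByteBuffer build(ByteBuffer... components)
        {
            int size = 0;
            for (ByteBuffer c : components)
                size += 4 + c.remaining();
            ByteBuffer out = ByteBuffer.allocate(size);
            for (ByteBuffer c : components)
            {
                out.putInt(c.remaining());
                out.put(c.duplicate()); // duplicate() so the source position is untouched
            }
            out.flip();
            return out;
        }

        public static void main(String[] args)
        {
            ByteBuffer clustering = ByteBuffer.wrap(new byte[]{ 1, 2 });
            ByteBuffer columnName = ByteBuffer.wrap("count".getBytes(StandardCharsets.UTF_8));
            ByteBuffer composite = build(clustering, columnName);
            System.out.println("composite size: " + composite.remaining()); // (4+2) + (4+5) = 15
        }
    }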

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9218d745/src/java/org/apache/cassandra/cache/OHCProvider.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/cache/OHCProvider.java
index c6c6bb7,9b1c8cf..8a7bdfc
--- a/src/java/org/apache/cassandra/cache/OHCProvider.java
+++ b/src/java/org/apache/cassandra/cache/OHCProvider.java
@@@ -17,18 -17,22 +17,18 @@@
   */
  package org.apache.cassandra.cache;
  
 -import java.io.DataInput;
 -import java.io.DataOutput;
  import java.io.IOException;
  import java.nio.ByteBuffer;
 -import java.nio.channels.WritableByteChannel;
  import java.util.Iterator;
- import java.util.UUID;
  
 -import com.google.common.base.Function;
 -
  import org.apache.cassandra.config.DatabaseDescriptor;
 -import org.apache.cassandra.db.ColumnFamily;
  import org.apache.cassandra.db.TypeSizes;
 -import org.apache.cassandra.io.util.DataOutputPlus;
 -import org.apache.cassandra.io.util.Memory;
 -import org.apache.cassandra.net.MessagingService;
 +import org.apache.cassandra.db.partitions.CachedPartition;
 +import org.apache.cassandra.io.util.DataInputBuffer;
++import org.apache.cassandra.io.util.DataOutputBuffer;
 +import org.apache.cassandra.io.util.DataOutputBufferFixed;
- import org.apache.cassandra.io.util.NIODataInputStream;
 +import org.apache.cassandra.io.util.RebufferingInputStream;
+ import org.apache.cassandra.utils.Pair;
  import org.caffinitas.ohc.OHCache;
  import org.caffinitas.ohc.OHCacheBuilder;
  
@@@ -122,27 -126,29 +122,50 @@@ public class OHCProvider implements Cac
  
      private static class KeySerializer implements org.caffinitas.ohc.CacheSerializer<RowCacheKey>
      {
 -        public void serialize(RowCacheKey rowCacheKey, DataOutput dataOutput) throws IOException
 -        {
 -            dataOutput.writeUTF(rowCacheKey.ksAndCFName.left);
 -            dataOutput.writeUTF(rowCacheKey.ksAndCFName.right);
 -            dataOutput.writeInt(rowCacheKey.key.length);
 -            dataOutput.write(rowCacheKey.key);
 -        }
 -
 -        public RowCacheKey deserialize(DataInput dataInput) throws IOException
 -        {
 -            String ksName = dataInput.readUTF();
 -            String cfName = dataInput.readUTF();
 -            byte[] key = new byte[dataInput.readInt()];
 -            dataInput.readFully(key);
 +        private static KeySerializer instance = new KeySerializer();
 +        public void serialize(RowCacheKey rowCacheKey, ByteBuffer buf)
 +        {
-             buf.putLong(rowCacheKey.cfId.getMostSignificantBits());
-             buf.putLong(rowCacheKey.cfId.getLeastSignificantBits());
++            @SuppressWarnings("resource")
++            DataOutputBuffer dataOutput = new DataOutputBufferFixed(buf);
++            try
++            {
++                dataOutput.writeUTF(rowCacheKey.ksAndCFName.left);
++                dataOutput.writeUTF(rowCacheKey.ksAndCFName.right);
++            }
++            catch (IOException e)
++            {
++                throw new RuntimeException(e);
++            }
 +            buf.putInt(rowCacheKey.key.length);
 +            buf.put(rowCacheKey.key);
 +        }
 +
 +        public RowCacheKey deserialize(ByteBuffer buf)
 +        {
-             long msb = buf.getLong();
-             long lsb = buf.getLong();
++            @SuppressWarnings("resource")
++            DataInputBuffer dataInput = new DataInputBuffer(buf, false);
++            String ksName = null;
++            String cfName = null;
++            try
++            {
++                ksName = dataInput.readUTF();
++                cfName = dataInput.readUTF();
++            }
++            catch (IOException e)
++            {
++                throw new RuntimeException(e);
++            }
 +            byte[] key = new byte[buf.getInt()];
 +            buf.get(key);
-             return new RowCacheKey(new UUID(msb, lsb), key);
+             return new RowCacheKey(Pair.create(ksName, cfName), key);
          }
  
          public int serializedSize(RowCacheKey rowCacheKey)
          {
-             return 20 + rowCacheKey.key.length;
 -            return TypeSizes.NATIVE.sizeof(rowCacheKey.ksAndCFName.left)
 -                    + TypeSizes.NATIVE.sizeof(rowCacheKey.ksAndCFName.right)
++            return TypeSizes.sizeof(rowCacheKey.ksAndCFName.left)
++                    + TypeSizes.sizeof(rowCacheKey.ksAndCFName.right)
+                     + 4
+                     + rowCacheKey.key.length;
          }
      }
  

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9218d745/src/java/org/apache/cassandra/config/CFMetaData.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/config/CFMetaData.java
index 929a34a,348eb89..69bf6bf
--- a/src/java/org/apache/cassandra/config/CFMetaData.java
+++ b/src/java/org/apache/cassandra/config/CFMetaData.java
@@@ -43,15 -40,20 +43,18 @@@ import org.apache.cassandra.cql3.QueryP
  import org.apache.cassandra.cql3.statements.CFStatement;
  import org.apache.cassandra.cql3.statements.CreateTableStatement;
  import org.apache.cassandra.db.*;
 -import org.apache.cassandra.db.compaction.*;
 -import org.apache.cassandra.db.composites.*;
 -import org.apache.cassandra.db.index.SecondaryIndex;
 +import org.apache.cassandra.db.compaction.AbstractCompactionStrategy;
  import org.apache.cassandra.db.marshal.*;
 -import org.apache.cassandra.exceptions.*;
 -import org.apache.cassandra.io.compress.CompressionParameters;
 -import org.apache.cassandra.io.compress.LZ4Compressor;
 -import org.apache.cassandra.io.sstable.format.Version;
 -import org.apache.cassandra.io.util.FileDataInput;
 -import org.apache.cassandra.schema.LegacySchemaTables;
 -import org.apache.cassandra.utils.ByteBufferUtil;
 +import org.apache.cassandra.dht.IPartitioner;
 +import org.apache.cassandra.exceptions.ConfigurationException;
 +import org.apache.cassandra.exceptions.InvalidRequestException;
 +import org.apache.cassandra.io.util.DataInputPlus;
 +import org.apache.cassandra.io.util.DataOutputPlus;
 +import org.apache.cassandra.schema.*;
 +import org.apache.cassandra.utils.*;
+ import org.apache.cassandra.utils.FBUtilities;
+ import org.apache.cassandra.utils.Pair;
+ import org.apache.cassandra.utils.UUIDGen;
  import org.github.jamm.Unmetered;
  
  /**
@@@ -73,29 -168,30 +76,31 @@@ public final class CFMetaDat
      public final UUID cfId;                           // internal id, never exposed to user
      public final String ksName;                       // name of keyspace
      public final String cfName;                       // name of this column family
+     public final Pair<String, String> ksAndCFName;
+     public final byte[] ksAndCFBytes;
 -    public final ColumnFamilyType cfType;             // standard, super
 -    public volatile CellNameType comparator;          // bytes, long, timeuuid, utf8, etc.
 -
 -    //OPTIONAL
 -    private volatile String comment = "";
 -    private volatile double readRepairChance = DEFAULT_READ_REPAIR_CHANCE;
 -    private volatile double dcLocalReadRepairChance = DEFAULT_DCLOCAL_READ_REPAIR_CHANCE;
 -    private volatile int gcGraceSeconds = DEFAULT_GC_GRACE_SECONDS;
 -    private volatile AbstractType<?> defaultValidator = BytesType.instance;
 +
 +    private final ImmutableSet<Flag> flags;
 +    private final boolean isDense;
 +    private final boolean isCompound;
 +    private final boolean isSuper;
 +    private final boolean isCounter;
 +    private final boolean isView;
 +
 +    private final boolean isIndex;
 +
 +    public volatile ClusteringComparator comparator;  // bytes, long, timeuuid, utf8, etc. This is built directly from clusteringColumns
 +    public final IPartitioner partitioner;            // partitioner the table uses
 +
 +    private final Serializers serializers;
 +
 +    // non-final, for now
 +    public volatile TableParams params = TableParams.DEFAULT;
 +
      private volatile AbstractType<?> keyValidator = BytesType.instance;
 -    private volatile int minCompactionThreshold = DEFAULT_MIN_COMPACTION_THRESHOLD;
 -    private volatile int maxCompactionThreshold = DEFAULT_MAX_COMPACTION_THRESHOLD;
 -    private volatile Double bloomFilterFpChance = null;
 -    private volatile CachingOptions caching = DEFAULT_CACHING_STRATEGY;
 -    private volatile int minIndexInterval = DEFAULT_MIN_INDEX_INTERVAL;
 -    private volatile int maxIndexInterval = DEFAULT_MAX_INDEX_INTERVAL;
 -    private volatile int memtableFlushPeriod = 0;
 -    private volatile int defaultTimeToLive = DEFAULT_DEFAULT_TIME_TO_LIVE;
 -    private volatile SpeculativeRetry speculativeRetry = DEFAULT_SPECULATIVE_RETRY;
 -    private volatile Map<ColumnIdentifier, Long> droppedColumns = new HashMap<>();
 -    private volatile Map<String, TriggerDefinition> triggers = new HashMap<>();
 -    private volatile boolean isPurged = false;
 +    private volatile Map<ByteBuffer, DroppedColumn> droppedColumns = new HashMap<>();
 +    private volatile Triggers triggers = Triggers.none();
 +    private volatile Indexes indexes = Indexes.none();
 +
      /*
       * All CQL3 columns definition are stored in the columnMetadata map.
       * On top of that, we keep separated collection of each kind of definition, to
@@@ -103,278 -199,101 +108,283 @@@
       * clustering key ones, those list are ordered by the "component index" of the
       * elements.
       */
 -    public static final String DEFAULT_KEY_ALIAS = "key";
 -    public static final String DEFAULT_COLUMN_ALIAS = "column";
 -    public static final String DEFAULT_VALUE_ALIAS = "value";
 -
 -    // We call dense a CF for which each component of the comparator is a clustering column, i.e. no
 -    // component is used to store a regular column names. In other words, non-composite static "thrift"
 -    // and CQL3 CF are *not* dense.
 -    private volatile Boolean isDense; // null means "we don't know and need to infer from other data"
 -
 -    private volatile Map<ByteBuffer, ColumnDefinition> columnMetadata = new HashMap<>();
 +    private final Map<ByteBuffer, ColumnDefinition> columnMetadata = new ConcurrentHashMap<>(); // not on any hot path
      private volatile List<ColumnDefinition> partitionKeyColumns;  // Always of size keyValidator.componentsCount, null padded if necessary
      private volatile List<ColumnDefinition> clusteringColumns;    // Of size comparator.componentsCount or comparator.componentsCount -1, null padded if necessary
 -    private volatile SortedSet<ColumnDefinition> regularColumns;  // We use a sorted set so iteration is of predictable order (for SELECT for instance)
 -    private volatile SortedSet<ColumnDefinition> staticColumns;   // Same as above
 -    private volatile ColumnDefinition compactValueColumn;
 +    private volatile PartitionColumns partitionColumns;           // Always non-PK, non-clustering columns
  
 -    public volatile Class<? extends AbstractCompactionStrategy> compactionStrategyClass = DEFAULT_COMPACTION_STRATEGY_CLASS;
 -    public volatile Map<String, String> compactionStrategyOptions = new HashMap<>();
 -
 -    public volatile CompressionParameters compressionParameters = new CompressionParameters(null);
 -
 -    // attribute setters that return the modified CFMetaData instance
 -    public CFMetaData comment(String prop) {comment = Strings.nullToEmpty(prop); return this;}
 -    public CFMetaData readRepairChance(double prop) {readRepairChance = prop; return this;}
 -    public CFMetaData dcLocalReadRepairChance(double prop) {dcLocalReadRepairChance = prop; return this;}
 -    public CFMetaData gcGraceSeconds(int prop) {gcGraceSeconds = prop; return this;}
 -    public CFMetaData defaultValidator(AbstractType<?> prop) {defaultValidator = prop; return this;}
 -    public CFMetaData keyValidator(AbstractType<?> prop) {keyValidator = prop; return this;}
 -    public CFMetaData minCompactionThreshold(int prop) {minCompactionThreshold = prop; return this;}
 -    public CFMetaData maxCompactionThreshold(int prop) {maxCompactionThreshold = prop; return this;}
 -    public CFMetaData compactionStrategyClass(Class<? extends AbstractCompactionStrategy> prop) {compactionStrategyClass = prop; return this;}
 -    public CFMetaData compactionStrategyOptions(Map<String, String> prop) {compactionStrategyOptions = prop; return this;}
 -    public CFMetaData compressionParameters(CompressionParameters prop) {compressionParameters = prop; return this;}
 -    public CFMetaData bloomFilterFpChance(double prop) {bloomFilterFpChance = prop; return this;}
 -    public CFMetaData caching(CachingOptions prop) {caching = prop; return this;}
 -    public CFMetaData minIndexInterval(int prop) {minIndexInterval = prop; return this;}
 -    public CFMetaData maxIndexInterval(int prop) {maxIndexInterval = prop; return this;}
 -    public CFMetaData memtableFlushPeriod(int prop) {memtableFlushPeriod = prop; return this;}
 -    public CFMetaData defaultTimeToLive(int prop) {defaultTimeToLive = prop; return this;}
 -    public CFMetaData speculativeRetry(SpeculativeRetry prop) {speculativeRetry = prop; return this;}
 -    public CFMetaData droppedColumns(Map<ColumnIdentifier, Long> cols) {droppedColumns = cols; return this;}
 -    public CFMetaData triggers(Map<String, TriggerDefinition> prop) {triggers = prop; return this;}
 -    public CFMetaData isDense(Boolean prop) {isDense = prop; return this;}
 +    // For dense tables, this aliases the single non-PK column the table contains (since it can only have one). We keep
 +    // it as a convenience to access that column more easily (but we could replace calls by partitionColumns().iterator().next()
 +    // for those tables in practice).
 +    private volatile ColumnDefinition compactValueColumn;
  
 -    /**
 -     * Create new ColumnFamily metadata with generated random ID.
 -     * When loading from existing schema, use CFMetaData
 -     *
 -     * @param keyspace keyspace name
 -     * @param name column family name
 -     * @param comp default comparator
 +    /*
 +     * All of these methods will go away once CFMetaData becomes completely immutable.
       */
 -    public CFMetaData(String keyspace, String name, ColumnFamilyType type, CellNameType comp)
 +    public CFMetaData params(TableParams params)
 +    {
 +        this.params = params;
 +        return this;
 +    }
 +
 +    public CFMetaData bloomFilterFpChance(double prop)
 +    {
 +        params = TableParams.builder(params).bloomFilterFpChance(prop).build();
 +        return this;
 +    }
 +
 +    public CFMetaData caching(CachingParams prop)
 +    {
 +        params = TableParams.builder(params).caching(prop).build();
 +        return this;
 +    }
 +
 +    public CFMetaData comment(String prop)
 +    {
 +        params = TableParams.builder(params).comment(prop).build();
 +        return this;
 +    }
 +
 +    public CFMetaData compaction(CompactionParams prop)
 +    {
 +        params = TableParams.builder(params).compaction(prop).build();
 +        return this;
 +    }
 +
 +    public CFMetaData compression(CompressionParams prop)
 +    {
 +        params = TableParams.builder(params).compression(prop).build();
 +        return this;
 +    }
 +
 +    public CFMetaData dcLocalReadRepairChance(double prop)
 +    {
 +        params = TableParams.builder(params).dcLocalReadRepairChance(prop).build();
 +        return this;
 +    }
 +
 +    public CFMetaData defaultTimeToLive(int prop)
 +    {
 +        params = TableParams.builder(params).defaultTimeToLive(prop).build();
 +        return this;
 +    }
 +
 +    public CFMetaData gcGraceSeconds(int prop)
 +    {
 +        params = TableParams.builder(params).gcGraceSeconds(prop).build();
 +        return this;
 +    }
 +
 +    public CFMetaData maxIndexInterval(int prop)
 +    {
 +        params = TableParams.builder(params).maxIndexInterval(prop).build();
 +        return this;
 +    }
 +
 +    public CFMetaData memtableFlushPeriod(int prop)
 +    {
 +        params = TableParams.builder(params).memtableFlushPeriodInMs(prop).build();
 +        return this;
 +    }
 +
 +    public CFMetaData minIndexInterval(int prop)
 +    {
 +        params = TableParams.builder(params).minIndexInterval(prop).build();
 +        return this;
 +    }
 +
 +    public CFMetaData readRepairChance(double prop)
 +    {
 +        params = TableParams.builder(params).readRepairChance(prop).build();
 +        return this;
 +    }
 +
 +    public CFMetaData speculativeRetry(SpeculativeRetryParam prop)
 +    {
 +        params = TableParams.builder(params).speculativeRetry(prop).build();
 +        return this;
 +    }
 +
 +    public CFMetaData extensions(Map<String, ByteBuffer> extensions)
 +    {
 +        params = TableParams.builder(params).extensions(extensions).build();
 +        return this;
 +    }
 +
 +    public CFMetaData droppedColumns(Map<ByteBuffer, DroppedColumn> cols)
 +    {
 +        droppedColumns = cols;
 +        return this;
 +    }
 +
 +    public CFMetaData triggers(Triggers prop)
      {
 -        this(keyspace, name, type, comp, UUIDGen.getTimeUUID());
 +        triggers = prop;
 +        return this;
      }
  
 -    public CFMetaData(String keyspace, String name, ColumnFamilyType type, CellNameType comp, UUID id)
 +    public CFMetaData indexes(Indexes indexes)
      {
 -        cfId = id;
 -        ksName = keyspace;
 -        cfName = name;
 +        this.indexes = indexes;
 +        return this;
 +    }
 +
 +    private CFMetaData(String keyspace,
 +                       String name,
 +                       UUID cfId,
 +                       boolean isSuper,
 +                       boolean isCounter,
 +                       boolean isDense,
 +                       boolean isCompound,
 +                       boolean isView,
 +                       List<ColumnDefinition> partitionKeyColumns,
 +                       List<ColumnDefinition> clusteringColumns,
 +                       PartitionColumns partitionColumns,
 +                       IPartitioner partitioner)
 +    {
 +        this.cfId = cfId;
 +        this.ksName = keyspace;
 +        this.cfName = name;
+         ksAndCFName = Pair.create(keyspace, name);
+         byte[] ksBytes = FBUtilities.toWriteUTFBytes(ksName);
+         byte[] cfBytes = FBUtilities.toWriteUTFBytes(cfName);
+         ksAndCFBytes = Arrays.copyOf(ksBytes, ksBytes.length + cfBytes.length);
+         System.arraycopy(cfBytes, 0, ksAndCFBytes, ksBytes.length, cfBytes.length);
  
 -        cfType = type;
 -        comparator = comp;
 +        this.isDense = isDense;
 +        this.isCompound = isCompound;
 +        this.isSuper = isSuper;
 +        this.isCounter = isCounter;
 +        this.isView = isView;
 +
 +        EnumSet<Flag> flags = EnumSet.noneOf(Flag.class);
 +        if (isSuper)
 +            flags.add(Flag.SUPER);
 +        if (isCounter)
 +            flags.add(Flag.COUNTER);
 +        if (isDense)
 +            flags.add(Flag.DENSE);
 +        if (isCompound)
 +            flags.add(Flag.COMPOUND);
 +        this.flags = Sets.immutableEnumSet(flags);
 +
 +        isIndex = cfName.contains(".");
 +
 +        assert partitioner != null;
 +        this.partitioner = partitioner;
 +
 +        // A compact table should always have a clustering
 +        assert isCQLTable() || !clusteringColumns.isEmpty() : String.format("For table %s.%s, isDense=%b, isCompound=%b, clustering=%s", ksName, cfName, isDense, isCompound, clusteringColumns);
 +
 +        this.partitionKeyColumns = partitionKeyColumns;
 +        this.clusteringColumns = clusteringColumns;
 +        this.partitionColumns = partitionColumns;
 +
 +        this.serializers = new Serializers(this);
 +        rebuild();
 +    }
 +
 +    // This rebuilds information that is intrinsically a duplicate of the table definition but
 +    // is kept because it is often useful in a different format.
 +    private void rebuild()
 +    {
 +        this.comparator = new ClusteringComparator(extractTypes(clusteringColumns));
 +
 +        this.columnMetadata.clear();
 +        for (ColumnDefinition def : partitionKeyColumns)
 +            this.columnMetadata.put(def.name.bytes, def);
 +        for (ColumnDefinition def : clusteringColumns)
 +        {
 +            this.columnMetadata.put(def.name.bytes, def);
 +            def.type.checkComparable();
 +        }
 +        for (ColumnDefinition def : partitionColumns)
 +            this.columnMetadata.put(def.name.bytes, def);
 +
 +        List<AbstractType<?>> keyTypes = extractTypes(partitionKeyColumns);
 +        this.keyValidator = keyTypes.size() == 1 ? keyTypes.get(0) : CompositeType.getInstance(keyTypes);
 +
 +        if (isCompactTable())
 +            this.compactValueColumn = CompactTables.getCompactValueColumn(partitionColumns, isSuper());
 +    }
 +
 +    public Indexes getIndexes()
 +    {
 +        return indexes;
      }
  
 -    public static CFMetaData denseCFMetaData(String keyspace, String name, AbstractType<?> comp, AbstractType<?> subcc)
 +    public static CFMetaData create(String ksName,
 +                                    String name,
 +                                    UUID cfId,
 +                                    boolean isDense,
 +                                    boolean isCompound,
 +                                    boolean isSuper,
 +                                    boolean isCounter,
 +                                    boolean isView,
 +                                    List<ColumnDefinition> columns,
 +                                    IPartitioner partitioner)
      {
 -        CellNameType cellNameType = CellNames.fromAbstractType(makeRawAbstractType(comp, subcc), true);
 -        return new CFMetaData(keyspace, name, subcc == null ? ColumnFamilyType.Standard : ColumnFamilyType.Super, cellNameType);
 +        List<ColumnDefinition> partitions = new ArrayList<>();
 +        List<ColumnDefinition> clusterings = new ArrayList<>();
 +        PartitionColumns.Builder builder = PartitionColumns.builder();
 +
 +        for (ColumnDefinition column : columns)
 +        {
 +            switch (column.kind)
 +            {
 +                case PARTITION_KEY:
 +                    partitions.add(column);
 +                    break;
 +                case CLUSTERING:
 +                    clusterings.add(column);
 +                    break;
 +                default:
 +                    builder.add(column);
 +                    break;
 +            }
 +        }
 +
 +        Collections.sort(partitions);
 +        Collections.sort(clusterings);
 +
 +        return new CFMetaData(ksName,
 +                              name,
 +                              cfId,
 +                              isSuper,
 +                              isCounter,
 +                              isDense,
 +                              isCompound,
 +                              isView,
 +                              partitions,
 +                              clusterings,
 +                              builder.build(),
 +                              partitioner);
      }
  
 -    public static CFMetaData sparseCFMetaData(String keyspace, String name, AbstractType<?> comp)
 +    private static List<AbstractType<?>> extractTypes(List<ColumnDefinition> clusteringColumns)
      {
 -        CellNameType cellNameType = CellNames.fromAbstractType(comp, false);
 -        return new CFMetaData(keyspace, name, ColumnFamilyType.Standard, cellNameType);
 +        List<AbstractType<?>> types = new ArrayList<>(clusteringColumns.size());
 +        for (ColumnDefinition def : clusteringColumns)
 +            types.add(def.type);
 +        return types;
      }
  
 -    public static CFMetaData denseCFMetaData(String keyspace, String name, AbstractType<?> comp)
 +    public Set<Flag> flags()
      {
 -        return denseCFMetaData(keyspace, name, comp, null);
 +        return flags;
      }
  
 -    public static AbstractType<?> makeRawAbstractType(AbstractType<?> comparator, AbstractType<?> subComparator)
 +    /**
 +     * There are a couple of places in the code where we need a CFMetaData object, don't have one readily available,
 +     * and know that only the keyspace and name matter. This creates such "fake" metadata. Use only if you know what
 +     * you're doing.
 +     */
 +    public static CFMetaData createFake(String keyspace, String name)
      {
 -        return subComparator == null ? comparator : CompositeType.getInstance(Arrays.asList(comparator, subComparator));
 +        return CFMetaData.Builder.create(keyspace, name).addPartitionKey("key", BytesType.instance).build();
      }
  
 -    public Map<String, TriggerDefinition> getTriggers()
 +    public Triggers getTriggers()
      {
          return triggers;
      }
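
Among the CFMetaData changes above, ksAndCFBytes precomputes the writeUTF encodings of the
keyspace and table names concatenated into one array, so cache writers can emit both
strings with a single write. A sketch of that precomputation, using a plain-JDK stand-in
for FBUtilities.toWriteUTFBytes (assumed here to produce exactly what writeUTF would):

    import java.io.ByteArrayOutputStream;
    import java.io.DataOutputStream;
    import java.io.IOException;
    import java.util.Arrays;

    public class KsAndCfBytes
    {
        // The same bytes DataOutputStream.writeUTF would write: 2-byte length + modified UTF-8.
        static byte[] toWriteUTFBytes(String s) throws IOException
        {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            new DataOutputStream(bytes).writeUTF(s);
            return bytes.toByteArray();
        }

        public static void main(String[] args) throws IOException
        {
            byte[] ks = toWriteUTFBytes("my_ks");
            byte[] cf = toWriteUTFBytes("my_table");
            byte[] both = Arrays.copyOf(ks, ks.length + cf.length);
            System.arraycopy(cf, 0, both, ks.length, cf.length);
            System.out.println(both.length); // (2 + 5) + (2 + 8) = 17
        }
    }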

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9218d745/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/config/DatabaseDescriptor.java
index 7553c92,c459b5d..7c062a1
--- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
+++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
@@@ -1538,25 -1480,11 +1538,16 @@@ public class DatabaseDescripto
          return conf.max_hint_window_in_ms;
      }
  
 +    public static File getHintsDirectory()
 +    {
 +        return new File(conf.hints_directory);
 +    }
 +
-     public static File getSerializedCachePath(String ksName,
-                                               String cfName,
-                                               UUID cfId,
-                                               CacheService.CacheType cacheType,
-                                               String version,
-                                               String extension)
-     {
-         StringBuilder builder = new StringBuilder();
-         builder.append(ksName).append('-');
-         builder.append(cfName).append('-');
-         builder.append(ByteBufferUtil.bytesToHex(ByteBufferUtil.bytes(cfId))).append('-');
-         builder.append(cacheType);
-         builder.append((version == null ? "" : "-" + version + "." + extension));
-         return new File(conf.saved_caches_directory, builder.toString());
+     public static File getSerializedCachePath(CacheService.CacheType cacheType, String version, String extension)
+     {
+         String name = cacheType.toString()
+                 + (version == null ? "" : "-" + version + "." + extension);
+         return new File(conf.saved_caches_directory, name);
      }
  
      public static int getDynamicUpdateInterval()
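
With the change above, a saved-cache file name no longer embeds the keyspace, table, or
cfId; there is now a single file per cache type. A quick sketch of the resulting names
(the directory path is illustrative):

    import java.io.File;

    public class CachePathSketch
    {
        static File path(String cacheType, String version, String extension)
        {
            String name = cacheType + (version == null ? "" : "-" + version + "." + extension);
            return new File("/var/lib/cassandra/saved_caches", name);
        }

        public static void main(String[] args)
        {
            System.out.println(path("KeyCache", "d", "db"));  // .../KeyCache-d.db
            System.out.println(path("RowCache", null, null)); // .../RowCache
        }
    }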

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9218d745/src/java/org/apache/cassandra/config/Schema.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/config/Schema.java
index bcde978,00c9358..bcfac1b
--- a/src/java/org/apache/cassandra/config/Schema.java
+++ b/src/java/org/apache/cassandra/config/Schema.java
@@@ -27,16 -26,17 +27,17 @@@ import com.google.common.collect.Sets
  import org.slf4j.Logger;
  import org.slf4j.LoggerFactory;
  
 -import org.apache.cassandra.cql3.functions.Functions;
 -import org.apache.cassandra.cql3.functions.UDAggregate;
 -import org.apache.cassandra.cql3.functions.UDFunction;
 -import org.apache.cassandra.db.*;
 +import org.apache.cassandra.cql3.functions.*;
 +import org.apache.cassandra.db.ColumnFamilyStore;
  import org.apache.cassandra.db.Keyspace;
 +import org.apache.cassandra.db.SystemKeyspace;
  import org.apache.cassandra.db.commitlog.CommitLog;
  import org.apache.cassandra.db.compaction.CompactionManager;
 +import org.apache.cassandra.db.marshal.AbstractType;
  import org.apache.cassandra.db.marshal.UserType;
 -import org.apache.cassandra.db.index.SecondaryIndex;
++import org.apache.cassandra.index.Index;
  import org.apache.cassandra.io.sstable.Descriptor;
 -import org.apache.cassandra.schema.LegacySchemaTables;
 +import org.apache.cassandra.schema.*;
  import org.apache.cassandra.service.MigrationManager;
  import org.apache.cassandra.utils.ConcurrentBiMap;
  import org.apache.cassandra.utils.Pair;
@@@ -164,6 -156,53 +165,55 @@@ public class Schem
          return keyspaceInstances.get(keyspaceName);
      }
  
+     /**
+      * Retrieve a CFS by name, even if that CFS is an index.
+      *
+      * An index is identified by a '.' in the CF name; the portion before the '.' names the base table
+      * containing the index.
+      * @param ksNameAndCFName pair of keyspace name and CF (or index) name
+      * @return the named CFS, or null if the keyspace, base table, or index doesn't exist
+      */
+     public ColumnFamilyStore getColumnFamilyStoreIncludingIndexes(Pair<String, String> ksNameAndCFName)
+     {
+         String ksName = ksNameAndCFName.left;
+         String cfName = ksNameAndCFName.right;
+         Pair<String, String> baseTable;
+ 
+         /*
+          * String.split does special-case a one-character regex, and it looks like it can detect
+          * a two-character escape of '.', but it still allocates a useless array, so use indexOf instead.
+          */
+         int indexOfSeparator = cfName.indexOf('.');
+         if (indexOfSeparator > -1)
+             baseTable = Pair.create(ksName, cfName.substring(0, indexOfSeparator));
+         else
+             baseTable = ksNameAndCFName;
+ 
+         UUID cfId = cfIdMap.get(baseTable);
+         if (cfId == null)
+             return null;
+ 
+         Keyspace ks = keyspaceInstances.get(ksName);
+         if (ks == null)
+             return null;
+ 
+         ColumnFamilyStore baseCFS = ks.getColumnFamilyStore(cfId);
+ 
+         //Not an index
+         if (indexOfSeparator == -1)
+             return baseCFS;
+ 
+         if (baseCFS == null)
+             return null;
+ 
 -        SecondaryIndex index = baseCFS.indexManager.getIndexByName(cfName);
++        Index index = baseCFS.indexManager.getIndexByName(cfName.substring(indexOfSeparator + 1, cfName.length()));
+         if (index == null)
+             return null;
+ 
 -        return index.getIndexCfs();
++        //Callers shouldn't ask for a backing table when there is none, so just throw?
++        //Or should this return null?
++        return index.getBackingTable().get();
+     }
+ 
      public ColumnFamilyStore getColumnFamilyStoreInstance(UUID cfId)
      {
          Pair<String, String> pair = cfIdMap.inverse().get(cfId);
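
getColumnFamilyStoreIncludingIndexes resolves names of the form "table.index" by splitting
on the first '.' with indexOf rather than String.split, avoiding the array allocation the
comment calls out. A standalone sketch of just that parsing step (method and class names
are illustrative):

    import java.util.AbstractMap.SimpleEntry;
    import java.util.Map;

    public class IndexNameSketch
    {
        // Returns (baseTable, indexName or null) for a CF name that may carry an index suffix.
        static Map.Entry<String, String> parse(String cfName)
        {
            int sep = cfName.indexOf('.');
            if (sep == -1)
                return new SimpleEntry<>(cfName, null);
            return new SimpleEntry<>(cfName.substring(0, sep), cfName.substring(sep + 1));
        }

        public static void main(String[] args)
        {
            System.out.println(parse("users"));          // users=null
            System.out.println(parse("users.by_email")); // users=by_email
        }
    }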

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9218d745/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index 0d6d801,a8a8910..cdb9770
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@@ -395,12 -391,8 +393,9 @@@ public class ColumnFamilyStore implemen
              data.addInitialSSTables(sstables);
          }
  
-         if (caching.cacheKeys())
-             CacheService.instance.keyCache.loadSaved(this);
- 
          // compaction strategy should be created after the CFS has been prepared
 -        this.compactionStrategyWrapper = new WrappingCompactionStrategy(this);
 +        this.compactionStrategyManager = new CompactionStrategyManager(this);
 +        this.directories = this.compactionStrategyManager.getDirectories();
  
          if (maxCompactionThreshold.value() <= 0 || minCompactionThreshold.value() <=0)
          {
@@@ -620,47 -610,104 +615,14 @@@
          }
  
          // also clean out any index leftovers.
 -        for (ColumnDefinition def : metadata.allColumns())
 -        {
 -            if (def.isIndexed())
 -            {
 -                CellNameType indexComparator = SecondaryIndex.getIndexComparator(metadata, def);
 -                if (indexComparator != null)
 -                {
 -                    CFMetaData indexMetadata = CFMetaData.newIndexMetadata(metadata, def, indexComparator);
 -                    scrubDataDirectories(indexMetadata);
 -                }
 -            }
 -        }
 -    }
 -
 -    /**
 -     * Replacing compacted sstables is atomic as far as observers of Tracker are concerned, but not on the
 -     * filesystem: first the new sstables are renamed to "live" status (i.e., the tmp marker is removed), then
 -     * their ancestors are removed.
 -     *
 -     * If an unclean shutdown happens at the right time, we can thus end up with both the new ones and their
 -     * ancestors "live" in the system.  This is harmless for normal data, but for counters it can cause overcounts.
 -     *
 -     * To prevent this, we record sstables being compacted in the system keyspace.  If we find unfinished
 -     * compactions, we remove the new ones (since those may be incomplete -- under LCS, we may create multiple
 -     * sstables from any given ancestor).
 -     */
 -    public static void removeUnfinishedCompactionLeftovers(CFMetaData metadata, Map<Integer, UUID> unfinishedCompactions)
 -    {
 -        Directories directories = new Directories(metadata);
 -        Set<Integer> allGenerations = new HashSet<>();
 -        for (Descriptor desc : directories.sstableLister().list().keySet())
 -            allGenerations.add(desc.generation);
 -
 -        // sanity-check unfinishedCompactions
 -        Set<Integer> unfinishedGenerations = unfinishedCompactions.keySet();
 -        if (!allGenerations.containsAll(unfinishedGenerations))
 -        {
 -            HashSet<Integer> missingGenerations = new HashSet<>(unfinishedGenerations);
 -            missingGenerations.removeAll(allGenerations);
 -            logger.debug("Unfinished compactions of {}.{} reference missing sstables of generations {}",
 -                         metadata.ksName, metadata.cfName, missingGenerations);
 -        }
 -
 -        // remove new sstables from compactions that didn't complete, and compute
 -        // set of ancestors that shouldn't exist anymore
 -        Set<Integer> completedAncestors = new HashSet<>();
 -        for (Map.Entry<Descriptor, Set<Component>> sstableFiles : directories.sstableLister().skipTemporary(true).list().entrySet())
 -        {
 -            Descriptor desc = sstableFiles.getKey();
 -
 -            Set<Integer> ancestors;
 -            try
 -            {
 -                CompactionMetadata compactionMetadata = (CompactionMetadata) desc.getMetadataSerializer().deserialize(desc, MetadataType.COMPACTION);
 -                ancestors = compactionMetadata.ancestors;
 -            }
 -            catch (IOException e)
 -            {
 -                throw new FSReadError(e, desc.filenameFor(Component.STATS));
 -            }
 -            catch (NullPointerException e)
 -            {
 -                throw new FSReadError(e, "Failed to remove unfinished compaction leftovers (file: " + desc.filenameFor(Component.STATS) + ").  See log for details.");
 -            }
 -
 -            if (!ancestors.isEmpty()
 -                && unfinishedGenerations.containsAll(ancestors)
 -                && allGenerations.containsAll(ancestors))
 -            {
 -                // any of the ancestors would work, so we'll just lookup the compaction task ID with the first one
 -                UUID compactionTaskID = unfinishedCompactions.get(ancestors.iterator().next());
 -                assert compactionTaskID != null;
 -                logger.debug("Going to delete unfinished compaction product {}", desc);
 -                SSTable.delete(desc, sstableFiles.getValue());
 -                SystemKeyspace.finishCompaction(compactionTaskID);
 -            }
 -            else
 -            {
 -                completedAncestors.addAll(ancestors);
 -            }
 -        }
 -
 -        // remove old sstables from compactions that did complete
 -        for (Map.Entry<Descriptor, Set<Component>> sstableFiles : directories.sstableLister().list().entrySet())
 -        {
 -            Descriptor desc = sstableFiles.getKey();
 -            if (completedAncestors.contains(desc.generation))
 +        for (IndexMetadata index : metadata.getIndexes())
 +            if (!index.isCustom())
              {
 -                // if any of the ancestors were participating in a compaction, finish that compaction
 -                logger.debug("Going to delete leftover compaction ancestor {}", desc);
 -                SSTable.delete(desc, sstableFiles.getValue());
 -                UUID compactionTaskID = unfinishedCompactions.get(desc.generation);
 -                if (compactionTaskID != null)
 -                    SystemKeyspace.finishCompaction(unfinishedCompactions.get(desc.generation));
 +                CFMetaData indexMetadata = CassandraIndex.indexCfsMetadata(metadata, index);
 +                scrubDataDirectories(indexMetadata);
              }
 -        }
      }
  
-     // must be called after all sstables are loaded since row cache merges all row versions
-     public void init()
-     {
-         if (!isRowCacheEnabled())
-             return;
- 
-         long start = System.nanoTime();
- 
-         int cachedRowsRead = CacheService.instance.rowCache.loadSaved(this);
-         if (cachedRowsRead > 0)
-             logger.info("Completed loading ({} ms; {} keys) row cache for {}.{}",
-                         TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start),
-                         cachedRowsRead,
-                         keyspace.getName(),
-                         name);
-     }
- 
-     public void initCounterCache()
-     {
-         if (!metadata.isCounter() || CacheService.instance.counterCache.getCapacity() == 0)
-             return;
- 
-         long start = System.nanoTime();
- 
-         int cachedShardsRead = CacheService.instance.counterCache.loadSaved(this);
-         if (cachedShardsRead > 0)
-             logger.info("Completed loading ({} ms; {} shards) counter cache for {}.{}",
-                         TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start),
-                         cachedShardsRead,
-                         keyspace.getName(),
-                         name);
-     }
- 
      /**
       * See #{@code StorageService.loadNewSSTables(String, String)} for more info
       *
@@@ -1167,8 -1226,8 +1129,8 @@@
          if (!isRowCacheEnabled())
              return;
  
-         RowCacheKey cacheKey = new RowCacheKey(metadata.cfId, key);
+         RowCacheKey cacheKey = new RowCacheKey(metadata.ksAndCFName, key);
 -        invalidateCachedRow(cacheKey);
 +        invalidateCachedPartition(cacheKey);
      }
  
      /**
@@@ -1555,9 -2040,9 +1517,9 @@@
               keyIter.hasNext(); )
          {
              RowCacheKey key = keyIter.next();
 -            DecoratedKey dk = partitioner.decorateKey(ByteBuffer.wrap(key.key));
 +            DecoratedKey dk = decorateKey(ByteBuffer.wrap(key.key));
-             if (key.cfId.equals(metadata.cfId) && !Range.isInRanges(dk.getToken(), ranges))
+             if (key.ksAndCFName.equals(metadata.ksAndCFName) && !Range.isInRanges(dk.getToken(), ranges))
 -                invalidateCachedRow(dk);
 +                invalidateCachedPartition(dk);
          }
  
          if (metadata.isCounter())
@@@ -1566,8 -2051,8 +1528,8 @@@
                   keyIter.hasNext(); )
              {
                  CounterCacheKey key = keyIter.next();
 -                DecoratedKey dk = partitioner.decorateKey(ByteBuffer.wrap(key.partitionKey));
 +                DecoratedKey dk = decorateKey(ByteBuffer.wrap(key.partitionKey));
-                 if (key.cfId.equals(metadata.cfId) && !Range.isInRanges(dk.getToken(), ranges))
+                 if (key.ksAndCFName.equals(metadata.ksAndCFName) && !Range.isInRanges(dk.getToken(), ranges))
                      CacheService.instance.counterCache.remove(key);
              }
          }
@@@ -1768,9 -2501,9 +1730,8 @@@
      {
          if (!isRowCacheEnabled())
              return null;
--
-         IRowCacheEntry cached = CacheService.instance.rowCache.getInternal(new RowCacheKey(metadata.cfId, key));
+         IRowCacheEntry cached = CacheService.instance.rowCache.getInternal(new RowCacheKey(metadata.ksAndCFName, key));
 -        return cached == null || cached instanceof RowCacheSentinel ? null : (ColumnFamily)cached;
 +        return cached == null || cached instanceof RowCacheSentinel ? null : (CachedPartition)cached;
      }
  
      private void invalidateCaches()
@@@ -1784,37 -2517,37 +1745,36 @@@
      /**
       * @return true if @param key is contained in the row cache
       */
 -    public boolean containsCachedRow(DecoratedKey key)
 +    public boolean containsCachedPartition(DecoratedKey key)
      {
-         return CacheService.instance.rowCache.getCapacity() != 0 && CacheService.instance.rowCache.containsKey(new RowCacheKey(metadata.cfId, key));
+         return CacheService.instance.rowCache.getCapacity() != 0 && CacheService.instance.rowCache.containsKey(new RowCacheKey(metadata.ksAndCFName, key));
      }
  
 -    public void invalidateCachedRow(RowCacheKey key)
 +    public void invalidateCachedPartition(RowCacheKey key)
      {
          CacheService.instance.rowCache.remove(key);
      }
  
 -    public void invalidateCachedRow(DecoratedKey key)
 +    public void invalidateCachedPartition(DecoratedKey key)
      {
--        UUID cfId = Schema.instance.getId(keyspace.getName(), this.name);
--        if (cfId == null)
--            return; // secondary index
++        if (!Schema.instance.hasCF(metadata.ksAndCFName))
++            return; // secondary indexes (2i) don't cache rows
  
-         invalidateCachedPartition(new RowCacheKey(cfId, key));
 -        invalidateCachedRow(new RowCacheKey(metadata.ksAndCFName, key));
++        invalidateCachedPartition(new RowCacheKey(metadata.ksAndCFName, key));
      }
  
 -    public ClockAndCount getCachedCounter(ByteBuffer partitionKey, CellName cellName)
 +    public ClockAndCount getCachedCounter(ByteBuffer partitionKey, Clustering clustering, ColumnDefinition column, CellPath path)
      {
          if (CacheService.instance.counterCache.getCapacity() == 0L) // counter cache disabled.
              return null;
-         return CacheService.instance.counterCache.get(CounterCacheKey.create(metadata.cfId, partitionKey, clustering, column, path));
 -        return CacheService.instance.counterCache.get(CounterCacheKey.create(metadata.ksAndCFName, partitionKey, cellName));
++        return CacheService.instance.counterCache.get(CounterCacheKey.create(metadata.ksAndCFName, partitionKey, clustering, column, path));
      }
  
 -    public void putCachedCounter(ByteBuffer partitionKey, CellName cellName, ClockAndCount clockAndCount)
 +    public void putCachedCounter(ByteBuffer partitionKey, Clustering clustering, ColumnDefinition column, CellPath path, ClockAndCount clockAndCount)
      {
          if (CacheService.instance.counterCache.getCapacity() == 0L) // counter cache disabled.
              return;
-         CacheService.instance.counterCache.put(CounterCacheKey.create(metadata.cfId, partitionKey, clustering, column, path), clockAndCount);
 -        CacheService.instance.counterCache.put(CounterCacheKey.create(metadata.ksAndCFName, partitionKey, cellName), clockAndCount);
++        CacheService.instance.counterCache.put(CounterCacheKey.create(metadata.ksAndCFName, partitionKey, clustering, column, path), clockAndCount);
      }
  
      public void forceMajorCompaction() throws InterruptedException, ExecutionException
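
The counter-cache accessors now pin down a cell by (partition key, clustering, column, cell path) instead of a 2.x CellName, and both still short-circuit when the cache has zero capacity. A sketch of that guard-then-access shape, with a plain map standing in for the real counter cache and all names illustrative:

    import java.nio.ByteBuffer;
    import java.util.Arrays;
    import java.util.List;
    import java.util.Map;

    final class CounterCacheSketch
    {
        // Composite lookup key: everything that identifies one counter cell.
        static List<Object> cellKey(Object ksAndCFName, ByteBuffer partitionKey,
                                    Object clustering, Object column, Object path)
        {
            return Arrays.asList(ksAndCFName, partitionKey, clustering, column, path);
        }

        static Long getCached(Map<List<Object>, Long> cache, long capacity, List<Object> key)
        {
            if (capacity == 0L) // counter cache disabled
                return null;
            return cache.get(key);
        }

        static void putCached(Map<List<Object>, Long> cache, long capacity,
                              List<Object> key, long clockAndCount)
        {
            if (capacity == 0L) // counter cache disabled
                return;
            cache.put(key, clockAndCount);
        }
    }
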
@@@ -2237,7 -2934,17 +2197,20 @@@
  
      public boolean isRowCacheEnabled()
      {
-         return metadata.params.caching.cacheRows() && CacheService.instance.rowCache.getCapacity() > 0;
 -        return metadata.getCaching().rowCache.isEnabled() && CacheService.instance.rowCache.getCapacity() > 0;
++
++        boolean retval = metadata.params.caching.cacheRows() && CacheService.instance.rowCache.getCapacity() > 0;
++        assert(!retval || !isIndex());
++        return retval;
+     }
+ 
+     public boolean isCounterCacheEnabled()
+     {
+         return metadata.isCounter() && CacheService.instance.counterCache.getCapacity() > 0;
+     }
+ 
+     public boolean isKeyCacheEnabled()
+     {
 -        return metadata.getCaching().keyCache.isEnabled() && CacheService.instance.keyCache.getCapacity() > 0;
++        return metadata.params.caching.cacheKeys() && CacheService.instance.keyCache.getCapacity() > 0;
      }
  
      /**
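
The rewritten isRowCacheEnabled() also asserts that the row cache is never enabled for a secondary index table. Read as propositional logic, assert(!retval || !isIndex()) is the implication "row cache enabled implies not an index"; a one-method sketch (parameter names illustrative):

    final class ImplicationSketch
    {
        // p => q is equivalent to !p || q, hence assert(!retval || !isIndex()).
        static void check(boolean rowCacheEnabled, boolean isIndex)
        {
            assert !rowCacheEnabled || !isIndex
                 : "row cache must never be enabled on a secondary index table";
        }
    }
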

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9218d745/src/java/org/apache/cassandra/db/Keyspace.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9218d745/src/java/org/apache/cassandra/db/RowIndexEntry.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/RowIndexEntry.java
index 43dc80c,f9d8c6d..198e890
--- a/src/java/org/apache/cassandra/db/RowIndexEntry.java
+++ b/src/java/org/apache/cassandra/db/RowIndexEntry.java
@@@ -227,23 -149,15 +227,23 @@@ public class RowIndexEntry<T> implement
              }
          }
  
 -        public static void skip(DataInput in) throws IOException
 +        // Reads only the data 'position' of the index entry and returns it. Note that this leaves 'in' in the
 +        // middle of reading an entry, so it is only useful if you know what you are doing; in most cases
 +        // 'deserialize' should be used instead.
 +        public static long readPosition(DataInputPlus in, Version version) throws IOException
          {
 -            in.readLong();
 -            skipPromotedIndex(in);
 +            return version.storeRows() ? in.readUnsignedVInt() : in.readLong();
          }
  
 -        private static void skipPromotedIndex(DataInput in) throws IOException
 +        public static void skip(DataInputPlus in, Version version) throws IOException
          {
 -            int size = in.readInt();
 +            readPosition(in, version);
 +            skipPromotedIndex(in, version);
 +        }
 +
-         public static void skipPromotedIndex(DataInputPlus in, Version version) throws IOException
++        private static void skipPromotedIndex(DataInputPlus in, Version version) throws IOException
 +        {
 +            int size = version.storeRows() ? (int)in.readUnsignedVInt() : in.readInt();
              if (size <= 0)
                  return;
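
readPosition() above branches on the sstable format: new-format ("storeRows") sstables encode the position as an unsigned vint, older ones as a fixed 8-byte long. The sketch below shows that branch with a generic 7-bits-per-byte varint decoder for illustration; Cassandra's actual vint codec differs in its encoding details:

    import java.io.DataInput;
    import java.io.IOException;

    final class PositionReadSketch
    {
        static long readPosition(DataInput in, boolean storeRows) throws IOException
        {
            return storeRows ? readUnsignedVInt(in) : in.readLong();
        }

        // Generic LEB128-style unsigned varint: 7 payload bits per byte,
        // high bit set on every byte except the last. Illustrative only.
        static long readUnsignedVInt(DataInput in) throws IOException
        {
            long value = 0;
            int shift = 0;
            while (true)
            {
                byte b = in.readByte();
                value |= (long) (b & 0x7f) << shift;
                if ((b & 0x80) == 0)
                    return value;
                shift += 7;
            }
        }
    }
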
  

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9218d745/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
index c08ef6a,0000000..cd01748
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
+++ b/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
@@@ -1,527 -1,0 +1,526 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.cassandra.db;
 +
 +import java.io.IOException;
 +import java.nio.ByteBuffer;
 +import java.util.*;
 +
 +import org.apache.cassandra.cache.IRowCacheEntry;
 +import org.apache.cassandra.cache.RowCacheKey;
 +import org.apache.cassandra.cache.RowCacheSentinel;
 +import org.apache.cassandra.config.CFMetaData;
 +import org.apache.cassandra.config.ColumnDefinition;
 +import org.apache.cassandra.config.DatabaseDescriptor;
 +import org.apache.cassandra.db.filter.*;
 +import org.apache.cassandra.db.partitions.*;
 +import org.apache.cassandra.db.rows.UnfilteredRowIterator;
 +import org.apache.cassandra.db.rows.UnfilteredRowIterators;
 +import org.apache.cassandra.exceptions.RequestExecutionException;
 +import org.apache.cassandra.io.util.DataInputPlus;
 +import org.apache.cassandra.io.util.DataOutputPlus;
 +import org.apache.cassandra.metrics.TableMetrics;
 +import org.apache.cassandra.net.MessageOut;
 +import org.apache.cassandra.net.MessagingService;
 +import org.apache.cassandra.schema.IndexMetadata;
 +import org.apache.cassandra.service.CacheService;
 +import org.apache.cassandra.service.ClientState;
 +import org.apache.cassandra.service.StorageProxy;
 +import org.apache.cassandra.service.pager.*;
 +import org.apache.cassandra.tracing.Tracing;
 +import org.apache.cassandra.utils.concurrent.OpOrder;
 +
 +/**
 + * A read command that selects a (part of a) single partition.
 + */
 +public abstract class SinglePartitionReadCommand<F extends ClusteringIndexFilter> extends ReadCommand
 +{
 +    protected static final SelectionDeserializer selectionDeserializer = new Deserializer();
 +
 +    private final DecoratedKey partitionKey;
 +    private final F clusteringIndexFilter;
 +
 +    protected SinglePartitionReadCommand(boolean isDigest,
 +                                         int digestVersion,
 +                                         boolean isForThrift,
 +                                         CFMetaData metadata,
 +                                         int nowInSec,
 +                                         ColumnFilter columnFilter,
 +                                         RowFilter rowFilter,
 +                                         DataLimits limits,
 +                                         DecoratedKey partitionKey,
 +                                         F clusteringIndexFilter)
 +    {
 +        super(Kind.SINGLE_PARTITION, isDigest, digestVersion, isForThrift, metadata, nowInSec, columnFilter, rowFilter, limits);
 +        assert partitionKey.getPartitioner() == metadata.partitioner;
 +        this.partitionKey = partitionKey;
 +        this.clusteringIndexFilter = clusteringIndexFilter;
 +    }
 +
 +    /**
 +     * Creates a new read command on a single partition.
 +     *
 +     * @param metadata the table to query.
 +     * @param nowInSec the time in seconds to use as "now" for this query.
 +     * @param columnFilter the column filter to use for the query.
 +     * @param rowFilter the row filter to use for the query.
 +     * @param limits the limits to use for the query.
 +     * @param partitionKey the partition key for the partition to query.
 +     * @param clusteringIndexFilter the clustering index filter to use for the query.
 +     *
 +     * @return a newly created read command.
 +     */
 +    public static SinglePartitionReadCommand<?> create(CFMetaData metadata,
 +                                                       int nowInSec,
 +                                                       ColumnFilter columnFilter,
 +                                                       RowFilter rowFilter,
 +                                                       DataLimits limits,
 +                                                       DecoratedKey partitionKey,
 +                                                       ClusteringIndexFilter clusteringIndexFilter)
 +    {
 +        return create(false, metadata, nowInSec, columnFilter, rowFilter, limits, partitionKey, clusteringIndexFilter);
 +    }
 +
 +    /**
 +     * Creates a new read command on a single partition for thrift.
 +     *
 +     * @param isForThrift whether the query is for thrift or not.
 +     * @param metadata the table to query.
 +     * @param nowInSec the time in seconds to use as "now" for this query.
 +     * @param columnFilter the column filter to use for the query.
 +     * @param rowFilter the row filter to use for the query.
 +     * @param limits the limits to use for the query.
 +     * @param partitionKey the partition key for the partition to query.
 +     * @param clusteringIndexFilter the clustering index filter to use for the query.
 +     *
 +     * @return a newly created read command.
 +     */
 +    public static SinglePartitionReadCommand<?> create(boolean isForThrift,
 +                                                       CFMetaData metadata,
 +                                                       int nowInSec,
 +                                                       ColumnFilter columnFilter,
 +                                                       RowFilter rowFilter,
 +                                                       DataLimits limits,
 +                                                       DecoratedKey partitionKey,
 +                                                       ClusteringIndexFilter clusteringIndexFilter)
 +    {
 +        if (clusteringIndexFilter instanceof ClusteringIndexSliceFilter)
 +            return new SinglePartitionSliceCommand(false, 0, isForThrift, metadata, nowInSec, columnFilter, rowFilter, limits, partitionKey, (ClusteringIndexSliceFilter) clusteringIndexFilter);
 +
 +        assert clusteringIndexFilter instanceof ClusteringIndexNamesFilter;
 +        return new SinglePartitionNamesCommand(false, 0, isForThrift, metadata, nowInSec, columnFilter, rowFilter, limits, partitionKey, (ClusteringIndexNamesFilter) clusteringIndexFilter);
 +    }
 +
 +    /**
 +     * Creates a new read command on a single partition.
 +     *
 +     * @param metadata the table to query.
 +     * @param nowInSec the time in seconds to use as "now" for this query.
 +     * @param key the partition key for the partition to query.
 +     * @param columnFilter the column filter to use for the query.
 +     * @param filter the clustering index filter to use for the query.
 +     *
 +     * @return a newly created read command. The returned command will use no row filter and have no limits.
 +     */
 +    public static SinglePartitionReadCommand<?> create(CFMetaData metadata, int nowInSec, DecoratedKey key, ColumnFilter columnFilter, ClusteringIndexFilter filter)
 +    {
 +        return create(metadata, nowInSec, columnFilter, RowFilter.NONE, DataLimits.NONE, key, filter);
 +    }
 +
 +    /**
 +     * Creates a new read command that queries a single partition in its entirety.
 +     *
 +     * @param metadata the table to query.
 +     * @param nowInSec the time in seconds to use as "now" for this query.
 +     * @param key the partition key for the partition to query.
 +     *
 +     * @return a newly created read command that queries all the rows of {@code key}.
 +     */
 +    public static SinglePartitionReadCommand fullPartitionRead(CFMetaData metadata, int nowInSec, DecoratedKey key)
 +    {
 +        return SinglePartitionSliceCommand.create(metadata, nowInSec, key, Slices.ALL);
 +    }
 +
 +    /**
 +     * Creates a new read command that queries a single partition in its entirety.
 +     *
 +     * @param metadata the table to query.
 +     * @param nowInSec the time in seconds to use as "now" for this query.
 +     * @param key the partition key for the partition to query.
 +     *
 +     * @return a newly created read command that queries all the rows of {@code key}.
 +     */
 +    public static SinglePartitionReadCommand fullPartitionRead(CFMetaData metadata, int nowInSec, ByteBuffer key)
 +    {
 +        return SinglePartitionSliceCommand.create(metadata, nowInSec, metadata.decorateKey(key), Slices.ALL);
 +    }
 +
 +    public DecoratedKey partitionKey()
 +    {
 +        return partitionKey;
 +    }
 +
 +    public F clusteringIndexFilter()
 +    {
 +        return clusteringIndexFilter;
 +    }
 +
 +    public ClusteringIndexFilter clusteringIndexFilter(DecoratedKey key)
 +    {
 +        return clusteringIndexFilter;
 +    }
 +
 +    public long getTimeout()
 +    {
 +        return DatabaseDescriptor.getReadRpcTimeout();
 +    }
 +
 +    public boolean selects(DecoratedKey partitionKey, Clustering clustering)
 +    {
 +        if (!partitionKey().equals(partitionKey))
 +            return false;
 +
 +        if (clustering == Clustering.STATIC_CLUSTERING)
 +            return !columnFilter().fetchedColumns().statics.isEmpty();
 +
 +        return clusteringIndexFilter().selects(clustering);
 +    }
 +
 +    /**
 +     * Returns a new command suitable to paging from the last returned row.
 +     *
 +     * @param lastReturned the last row returned by the previous page. The newly created command
 +     * will only query rows that come after this (in query order). This can be {@code null} if this
 +     * is the first page.
 +     * @param pageSize the size to use for the page to query.
 +     *
 +     * @return the newly created command.
 +     */
 +    public SinglePartitionReadCommand forPaging(Clustering lastReturned, int pageSize)
 +    {
 +        // We shouldn't have set digest yet when reaching that point
 +        assert !isDigestQuery();
 +        return create(isForThrift(),
 +                      metadata(),
 +                      nowInSec(),
 +                      columnFilter(),
 +                      rowFilter(),
 +                      limits().forPaging(pageSize),
 +                      partitionKey(),
 +                      lastReturned == null ? clusteringIndexFilter() : clusteringIndexFilter.forPaging(metadata().comparator, lastReturned, false));
 +    }
 +
 +    public PartitionIterator execute(ConsistencyLevel consistency, ClientState clientState) throws RequestExecutionException
 +    {
 +        return StorageProxy.read(Group.one(this), consistency, clientState);
 +    }
 +
 +    public SinglePartitionPager getPager(PagingState pagingState)
 +    {
 +        return getPager(this, pagingState);
 +    }
 +
 +    private static SinglePartitionPager getPager(SinglePartitionReadCommand command, PagingState pagingState)
 +    {
 +        return new SinglePartitionPager(command, pagingState);
 +    }
 +
 +    protected void recordLatency(TableMetrics metric, long latencyNanos)
 +    {
 +        metric.readLatency.addNano(latencyNanos);
 +    }
 +
 +    protected UnfilteredPartitionIterator queryStorage(final ColumnFamilyStore cfs, ReadOrderGroup orderGroup)
 +    {
 +        @SuppressWarnings("resource") // we close the created iterator through closing the result of this method (and SingletonUnfilteredPartitionIterator ctor cannot fail)
 +        UnfilteredRowIterator partition = cfs.isRowCacheEnabled()
 +                                        ? getThroughCache(cfs, orderGroup.baseReadOpOrderGroup())
 +                                        : queryMemtableAndDisk(cfs, orderGroup.baseReadOpOrderGroup());
 +        return new SingletonUnfilteredPartitionIterator(partition, isForThrift());
 +    }
 +
 +    /**
 +     * Fetch the rows requested if in cache; if not, read it from disk and cache it.
 +     * <p>
 +     * If the partition is cached, and the filter given is within its bounds, we return
 +     * from cache, otherwise from disk.
 +     * <p>
 +     * If the partition is not cached, we figure out what filter is "biggest", read
 +     * that from disk, then filter the result and either cache that or return it.
 +     */
 +    private UnfilteredRowIterator getThroughCache(ColumnFamilyStore cfs, OpOrder.Group readOp)
 +    {
 +        assert !cfs.isIndex(); // CASSANDRA-5732
 +        assert cfs.isRowCacheEnabled() : String.format("Row cache is not enabled on table [%s]", cfs.name);
 +
-         UUID cfId = metadata().cfId;
-         RowCacheKey key = new RowCacheKey(cfId, partitionKey());
++        RowCacheKey key = new RowCacheKey(metadata().ksAndCFName, partitionKey());
 +
 +        // Attempt a sentinel-read-cache sequence. If a write invalidates our sentinel, we'll return our
 +        // (now potentially obsolete) data, but won't cache it. See CASSANDRA-3862.
 +        // TODO: don't evict entire partitions on writes (#2864)
 +        IRowCacheEntry cached = CacheService.instance.rowCache.get(key);
 +        if (cached != null)
 +        {
 +            if (cached instanceof RowCacheSentinel)
 +            {
 +                // Some other read is trying to cache the value, just do a normal non-caching read
 +                Tracing.trace("Row cache miss (race)");
 +                cfs.metric.rowCacheMiss.inc();
 +                return queryMemtableAndDisk(cfs, readOp);
 +            }
 +
 +            CachedPartition cachedPartition = (CachedPartition)cached;
 +            if (cfs.isFilterFullyCoveredBy(clusteringIndexFilter(), limits(), cachedPartition, nowInSec()))
 +            {
 +                cfs.metric.rowCacheHit.inc();
 +                Tracing.trace("Row cache hit");
 +                return clusteringIndexFilter().getUnfilteredRowIterator(columnFilter(), cachedPartition);
 +            }
 +
 +            cfs.metric.rowCacheHitOutOfRange.inc();
 +            Tracing.trace("Ignoring row cache as cached value could not satisfy query");
 +            return queryMemtableAndDisk(cfs, readOp);
 +        }
 +
 +        cfs.metric.rowCacheMiss.inc();
 +        Tracing.trace("Row cache miss");
 +
 +        boolean cacheFullPartitions = metadata().params.caching.cacheAllRows();
 +
 +        // To be able to cache what we read, what we read must at least cover what the cache holds, that is
 +        // the first 'rowsToCache' rows of the partition. We could systematically read those first
 +        // 'rowsToCache' rows, but we'd have to "extend" that read to whatever the user query needs beyond
 +        // them, which isn't trivial with our existing filters. So currently we settle for caching what we
 +        // read only if the user query does query the head of the partition, since that's the common case
 +        // where we'll be able to use the cache anyway. The one exception is when we cache full partitions,
 +        // in which case we always read everything and cache it.
 +        if (cacheFullPartitions || clusteringIndexFilter().isHeadFilter())
 +        {
 +            RowCacheSentinel sentinel = new RowCacheSentinel();
 +            boolean sentinelSuccess = CacheService.instance.rowCache.putIfAbsent(key, sentinel);
 +            boolean sentinelReplaced = false;
 +
 +            try
 +            {
 +                int rowsToCache = metadata().params.caching.rowsPerPartitionToCache();
 +                @SuppressWarnings("resource") // we close on exception or upon closing the result of this method
 +                UnfilteredRowIterator iter = SinglePartitionReadCommand.fullPartitionRead(metadata(), nowInSec(), partitionKey()).queryMemtableAndDisk(cfs, readOp);
 +                try
 +                {
 +                    // We want to cache only rowsToCache rows
 +                    CachedPartition toCache = CachedBTreePartition.create(DataLimits.cqlLimits(rowsToCache).filter(iter, nowInSec()), nowInSec());
 +                    if (sentinelSuccess && !toCache.isEmpty())
 +                    {
 +                        Tracing.trace("Caching {} rows", toCache.rowCount());
 +                        CacheService.instance.rowCache.replace(key, sentinel, toCache);
 +                        // Whether or not the previous replace has worked, our sentinel is not in the cache anymore
 +                        sentinelReplaced = true;
 +                    }
 +
 +                    // We then re-filter out what this query wants.
 +                    // Note that in the case where we don't cache full partitions, it's possible that the current query is interested in more
 +                    // than what we've cached, so we can't just use toCache.
 +                    UnfilteredRowIterator cacheIterator = clusteringIndexFilter().getUnfilteredRowIterator(columnFilter(), toCache);
 +                    if (cacheFullPartitions)
 +                    {
 +                        // Everything is guaranteed to be in 'toCache', we're done with 'iter'
 +                        assert !iter.hasNext();
 +                        iter.close();
 +                        return cacheIterator;
 +                    }
 +                    return UnfilteredRowIterators.concat(cacheIterator, clusteringIndexFilter().filterNotIndexed(columnFilter(), iter));
 +                }
 +                catch (RuntimeException | Error e)
 +                {
 +                    iter.close();
 +                    throw e;
 +                }
 +            }
 +            finally
 +            {
 +                if (sentinelSuccess && !sentinelReplaced)
 +                    cfs.invalidateCachedPartition(key);
 +            }
 +        }
 +
 +        Tracing.trace("Fetching data but not populating cache as query does not query from the start of the partition");
 +        return queryMemtableAndDisk(cfs, readOp);
 +    }
 +
 +    /**
 +     * Queries both memtable and sstables to fetch the result of this query.
 +     * <p>
 +     * Please note that this method:
 +     *   1) does not check the row cache.
 +     *   2) does not apply the query limit, nor the row filter (and so ignores secondary indexes).
 +     *      Those are applied in {@link ReadCommand#executeLocally}.
 +     *   3) does not record some of the read metrics (latency, scanned-cells histograms), nor
 +     *      does it throw TombstoneOverwhelmingException.
 +     * It is publicly exposed because there are a few places where that is exactly what we want,
 +     * but it should be used only where you know you don't need those things.
 +     * <p>
 +     * Also note that one must have "started" an {@code OpOrder.Group} on the queried table; to enforce
 +     * that, it is required as a parameter, even though it's not explicitly used by the method.
 +     */
 +    public UnfilteredRowIterator queryMemtableAndDisk(ColumnFamilyStore cfs, OpOrder.Group readOp)
 +    {
 +        Tracing.trace("Executing single-partition query on {}", cfs.name);
 +
 +        boolean copyOnHeap = Memtable.MEMORY_POOL.needToCopyOnHeap();
 +        return queryMemtableAndDiskInternal(cfs, copyOnHeap);
 +    }
 +
 +    protected abstract UnfilteredRowIterator queryMemtableAndDiskInternal(ColumnFamilyStore cfs, boolean copyOnHeap);
 +
 +    @Override
 +    public String toString()
 +    {
 +        return String.format("Read(%s.%s columns=%s rowFilter=%s limits=%s key=%s filter=%s, nowInSec=%d)",
 +                             metadata().ksName,
 +                             metadata().cfName,
 +                             columnFilter(),
 +                             rowFilter(),
 +                             limits(),
 +                             metadata().getKeyValidator().getString(partitionKey().getKey()),
 +                             clusteringIndexFilter.toString(metadata()),
 +                             nowInSec());
 +    }
 +
 +    public MessageOut<ReadCommand> createMessage(int version)
 +    {
 +        return new MessageOut<>(MessagingService.Verb.READ, this, version < MessagingService.VERSION_30 ? legacyReadCommandSerializer : serializer);
 +    }
 +
 +    protected void appendCQLWhereClause(StringBuilder sb)
 +    {
 +        sb.append(" WHERE ");
 +
 +        sb.append(ColumnDefinition.toCQLString(metadata().partitionKeyColumns())).append(" = ");
 +        DataRange.appendKeyString(sb, metadata().getKeyValidator(), partitionKey().getKey());
 +
 +        // We put the row filter first because the clustering index filter can end by "ORDER BY"
 +        if (!rowFilter().isEmpty())
 +            sb.append(" AND ").append(rowFilter());
 +
 +        String filterString = clusteringIndexFilter().toCQLString(metadata());
 +        if (!filterString.isEmpty())
 +            sb.append(" AND ").append(filterString);
 +    }
 +
 +    protected void serializeSelection(DataOutputPlus out, int version) throws IOException
 +    {
 +        metadata().getKeyValidator().writeValue(partitionKey().getKey(), out);
 +        ClusteringIndexFilter.serializer.serialize(clusteringIndexFilter(), out, version);
 +    }
 +
 +    protected long selectionSerializedSize(int version)
 +    {
 +        return metadata().getKeyValidator().writtenLength(partitionKey().getKey())
 +             + ClusteringIndexFilter.serializer.serializedSize(clusteringIndexFilter(), version);
 +    }
 +
 +    /**
 +     * Groups multiple single partition read commands.
 +     */
 +    public static class Group implements ReadQuery
 +    {
 +        public final List<SinglePartitionReadCommand<?>> commands;
 +        private final DataLimits limits;
 +        private final int nowInSec;
 +
 +        public Group(List<SinglePartitionReadCommand<?>> commands, DataLimits limits)
 +        {
 +            assert !commands.isEmpty();
 +            this.commands = commands;
 +            this.limits = limits;
 +            this.nowInSec = commands.get(0).nowInSec();
 +            for (int i = 1; i < commands.size(); i++)
 +                assert commands.get(i).nowInSec() == nowInSec;
 +        }
 +
 +        public static Group one(SinglePartitionReadCommand<?> command)
 +        {
 +            return new Group(Collections.<SinglePartitionReadCommand<?>>singletonList(command), command.limits());
 +        }
 +
 +        public PartitionIterator execute(ConsistencyLevel consistency, ClientState clientState) throws RequestExecutionException
 +        {
 +            return StorageProxy.read(this, consistency, clientState);
 +        }
 +
 +        public int nowInSec()
 +        {
 +            return nowInSec;
 +        }
 +
 +        public DataLimits limits()
 +        {
 +            return limits;
 +        }
 +
 +        public CFMetaData metadata()
 +        {
 +            return commands.get(0).metadata();
 +        }
 +
 +        public ReadOrderGroup startOrderGroup()
 +        {
 +            // Note that the only difference between the commands in a group must be the partition key on
 +            // which they apply. So as far as ReadOrderGroup is concerned, we can use any of the commands to
 +            // start one.
 +            return commands.get(0).startOrderGroup();
 +        }
 +
 +        public PartitionIterator executeInternal(ReadOrderGroup orderGroup)
 +        {
 +            List<PartitionIterator> partitions = new ArrayList<>(commands.size());
 +            for (SinglePartitionReadCommand cmd : commands)
 +                partitions.add(cmd.executeInternal(orderGroup));
 +
 +            // Because we only enforce the limit per command, we need to enforce it globally as well.
 +            return limits.filter(PartitionIterators.concat(partitions), nowInSec);
 +        }
 +
 +        public QueryPager getPager(PagingState pagingState)
 +        {
 +            if (commands.size() == 1)
 +                return SinglePartitionReadCommand.getPager(commands.get(0), pagingState);
 +
 +            return new MultiPartitionPager(this, pagingState);
 +        }
 +
 +        @Override
 +        public String toString()
 +        {
 +            return commands.toString();
 +        }
 +    }
 +
 +    private static class Deserializer extends SelectionDeserializer
 +    {
 +        public ReadCommand deserialize(DataInputPlus in, int version, boolean isDigest, int digestVersion, boolean isForThrift, CFMetaData metadata, int nowInSec, ColumnFilter columnFilter, RowFilter rowFilter, DataLimits limits, Optional<IndexMetadata> index)
 +        throws IOException
 +        {
 +            DecoratedKey key = metadata.decorateKey(metadata.getKeyValidator().readValue(in));
 +            ClusteringIndexFilter filter = ClusteringIndexFilter.serializer.deserialize(in, version, metadata);
 +            if (filter instanceof ClusteringIndexNamesFilter)
 +                return new SinglePartitionNamesCommand(isDigest, digestVersion, isForThrift, metadata, nowInSec, columnFilter, rowFilter, limits, key, (ClusteringIndexNamesFilter)filter);
 +            else
 +                return new SinglePartitionSliceCommand(isDigest, digestVersion, isForThrift, metadata, nowInSec, columnFilter, rowFilter, limits, key, (ClusteringIndexSliceFilter)filter);
 +        }
 +    }
 +}
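
getThroughCache() in the new file above implements the sentinel-read-cache sequence referenced at CASSANDRA-3862: put a sentinel, read from memtable/disk, atomically swap the sentinel for the real value, and never leave the sentinel behind. A condensed sketch of that protocol, with a ConcurrentMap standing in for the row cache and all names illustrative:

    import java.util.concurrent.ConcurrentMap;
    import java.util.function.Supplier;

    final class SentinelReadSketch
    {
        private static final Object SENTINEL = new Object();

        static Object readThroughCache(ConcurrentMap<String, Object> cache,
                                       String key, Supplier<Object> loader)
        {
            Object cached = cache.get(key);
            if (cached != null && cached != SENTINEL)
                return cached;                       // cache hit

            // If another read is already caching this key, placed is false
            // and we fall back to a plain, non-caching read.
            boolean placed = cache.putIfAbsent(key, SENTINEL) == null;
            boolean replaced = false;
            try
            {
                Object loaded = loader.get();        // the memtable/sstable read
                if (placed)
                    // fails if a concurrent write invalidated our sentinel,
                    // in which case the (possibly stale) result is not cached
                    replaced = cache.replace(key, SENTINEL, loaded);
                return loaded;
            }
            finally
            {
                if (placed && !replaced)
                    cache.remove(key, SENTINEL);     // never leave the sentinel behind
            }
        }
    }
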

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9218d745/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
index d8ff36a,ce12206..04cdc21
--- a/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
@@@ -1518,12 -1491,16 +1518,12 @@@ public abstract class SSTableReader ext
  
      public void cacheKey(DecoratedKey key, RowIndexEntry info)
      {
 -        CachingOptions caching = metadata.getCaching();
 +        CachingParams caching = metadata.params.caching;
  
 -        if (!caching.keyCache.isEnabled()
 -                || keyCache == null
 -                || keyCache.getCapacity() == 0)
 -        {
 +        if (!caching.cacheKeys() || keyCache == null || keyCache.getCapacity() == 0)
              return;
 -        }
  
-         KeyCacheKey cacheKey = new KeyCacheKey(metadata.cfId, descriptor, key.getKey());
+         KeyCacheKey cacheKey = new KeyCacheKey(metadata.ksAndCFName, descriptor, key.getKey());
          logger.trace("Adding cache entry for {} -> {}", cacheKey, info);
          keyCache.put(cacheKey, info);
      }
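
cacheKey() now reads the per-table flag from CachingParams and collapses the guard onto one line: key caching must be enabled, a cache must exist, and it must have capacity. The same shape as a sketch with stand-in types (illustrative only):

    import java.util.Map;

    final class KeyCacheSketch
    {
        static void cacheKey(boolean cacheKeys, Map<Object, Object> keyCache,
                             long capacity, Object cacheKey, Object indexEntry)
        {
            if (!cacheKeys || keyCache == null || capacity == 0)
                return; // key caching disabled for this table or globally
            keyCache.put(cacheKey, indexEntry);
        }
    }
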

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9218d745/src/java/org/apache/cassandra/io/sstable/format/big/BigTableReader.java
----------------------------------------------------------------------

