cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From slebre...@apache.org
Subject [2/3] Support indexes on composite column components
Date Thu, 04 Apr 2013 16:31:47 GMT
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a950b925/src/java/org/apache/cassandra/cql3/statements/CreateIndexStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/CreateIndexStatement.java b/src/java/org/apache/cassandra/cql3/statements/CreateIndexStatement.java
index 4e0f536..1acd475 100644
--- a/src/java/org/apache/cassandra/cql3/statements/CreateIndexStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/CreateIndexStatement.java
@@ -67,24 +67,29 @@ public class CreateIndexStatement extends SchemaAlteringStatement
         CFMetaData cfm = oldCfm.clone();
         CFDefinition cfDef = oldCfm.getCfDef();
 
-        for (ColumnDefinition cd : cfm.getColumn_metadata().values())
+        for (ColumnDefinition cd : cfm.allColumns())
         {
             if (cd.name.equals(columnName.key))
             {
                 if (cd.getIndexType() != null)
                     throw new InvalidRequestException("Index already exists");
-                if (logger.isDebugEnabled())
-                    logger.debug("Updating column {} definition for index {}", columnName, indexName);
+
+                if (cd.type == ColumnDefinition.Type.PARTITION_KEY && (cd.componentIndex == null || cd.componentIndex == 0))
+                    throw new InvalidRequestException(String.format("Cannot add secondary index to already primarily indexed column %s", columnName));
+
+                // TODO: we could lift that limitation
+                if (cfDef.isCompact && cd.type != ColumnDefinition.Type.REGULAR)
+                    throw new InvalidRequestException(String.format("Secondary index on %s column %s is not yet supported for compact table", cd.type, columnName));
 
                 if (cd.getValidator().isCollection())
                     throw new InvalidRequestException("Indexes on collections are no yet supported");
 
+                if (logger.isDebugEnabled())
+                    logger.debug("Updating column {} definition for index {}", columnName, indexName);
+
                 if (cfDef.isComposite)
                 {
-                    CompositeType composite = (CompositeType)cfm.comparator;
-                    Map<String, String> opts = new HashMap<String, String>();
-                    opts.put(CompositesIndex.PREFIX_SIZE_OPTION, String.valueOf(composite.types.size() - (cfDef.hasCollections ? 2 : 1)));
-                    cd.setIndexType(IndexType.COMPOSITES, opts);
+                    cd.setIndexType(IndexType.COMPOSITES, Collections.<String, String>emptyMap());
                 }
                 else
                 {
@@ -113,7 +118,7 @@ public class CreateIndexStatement extends SchemaAlteringStatement
         }
 
         cfm.addDefaultIndexNames();
-        MigrationManager.announceColumnFamilyUpdate(cfm);
+        MigrationManager.announceColumnFamilyUpdate(cfm, false);
     }
 
     public ResultMessage.SchemaChange.Change changeType()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a950b925/src/java/org/apache/cassandra/cql3/statements/DropIndexStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/DropIndexStatement.java b/src/java/org/apache/cassandra/cql3/statements/DropIndexStatement.java
index 6edb551..c8c34e0 100644
--- a/src/java/org/apache/cassandra/cql3/statements/DropIndexStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/DropIndexStatement.java
@@ -52,7 +52,7 @@ public class DropIndexStatement extends SchemaAlteringStatement
     public void announceMigration() throws InvalidRequestException, ConfigurationException
     {
         CFMetaData updatedCfm = updateCFMetadata(findIndexedCF());
-        MigrationManager.announceColumnFamilyUpdate(updatedCfm);
+        MigrationManager.announceColumnFamilyUpdate(updatedCfm, false);
     }
 
     private CFMetaData updateCFMetadata(CFMetaData cfm) throws InvalidRequestException
@@ -60,7 +60,7 @@ public class DropIndexStatement extends SchemaAlteringStatement
         ColumnDefinition column = findIndexedColumn(cfm);
         assert column != null;
         CFMetaData cloned = cfm.clone();
-        ColumnDefinition toChange = cloned.getColumn_metadata().get(column.name);
+        ColumnDefinition toChange = cloned.getColumnDefinition(column.name);
         assert toChange.getIndexName() != null && toChange.getIndexName().equals(indexName);
         toChange.setIndexName(null);
         toChange.setIndexType(null, null);
@@ -80,7 +80,7 @@ public class DropIndexStatement extends SchemaAlteringStatement
 
     private ColumnDefinition findIndexedColumn(CFMetaData cfm)
     {
-        for (ColumnDefinition column : cfm.getColumn_metadata().values())
+        for (ColumnDefinition column : cfm.allColumns())
         {
             if (column.getIndexType() != null && column.getIndexName() != null && column.getIndexName().equals(indexName))
                 return column;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a950b925/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java b/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
index f05e6f5..d2c35e9 100644
--- a/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
@@ -67,12 +67,16 @@ public class SelectStatement implements CQLStatement
     private final Restriction[] keyRestrictions;
     private final Restriction[] columnRestrictions;
     private final Map<CFDefinition.Name, Restriction> metadataRestrictions = new HashMap<CFDefinition.Name, Restriction>();
+
+    // The name of all restricted names not covered by the key or index filter
+    private final Set<CFDefinition.Name> restrictedNames = new HashSet<CFDefinition.Name>();
     private Restriction sliceRestriction;
 
     private boolean isReversed;
     private boolean onToken;
     private boolean isKeyRange;
     private boolean keyIsInRelation;
+    private boolean usesSecondaryIndexing;
 
     private static enum Bound
     {
@@ -123,7 +127,7 @@ public class SelectStatement implements CQLStatement
 
         cl.validateForRead(keyspace());
 
-        List<Row> rows = isKeyRange
+        List<Row> rows = isKeyRange || usesSecondaryIndexing
                        ? StorageProxy.getRangeSlice(getRangeCommand(variables), cl)
                        : StorageProxy.read(getSliceCommands(variables), cl);
 
@@ -151,7 +155,7 @@ public class SelectStatement implements CQLStatement
     {
         try
         {
-            List<Row> rows = isKeyRange
+            List<Row> rows = isKeyRange || usesSecondaryIndexing
                            ? RangeSliceVerbHandler.executeLocally(getRangeCommand(Collections.<ByteBuffer>emptyList()))
                            : readLocally(keyspace(), getSliceCommands(Collections.<ByteBuffer>emptyList()));
 
@@ -552,14 +556,28 @@ public class SelectStatement implements CQLStatement
 
     private List<IndexExpression> getIndexExpressions(List<ByteBuffer> variables) throws InvalidRequestException
     {
-        if (metadataRestrictions.isEmpty())
+        if (!usesSecondaryIndexing || restrictedNames.isEmpty())
             return Collections.<IndexExpression>emptyList();
 
         List<IndexExpression> expressions = new ArrayList<IndexExpression>();
-        for (Map.Entry<CFDefinition.Name, Restriction> entry : metadataRestrictions.entrySet())
+        for (CFDefinition.Name name : restrictedNames)
         {
-            CFDefinition.Name name = entry.getKey();
-            Restriction restriction = entry.getValue();
+            Restriction restriction;
+            switch (name.kind)
+            {
+                case KEY_ALIAS:
+                    restriction = keyRestrictions[name.position];
+                    break;
+                case COLUMN_ALIAS:
+                    restriction = columnRestrictions[name.position];
+                    break;
+                case COLUMN_METADATA:
+                    restriction = metadataRestrictions.get(name);
+                    break;
+                default:
+                    // We don't allow restricting a VALUE_ALIAS for now in prepare.
+                    throw new AssertionError();
+            }
             if (restriction.isEquality())
             {
                 for (Term t : restriction.eqValues)
@@ -871,6 +889,22 @@ public class SelectStatement implements CQLStatement
         return true;
     }
 
+    private boolean hasIndexedColumnRestricted(List<ColumnDefinition> columns, Restriction[] restrictions)
+    {   // Returns true if some non-null restrictions[i] pairs with an indexed columns.get(i). NOTE(review): no call site appears in this excerpt.
+        assert columns.size() == restrictions.length; // parallel inputs: entry i of each describes the same position
+        for (int i = 0; i < columns.size(); ++i)
+        {
+            Restriction restriction = restrictions[i];
+            if (restriction == null)
+                continue; // skip positions that carry no restriction
+            // columns may hold null entries (checked below); only an indexed definition makes this return true
+            ColumnDefinition def = columns.get(i);
+            if (def != null && def.isIndexed())
+                return true;
+        }
+        return false;
+    }
+
     public static class RawStatement extends CFStatement
     {
         private final Parameters parameters;
@@ -915,12 +949,21 @@ public class SelectStatement implements CQLStatement
              *     (we could allow two IN for the same entity but that doesn't seem very useful)
              *   - The value_alias cannot be restricted in any way (we don't support wide rows with indexed value in CQL so far)
              */
+            boolean hasQueriableIndex = false;
             for (Relation rel : whereClause)
             {
                 CFDefinition.Name name = cfDef.get(rel.getEntity());
                 if (name == null)
                     throw new InvalidRequestException(String.format("Undefined name %s in where clause ('%s')", rel.getEntity(), rel));
 
+                ColumnDefinition def = cfDef.cfm.getColumnDefinition(name.name.key);
+                stmt.restrictedNames.add(name);
+                if (def.isIndexed())
+                {
+                    if (rel.operator() == Relation.Type.EQ)
+                        hasQueriableIndex = true;
+                }
+
                 switch (name.kind)
                 {
                     case KEY_ALIAS:
@@ -941,50 +984,18 @@ public class SelectStatement implements CQLStatement
              * At this point, the select statement if fully constructed, but we still have a few things to validate
              */
 
-            // If a component of the PRIMARY KEY is restricted by a non-EQ relation, all preceding
-            // components must have a EQ, and all following must have no restriction
-            boolean shouldBeDone = false;
-            CFDefinition.Name previous = null;
-            Iterator<CFDefinition.Name> iter = cfDef.columns.values().iterator();
-            for (int i = 0; i < stmt.columnRestrictions.length; i++)
-            {
-                CFDefinition.Name cname = iter.next();
-                Restriction restriction = stmt.columnRestrictions[i];
-                if (restriction == null)
-                {
-                    shouldBeDone = true;
-                }
-                else if (shouldBeDone)
-                {
-                    throw new InvalidRequestException(String.format("PRIMARY KEY part %s cannot be restricted (preceding part %s is either not restricted or by a non-EQ relation)", cname, previous));
-                }
-                else if (!restriction.isEquality())
-                {
-                    shouldBeDone = true;
-                    // For non-composite slices, we don't support internally the difference between exclusive and
-                    // inclusive bounds, so we deal with it manually.
-                    if (!cfDef.isComposite && (!restriction.isInclusive(Bound.START) || !restriction.isInclusive(Bound.END)))
-                        stmt.sliceRestriction = restriction;
-                }
-                // We only support IN for the last name so far
-                // TODO: #3885 allows us to extend to other parts (cf. #4762)
-                else if (restriction.eqValues.size() > 1)
-                {
-                    if (i != stmt.columnRestrictions.length - 1)
-                        throw new InvalidRequestException(String.format("PRIMARY KEY part %s cannot be restricted by IN relation", cname));
-                    else if (stmt.selectACollection())
-                        throw new InvalidRequestException(String.format("Cannot restrict PRIMARY KEY part %s by IN relation as a collection is selected by the query", cname));
-                }
-
-                previous = cname;
-            }
+            // If there is a queriable index, no special condition are required on the other restrictions.
+            // But we still need to know 2 things:
+            //   - If we don't have a queriable index, is the query ok
+            //   - Is it queriable without 2ndary index, which is always more efficient
 
             // If a component of the partition key is restricted by a non-EQ relation, all preceding
             // components must have a EQ, and all following must have no restriction
-            shouldBeDone = false;
-            previous = null;
+            boolean shouldBeDone = false;
+            CFDefinition.Name previous = null;
             stmt.keyIsInRelation = false;
-            iter = cfDef.keys.values().iterator();
+            Iterator<CFDefinition.Name> iter = cfDef.keys.values().iterator();
+            int lastRestrictedPartitionKey = stmt.keyRestrictions.length - 1;
             for (int i = 0; i < stmt.keyRestrictions.length; i++)
             {
                 CFDefinition.Name cname = iter.next();
@@ -992,18 +1003,33 @@ public class SelectStatement implements CQLStatement
 
                 if (restriction == null)
                 {
+                    if (!shouldBeDone)
+                        lastRestrictedPartitionKey = i - 1;
+
                     if (stmt.onToken)
                         throw new InvalidRequestException("The token() function must be applied to all partition key components or none of them");
 
                     // Under a non order perserving partitioner, the only time not restricting a key part is allowed is if none are restricted
                     if (!partitioner.preservesOrder() && i > 0 && stmt.keyRestrictions[i-1] != null)
+                    {
+                        if (hasQueriableIndex)
+                        {
+                            stmt.usesSecondaryIndexing = true;
+                            break;
+                        }
                         throw new InvalidRequestException(String.format("Partition key part %s must be restricted since preceding part is", cname));
+                    }
 
                     stmt.isKeyRange = true;
                     shouldBeDone = true;
                 }
                 else if (shouldBeDone)
                 {
+                    if (hasQueriableIndex)
+                    {
+                        stmt.usesSecondaryIndexing = true;
+                        break;
+                    }
                     throw new InvalidRequestException(String.format("partition key part %s cannot be restricted (preceding part %s is either not restricted or by a non-EQ relation)", cname, previous));
                 }
                 else if (restriction.onToken)
@@ -1032,58 +1058,101 @@ public class SelectStatement implements CQLStatement
                 else
                 {
                     if (!partitioner.preservesOrder())
+                    {
+                        if (hasQueriableIndex)
+                        {
+                            stmt.usesSecondaryIndexing = true;
+                            break;
+                        }
                         throw new InvalidRequestException("Only EQ and IN relation are supported on the partition key for random partitioners (unless you use the token() function)");
+                    }
 
                     stmt.isKeyRange = true;
+                    lastRestrictedPartitionKey = i;
                     shouldBeDone = true;
                 }
                 previous = cname;
             }
 
-            // Deal with indexed columns
-            if (!stmt.metadataRestrictions.isEmpty())
+            // If a cluster key column is restricted by a non-EQ relation, all preceding
+            // columns must have a EQ, and all following must have no restriction. Unless
+            // the column is indexed that is.
+            shouldBeDone = false;
+            previous = null;
+            iter = cfDef.columns.values().iterator();
+            int lastRestrictedClusteringKey = stmt.columnRestrictions.length - 1;
+            for (int i = 0; i < stmt.columnRestrictions.length; i++)
             {
-                stmt.isKeyRange = true;
-                boolean hasEq = false;
-                Set<ByteBuffer> indexedNames = new HashSet<ByteBuffer>();
-                indexedNames.add(cfm.getKeyName());
-                for (ColumnDefinition cfdef : cfm.getColumn_metadata().values())
+                CFDefinition.Name cname = iter.next();
+                Restriction restriction = stmt.columnRestrictions[i];
+
+                if (restriction == null)
                 {
-                    if (cfdef.getIndexType() != null)
-                    {
-                        indexedNames.add(cfdef.name);
-                    }
+                    if (!shouldBeDone)
+                        lastRestrictedClusteringKey = i - 1;
+                    shouldBeDone = true;
                 }
-
-                // Note: we cannot use idxManager.indexes() methods because we don't have a complete column name at this point, we only
-                // have the indexed component.
-                for (Map.Entry<CFDefinition.Name, Restriction> entry : stmt.metadataRestrictions.entrySet())
+                else
                 {
-                    Restriction restriction = entry.getValue();
-                    if (!restriction.isEquality())
-                        continue;
-
-                    // We don't support IN for indexed values (basically this would require supporting a form of OR)
-                    if (restriction.eqValues.size() > 1)
-                        throw new InvalidRequestException("Cannot use IN operator on column not part of the PRIMARY KEY");
-
-                    if (indexedNames.contains(entry.getKey().name.key))
+                    if (shouldBeDone)
                     {
-                        hasEq = true;
-                        break;
+                        if (hasQueriableIndex)
+                        {
+                            stmt.usesSecondaryIndexing = true;
+                            break;
+                        }
+                        throw new InvalidRequestException(String.format("PRIMARY KEY part %s cannot be restricted (preceding part %s is either not restricted or by a non-EQ relation)", cname, previous));
+                    }
+                    else if (!restriction.isEquality())
+                    {
+                        lastRestrictedClusteringKey = i;
+                        shouldBeDone = true;
+                        // For non-composite slices, we don't support internally the difference between exclusive and
+                        // inclusive bounds, so we deal with it manually.
+                        if (!cfDef.isComposite && (!restriction.isInclusive(Bound.START) || !restriction.isInclusive(Bound.END)))
+                            stmt.sliceRestriction = restriction;
+                    }
+                    // We only support IN for the last name and for compact storage so far
+                    // TODO: #3885 allows us to extend to non compact as well, but that remains to be done
+                    else if (restriction.eqValues.size() > 1)
+                    {
+                        if (i != stmt.columnRestrictions.length - 1)
+                            throw new InvalidRequestException(String.format("PRIMARY KEY part %s cannot be restricted by IN relation", cname));
+                        else if (stmt.selectACollection())
+                            throw new InvalidRequestException(String.format("Cannot restrict PRIMARY KEY part %s by IN relation as a collection is selected by the query", cname));
                     }
                 }
-                if (!hasEq)
+
+                previous = cname;
+            }
+
+            // Even if usesSecondaryIndexing is false at this point, we'll still have to use one if
+            // there is restrictions not covered by the PK.
+            if (!stmt.metadataRestrictions.isEmpty())
+            {
+                if (!hasQueriableIndex)
                     throw new InvalidRequestException("No indexed columns present in by-columns clause with Equal operator");
 
-                // If we have indexed columns and the key = X clause, we will do a range query, but if it's a IN relation, we don't know how to handle it.
+                stmt.usesSecondaryIndexing = true;
+            }
+
+            if (stmt.usesSecondaryIndexing)
+            {
                 if (stmt.keyIsInRelation)
                     throw new InvalidRequestException("Select on indexed columns and with IN clause for the PRIMARY KEY are not supported");
             }
 
+            iter = cfDef.keys.values().iterator();
+            for (int i = 0; i < lastRestrictedPartitionKey + 1; i++)
+                stmt.restrictedNames.remove(iter.next());
+
+            iter = cfDef.columns.values().iterator();
+            for (int i = 0; i < lastRestrictedClusteringKey + 1; i++)
+                stmt.restrictedNames.remove(iter.next());
+
             if (!stmt.parameters.orderings.isEmpty())
             {
-                if (!stmt.metadataRestrictions.isEmpty())
+                if (stmt.usesSecondaryIndexing)
                     throw new InvalidRequestException("ORDER BY with 2ndary indexes is not supported.");
 
                 if (stmt.isKeyRange)
@@ -1153,13 +1222,13 @@ public class SelectStatement implements CQLStatement
                 stmt.isReversed = isReversed;
             }
 
-            // Make sure this queries is allowed (note: only key range can involve filtering underneath)
-            if (!parameters.allowFiltering && stmt.isKeyRange)
+            // Make sure this queries is allowed (note: non key range non indexed cannot involve filtering underneath)
+            if (!parameters.allowFiltering && (stmt.isKeyRange || stmt.usesSecondaryIndexing))
             {
                 // We will potentially filter data if either:
                 //  - Have more than one IndexExpression
                 //  - Have no index expression and the column filter is not the identity
-                if (stmt.metadataRestrictions.size() > 1 || (stmt.metadataRestrictions.isEmpty() && !stmt.columnFilterIsIdentity()))
+                if (stmt.restrictedNames.size() > 1 || (stmt.restrictedNames.isEmpty() && !stmt.columnFilterIsIdentity()))
                     throw new InvalidRequestException("Cannot execute this query as it might involve data filtering and thus may have unpredictable performance. "
                                                     + "If you want to execute this query despite the performance unpredictability, use ALLOW FILTERING");
             }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a950b925/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index 972784a..0c4a4ea 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@ -269,7 +269,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
         this.compactionStrategy = metadata.createCompactionStrategyInstance(this);
 
         // create the private ColumnFamilyStores for the secondary column indexes
-        for (ColumnDefinition info : metadata.getColumn_metadata().values())
+        for (ColumnDefinition info : metadata.allColumns())
         {
             if (info.getIndexType() != null)
                 indexManager.addIndexedColumn(info);
@@ -452,7 +452,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
         CFMetaData cfm = Schema.instance.getCFMetaData(table, columnFamily);
         if (cfm != null) // secondary indexes aren't stored in DD.
         {
-            for (ColumnDefinition def : cfm.getColumn_metadata().values())
+            for (ColumnDefinition def : cfm.allColumns())
                 scrubDataDirectories(table, cfm.indexColumnFamilyName(def));
         }
     }
@@ -1535,7 +1535,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
                             data.addAll(cf, HeapAllocator.instance);
                     }
 
-                    if (!filter.isSatisfiedBy(data, null))
+                    if (!filter.isSatisfiedBy(rawRow.key.key, data, null))
                         continue;
 
                     logger.trace("{} satisfies all filter expressions", data);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a950b925/src/java/org/apache/cassandra/db/EmptyColumns.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/EmptyColumns.java b/src/java/org/apache/cassandra/db/EmptyColumns.java
index af64628..6df7a68 100644
--- a/src/java/org/apache/cassandra/db/EmptyColumns.java
+++ b/src/java/org/apache/cassandra/db/EmptyColumns.java
@@ -33,7 +33,8 @@ public class EmptyColumns extends AbstractThreadUnsafeSortedColumns
         return new EmptyColumns(metadata, deletionInfo);
     }
 
-    public void clear() {
+    public void clear()
+    {
     }
 
     public Factory<EmptyColumns> getFactory()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a950b925/src/java/org/apache/cassandra/db/filter/ExtendedFilter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/filter/ExtendedFilter.java b/src/java/org/apache/cassandra/db/filter/ExtendedFilter.java
index 41f9d28..064cc6e 100644
--- a/src/java/org/apache/cassandra/db/filter/ExtendedFilter.java
+++ b/src/java/org/apache/cassandra/db/filter/ExtendedFilter.java
@@ -26,8 +26,10 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.config.ColumnDefinition;
 import org.apache.cassandra.cql3.ColumnNameBuilder;
 import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.marshal.AbstractType;
 import org.apache.cassandra.db.marshal.CompositeType;
 import org.apache.cassandra.db.columniterator.OnDiskAtomIterator;
 import org.apache.cassandra.thrift.IndexExpression;
@@ -139,7 +141,7 @@ public abstract class ExtendedFilter
      * @return true if the provided data satisfies all the expressions from
      * the clause of this filter.
      */
-    public abstract boolean isSatisfiedBy(ColumnFamily data, ColumnNameBuilder builder);
+    public abstract boolean isSatisfiedBy(ByteBuffer rowKey, ColumnFamily data, ColumnNameBuilder builder);
 
     public static boolean satisfies(int comparison, IndexOperator op)
     {
@@ -273,23 +275,62 @@ public abstract class ExtendedFilter
             return pruned;
         }
 
-        public boolean isSatisfiedBy(ColumnFamily data, ColumnNameBuilder builder)
+        public boolean isSatisfiedBy(ByteBuffer rowKey, ColumnFamily data, ColumnNameBuilder builder)
         {
             // We enforces even the primary clause because reads are not synchronized with writes and it is thus possible to have a race
             // where the index returned a row which doesn't have the primary column when we actually read it
             for (IndexExpression expression : clause)
             {
-                // check column data vs expression
-                ByteBuffer colName = builder == null ? expression.column_name : builder.copy().add(expression.column_name).build();
-                Column column = data.getColumn(colName);
-                if (column == null)
+                ColumnDefinition def = data.metadata().getColumnDefinition(expression.column_name);
+                ByteBuffer dataValue = null;
+                AbstractType<?> validator = null;
+                if (def == null)
+                {
+                    // This can't happen with CQL3 as this should be rejected upfront. For thrift however,
+                    // column name are not predefined. But that means the column name correspond to an internal one.
+                    Column column = data.getColumn(expression.column_name);
+                    if (column != null)
+                    {
+                        dataValue = column.value();
+                        validator = data.metadata().getDefaultValidator();
+                    }
+                }
+                else
+                {
+                    dataValue = extractDataValue(def, rowKey, data, builder);
+                    validator = def.getValidator();
+                }
+
+                if (dataValue == null)
                     return false;
-                int v = data.metadata().getValueValidator(expression.column_name).compare(column.value(), expression.value);
+
+                int v = validator.compare(dataValue, expression.value);
                 if (!satisfies(v, expression.op))
                     return false;
             }
             return true;
         }
+
+        private ByteBuffer extractDataValue(ColumnDefinition def, ByteBuffer rowKey, ColumnFamily data, ColumnNameBuilder builder)
+        {   // Returns the value of column `def` for this row, taken from the row key, the clustering prefix, or a cell; null if a REGULAR cell is absent.
+            switch (def.type)
+            {
+                case PARTITION_KEY: // whole row key, or one component of a CompositeType key when componentIndex is set
+                    return def.componentIndex == null
+                         ? rowKey
+                         : ((CompositeType)data.metadata().getKeyValidator()).split(rowKey)[def.componentIndex];
+                case CLUSTERING_KEY: // value comes from the clustering prefix held by builder
+                    return builder.get(def.componentIndex); // NOTE(review): ColumnFamilyStore in this patch calls isSatisfiedBy with builder == null; confirm this branch is unreachable then
+                case REGULAR:
+                    ByteBuffer colName = builder == null ? def.name : builder.copy().add(def.name).build(); // with a builder, the cell name is the prefix followed by def.name
+                    Column column = data.getColumn(colName);
+                    return column == null ? null : column.value();
+                case COMPACT_VALUE: // expects the row to contain exactly one cell and returns its value
+                    assert data.getColumnCount() == 1;
+                    return data.getSortedColumns().iterator().next().value();
+            }
+            throw new AssertionError(); // reached only for a def.type not handled by the cases above
+        }
     }
 
     private static class FilterWithCompositeClauses extends FilterWithClauses
@@ -343,7 +384,7 @@ public abstract class ExtendedFilter
             return data;
         }
 
-        public boolean isSatisfiedBy(ColumnFamily data, ColumnNameBuilder builder)
+        public boolean isSatisfiedBy(ByteBuffer rowKey, ColumnFamily data, ColumnNameBuilder builder)
         {
             return true;
         }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a950b925/src/java/org/apache/cassandra/db/filter/IDiskAtomFilter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/filter/IDiskAtomFilter.java b/src/java/org/apache/cassandra/db/filter/IDiskAtomFilter.java
index 30ebef8..e9d3682 100644
--- a/src/java/org/apache/cassandra/db/filter/IDiskAtomFilter.java
+++ b/src/java/org/apache/cassandra/db/filter/IDiskAtomFilter.java
@@ -22,6 +22,7 @@ import java.io.DataOutput;
 import java.io.IOException;
 import java.util.Comparator;
 import java.util.Iterator;
+import java.nio.ByteBuffer;
 
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.columniterator.ISSTableColumnIterator;
@@ -76,6 +77,7 @@ public interface IDiskAtomFilter
     public int getLiveCount(ColumnFamily cf);
 
     public IDiskAtomFilter cloneShallow();
+    public boolean maySelectPrefix(Comparator<ByteBuffer> cmp, ByteBuffer prefix);
 
     public static class Serializer implements IVersionedSerializer<IDiskAtomFilter>
     {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a950b925/src/java/org/apache/cassandra/db/filter/NamesQueryFilter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/filter/NamesQueryFilter.java b/src/java/org/apache/cassandra/db/filter/NamesQueryFilter.java
index 6ef43d5..bc2f71c 100644
--- a/src/java/org/apache/cassandra/db/filter/NamesQueryFilter.java
+++ b/src/java/org/apache/cassandra/db/filter/NamesQueryFilter.java
@@ -138,6 +138,16 @@ public class NamesQueryFilter implements IDiskAtomFilter
         return count;
     }
 
+    /**
+     * Returns true if at least one of the column names selected by this filter satisfies
+     * {@code ByteBufferUtil.isPrefix(prefix, name)}, i.e. (presumably) starts with {@code prefix}.
+     *
+     * @param cmp unused by this implementation (the test is done by ByteBufferUtil.isPrefix)
+     * @param prefix the cell name prefix to look for
+     * @return true as soon as one selected column name matches, false if none does
+     */
+    public boolean maySelectPrefix(Comparator<ByteBuffer> cmp, ByteBuffer prefix)
+    {
+        for (ByteBuffer column : columns)
+        {
+            if (ByteBufferUtil.isPrefix(prefix, column))
+                return true;
+        }
+        return false;
+    }
+
     public static class Serializer implements IVersionedSerializer<NamesQueryFilter>
     {
         public void serialize(NamesQueryFilter f, DataOutput out, int version) throws IOException

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a950b925/src/java/org/apache/cassandra/db/filter/SliceQueryFilter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/filter/SliceQueryFilter.java b/src/java/org/apache/cassandra/db/filter/SliceQueryFilter.java
index 01db13c..d428883 100644
--- a/src/java/org/apache/cassandra/db/filter/SliceQueryFilter.java
+++ b/src/java/org/apache/cassandra/db/filter/SliceQueryFilter.java
@@ -226,10 +226,10 @@ public class SliceQueryFilter implements IDiskAtomFilter
         count = newLimit;
     }
 
-    public boolean includes(Comparator<ByteBuffer> cmp, ByteBuffer name)
+    /**
+     * Returns true if one of the slices of this filter includes {@code prefix} according to {@code cmp}.
+     * NOTE(review): this tests the prefix itself with ColumnSlice.includes; whether that catches every
+     * slice that could select a cell name starting with {@code prefix} depends on ColumnSlice — confirm.
+     */
+    public boolean maySelectPrefix(Comparator<ByteBuffer> cmp, ByteBuffer prefix)
     {
         for (ColumnSlice slice : slices)
-            if (slice.includes(cmp, name))
+            if (slice.includes(cmp, prefix))
                 return true;
         return false;
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a950b925/src/java/org/apache/cassandra/db/index/AbstractSimplePerColumnSecondaryIndex.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/index/AbstractSimplePerColumnSecondaryIndex.java b/src/java/org/apache/cassandra/db/index/AbstractSimplePerColumnSecondaryIndex.java
index dabba95..f12acdc 100644
--- a/src/java/org/apache/cassandra/db/index/AbstractSimplePerColumnSecondaryIndex.java
+++ b/src/java/org/apache/cassandra/db/index/AbstractSimplePerColumnSecondaryIndex.java
@@ -33,14 +33,19 @@ import org.apache.cassandra.utils.ByteBufferUtil;
  */
 public abstract class AbstractSimplePerColumnSecondaryIndex extends PerColumnSecondaryIndex
 {
-    private ColumnFamilyStore indexCfs;
+    protected ColumnFamilyStore indexCfs;
+
+    // SecondaryIndex "forces" a set of ColumnDefinitions. However, this class (and thus its subclasses)
+    // only supports one definition per index, so we inline it in a field for 1) convenience and 2) to avoid
+    // creating an iterator each time we need to access it.
+    // TODO: we should fix the SecondaryIndex API
+    protected ColumnDefinition columnDef;
 
     public void init()
     {
         assert baseCfs != null && columnDefs != null && columnDefs.size() == 1;
 
-        ColumnDefinition columnDef = columnDefs.iterator().next();
-        init(columnDef);
+        columnDef = columnDefs.iterator().next();
 
         AbstractType indexComparator = SecondaryIndex.getIndexComparator(baseCfs.metadata, columnDef);
         CFMetaData indexedCfMetadata = CFMetaData.newIndexMetadata(baseCfs.metadata, columnDef, indexComparator);
@@ -69,10 +74,10 @@ public abstract class AbstractSimplePerColumnSecondaryIndex extends PerColumnSec
         }
     }
 
-    protected abstract void init(ColumnDefinition columnDef);
-
     protected abstract ByteBuffer makeIndexColumnName(ByteBuffer rowKey, Column column);
 
+    protected abstract ByteBuffer getIndexedValue(ByteBuffer rowKey, Column column);
+
     protected abstract AbstractType getExpressionComparator();
 
     public String expressionString(IndexExpression expr)
@@ -81,16 +86,15 @@ public abstract class AbstractSimplePerColumnSecondaryIndex extends PerColumnSec
                              baseCfs.name,
                              getExpressionComparator().getString(expr.column_name),
                              expr.op,
-                             baseCfs.metadata.getColumn_metadata().get(expr.column_name).getValidator().getString(expr.value));
+                             baseCfs.metadata.getColumnDefinition(expr.column_name).getValidator().getString(expr.value));
     }
 
-
     public void delete(ByteBuffer rowKey, Column column)
     {
         if (column.isMarkedForDelete())
             return;
 
-        DecoratedKey valueKey = getIndexKeyFor(column.value());
+        DecoratedKey valueKey = getIndexKeyFor(getIndexedValue(rowKey, column));
         int localDeletionTime = (int) (System.currentTimeMillis() / 1000);
         ColumnFamily cfi = ArrayBackedSortedColumns.factory.create(indexCfs.metadata);
         cfi.addTombstone(makeIndexColumnName(rowKey, column), localDeletionTime, column.timestamp());
@@ -101,7 +105,7 @@ public abstract class AbstractSimplePerColumnSecondaryIndex extends PerColumnSec
 
     public void insert(ByteBuffer rowKey, Column column)
     {
-        DecoratedKey valueKey = getIndexKeyFor(column.value());
+        DecoratedKey valueKey = getIndexKeyFor(getIndexedValue(rowKey, column));
         ColumnFamily cfi = ArrayBackedSortedColumns.factory.create(indexCfs.metadata);
         ByteBuffer name = makeIndexColumnName(rowKey, column);
         if (column instanceof ExpiringColumn)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a950b925/src/java/org/apache/cassandra/db/index/SecondaryIndex.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/index/SecondaryIndex.java b/src/java/org/apache/cassandra/db/index/SecondaryIndex.java
index 11e026d..2153ff9 100644
--- a/src/java/org/apache/cassandra/db/index/SecondaryIndex.java
+++ b/src/java/org/apache/cassandra/db/index/SecondaryIndex.java
@@ -55,6 +55,10 @@ public abstract class SecondaryIndex
 
     public static final String CUSTOM_INDEX_OPTION_NAME = "class_name";
 
+    /**
+     * Comparator used for base-table row keys: BytesType when the partitioner preserves key order,
+     * LocalByPartionerType otherwise. Evaluated once, when this class is initialized, from
+     * StorageService.getPartitioner(); getIndexComparator returns it directly for KEYS indexes.
+     */
+    public static final AbstractType<?> keyComparator = StorageService.getPartitioner().preservesOrder()
+                                                      ? BytesType.instance
+                                                      : new LocalByPartionerType(StorageService.getPartitioner());
+
     /**
      * Base CF that has many indexes
      */
@@ -319,7 +323,7 @@ public abstract class SecondaryIndex
             index = new KeysIndex();
             break;
         case COMPOSITES:
-            index = new CompositesIndex();
+            index = CompositesIndex.create(cdef);
             break;
         case CUSTOM:
             assert cdef.getIndexOptions() != null;
@@ -355,32 +359,12 @@ public abstract class SecondaryIndex
      */
     public static AbstractType<?> getIndexComparator(CFMetaData baseMetadata, ColumnDefinition cdef)
     {
-        IPartitioner rowPartitioner = StorageService.getPartitioner();
-        AbstractType<?> keyComparator = (rowPartitioner instanceof OrderPreservingPartitioner || rowPartitioner instanceof ByteOrderedPartitioner)
-                                      ? BytesType.instance
-                                      : new LocalByPartionerType(rowPartitioner);
-
         switch (cdef.getIndexType())
         {
             case KEYS:
                 return keyComparator;
             case COMPOSITES:
-                assert baseMetadata.comparator instanceof CompositeType;
-                int prefixSize;
-                try
-                {
-                    prefixSize = Integer.parseInt(cdef.getIndexOptions().get(CompositesIndex.PREFIX_SIZE_OPTION));
-                }
-                catch (NumberFormatException e)
-                {
-                    // This shouldn't happen if validation has been done correctly
-                    throw new RuntimeException(e);
-                }
-                List<AbstractType<?>> types = new ArrayList<AbstractType<?>>(prefixSize + 1);
-                types.add(keyComparator);
-                for (int i = 0; i < prefixSize; i++)
-                    types.add(((CompositeType)baseMetadata.comparator).types.get(i));
-                return CompositeType.getInstance(types);
+                return CompositesIndex.getIndexComparator(baseMetadata, cdef);
             case CUSTOM:
                 return null;
         }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a950b925/src/java/org/apache/cassandra/db/index/SecondaryIndexManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/index/SecondaryIndexManager.java b/src/java/org/apache/cassandra/db/index/SecondaryIndexManager.java
index 4111093..b98e184 100644
--- a/src/java/org/apache/cassandra/db/index/SecondaryIndexManager.java
+++ b/src/java/org/apache/cassandra/db/index/SecondaryIndexManager.java
@@ -37,6 +37,7 @@ import org.apache.cassandra.dht.AbstractBounds;
 import org.apache.cassandra.io.sstable.ReducingKeyIterator;
 import org.apache.cassandra.io.sstable.SSTableReader;
 import org.apache.cassandra.thrift.IndexExpression;
+import org.apache.cassandra.thrift.IndexType;
 
 /**
  * Manages all the indexes associated with a given CFS
@@ -94,12 +95,13 @@ public class SecondaryIndexManager
         Collection<ByteBuffer> indexedColumnNames = indexesByColumn.keySet();
         for (ByteBuffer indexedColumn : indexedColumnNames)
         {
-            ColumnDefinition def = baseCfs.metadata.getColumn_metadata().get(indexedColumn);
+            ColumnDefinition def = baseCfs.metadata.getColumnDefinition(indexedColumn);
             if (def == null || def.getIndexType() == null)
                 removeIndexedColumn(indexedColumn);
         }
 
-        for (ColumnDefinition cdef : baseCfs.metadata.getColumn_metadata().values())
+        // TODO: allow all ColumnDefinition type
+        for (ColumnDefinition cdef : baseCfs.metadata.allColumns())
             if (cdef.getIndexType() != null && !indexedColumnNames.contains(cdef.name))
                 addIndexedColumn(cdef);
 
@@ -158,17 +160,22 @@ public class SecondaryIndexManager
 
     public boolean indexes(ByteBuffer name, Collection<SecondaryIndex> indexes)
     {
-        return indexFor(name, indexes) != null;
+        return !indexFor(name, indexes).isEmpty();
     }
 
-    public SecondaryIndex indexFor(ByteBuffer name, Collection<SecondaryIndex> indexes)
+    /**
+     * Returns every index among {@code indexes} for which {@code indexes(name)} is true. Several indexes
+     * may match the same cell name, which is why the updaters in this class apply a change to each of them.
+     *
+     * @param name the cell name to look up
+     * @param indexes the candidate indexes
+     * @return the matching indexes in iteration order, or an empty immutable list if none match; never null
+     */
+    public List<SecondaryIndex> indexFor(ByteBuffer name, Collection<SecondaryIndex> indexes)
     {
+        // Allocated lazily so that a lookup matching nothing creates no list.
+        List<SecondaryIndex> matching = null;
         for (SecondaryIndex index : indexes)
         {
             if (index.indexes(name))
-                return index;
+            {
+                if (matching == null)
+                    matching = new ArrayList<SecondaryIndex>();
+                matching.add(index);
+            }
         }
-        return null;
+        return matching == null ? Collections.<SecondaryIndex>emptyList() : matching;
     }
 
     public boolean indexes(Column column)
@@ -181,7 +188,7 @@ public class SecondaryIndexManager
         return indexes(name, indexesByColumn.values());
     }
 
-    public SecondaryIndex indexFor(ByteBuffer name)
+    public List<SecondaryIndex> indexFor(ByteBuffer name)
     {
         return indexFor(name, indexesByColumn.values());
     }
@@ -274,6 +281,9 @@ public class SecondaryIndexManager
         }
         else
         {
+            // TODO: We should do better than throwing a RuntimeException
+            if (cdef.getIndexType() == IndexType.CUSTOM && index instanceof AbstractSimplePerColumnSecondaryIndex)
+                throw new RuntimeException("Cannot use a subclass of AbstractSimplePerColumnSecondaryIndex as a CUSTOM index, as they assume they are CFS backed");
             index.init();
         }
 
@@ -600,22 +610,18 @@ public class SecondaryIndexManager
             if (column.isMarkedForDelete())
                 return;
 
-            SecondaryIndex index = indexFor(column.name());
-            if (index == null)
-                return;
-
-            ((PerColumnSecondaryIndex) index).insert(key.key, column);
+            for (SecondaryIndex index : indexFor(column.name()))
+                ((PerColumnSecondaryIndex) index).insert(key.key, column);
         }
 
         public void update(Column oldColumn, Column column)
         {
-            SecondaryIndex index = indexFor(column.name());
-            if (index == null)
-                return;
-
-            ((PerColumnSecondaryIndex) index).delete(key.key, oldColumn);
-            if (!column.isMarkedForDelete())
-                ((PerColumnSecondaryIndex) index).insert(key.key, column);
+            for (SecondaryIndex index : indexFor(column.name()))
+            {
+                ((PerColumnSecondaryIndex) index).delete(key.key, oldColumn);
+                if (!column.isMarkedForDelete())
+                    ((PerColumnSecondaryIndex) index).insert(key.key, column);
+            }
         }
 
         public void remove(Column column)
@@ -623,11 +629,8 @@ public class SecondaryIndexManager
             if (column.isMarkedForDelete())
                 return;
 
-            SecondaryIndex index = indexFor(column.name());
-            if (index == null)
-                return;
-
-            ((PerColumnSecondaryIndex) index).delete(key.key, column);
+            for (SecondaryIndex index : indexFor(column.name()))
+                ((PerColumnSecondaryIndex) index).delete(key.key, column);
         }
     }
 
@@ -646,37 +649,35 @@ public class SecondaryIndexManager
             if (column.isMarkedForDelete())
                 return;
 
-            SecondaryIndex index = indexFor(column.name());
-            if (index == null)
-                return;
-
-            if (index instanceof  PerColumnSecondaryIndex)
+            for (SecondaryIndex index : indexFor(column.name()))
             {
-                ((PerColumnSecondaryIndex) index).insert(key.key, column);
-            }
-            else
-            {
-                if (appliedRowLevelIndexes.add(index.getClass()))
-                    ((PerRowSecondaryIndex) index).index(key.key);
+                if (index instanceof  PerColumnSecondaryIndex)
+                {
+                    ((PerColumnSecondaryIndex) index).insert(key.key, column);
+                }
+                else
+                {
+                    if (appliedRowLevelIndexes.add(index.getClass()))
+                        ((PerRowSecondaryIndex) index).index(key.key);
+                }
             }
         }
 
         public void update(Column oldColumn, Column column)
         {
-            SecondaryIndex index = indexFor(column.name());
-            if (index == null)
-                return;
-
-            if (index instanceof  PerColumnSecondaryIndex)
+            for (SecondaryIndex index : indexFor(column.name()))
             {
-                ((PerColumnSecondaryIndex) index).delete(key.key, oldColumn);
-                if (!column.isMarkedForDelete())
-                    ((PerColumnSecondaryIndex) index).insert(key.key, column);
-            }
-            else
-            {
-                if (appliedRowLevelIndexes.add(index.getClass()))
-                    ((PerRowSecondaryIndex) index).index(key.key);
+                if (index instanceof  PerColumnSecondaryIndex)
+                {
+                    ((PerColumnSecondaryIndex) index).delete(key.key, oldColumn);
+                    if (!column.isMarkedForDelete())
+                        ((PerColumnSecondaryIndex) index).insert(key.key, column);
+                }
+                else
+                {
+                    if (appliedRowLevelIndexes.add(index.getClass()))
+                        ((PerRowSecondaryIndex) index).index(key.key);
+                }
             }
         }
 
@@ -685,18 +686,17 @@ public class SecondaryIndexManager
             if (column.isMarkedForDelete())
                 return;
 
-            SecondaryIndex index = indexFor(column.name());
-            if (index == null)
-                return;
-
-            if (index instanceof  PerColumnSecondaryIndex)
+            for (SecondaryIndex index : indexFor(column.name()))
             {
-                ((PerColumnSecondaryIndex) index).delete(key.key, column);
-            }
-            else
-            {
-                if (appliedRowLevelIndexes.add(index.getClass()))
-                    ((PerRowSecondaryIndex) index).index(key.key);
+                if (index instanceof  PerColumnSecondaryIndex)
+                {
+                    ((PerColumnSecondaryIndex) index).delete(key.key, column);
+                }
+                else
+                {
+                    if (appliedRowLevelIndexes.add(index.getClass()))
+                        ((PerRowSecondaryIndex) index).index(key.key);
+                }
             }
         }
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a950b925/src/java/org/apache/cassandra/db/index/SecondaryIndexSearcher.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/index/SecondaryIndexSearcher.java b/src/java/org/apache/cassandra/db/index/SecondaryIndexSearcher.java
index 3085f48..16ac091 100644
--- a/src/java/org/apache/cassandra/db/index/SecondaryIndexSearcher.java
+++ b/src/java/org/apache/cassandra/db/index/SecondaryIndexSearcher.java
@@ -24,6 +24,7 @@ import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.filter.IDiskAtomFilter;
 import org.apache.cassandra.dht.AbstractBounds;
 import org.apache.cassandra.thrift.IndexExpression;
+import org.apache.cassandra.thrift.IndexOperator;
 
 public abstract class SecondaryIndexSearcher
 {
@@ -43,8 +44,11 @@ public abstract class SecondaryIndexSearcher
     /**
      * @return true this index is able to handle given clauses.
      */
-    public abstract boolean isIndexing(List<IndexExpression> clause);
-    
+    public boolean isIndexing(List<IndexExpression> clause)
+    {
+        // True iff highestSelectivityPredicate finds a usable expression; as implemented in this class,
+        // that is an EQ expression on one of this searcher's columns that has an index.
+        return highestSelectivityPredicate(clause) != null;
+    }
+
     protected boolean isIndexValueStale(ColumnFamily liveData, ByteBuffer indexedColumnName, ByteBuffer indexedValue)
     {
         Column liveColumn = liveData.getColumn(indexedColumnName);
@@ -54,4 +58,27 @@ public abstract class SecondaryIndexSearcher
         ByteBuffer liveValue = liveColumn.value();
         return 0 != liveData.metadata().getValueValidator(indexedColumnName).compare(indexedValue, liveValue);
     }
+
+    /**
+     * Picks, among the expressions of {@code clause}, the one expected to be the most selective.
+     *
+     * Only EQ expressions on columns handled by this searcher ({@code columns}) that have an index
+     * are considered. Among those, the expression whose index table reports the lowest
+     * getMeanColumns() wins; on ties the first such expression is kept.
+     *
+     * @param clause the expressions of the query
+     * @return the chosen expression, or null if no expression can be served by an index
+     */
+    protected IndexExpression highestSelectivityPredicate(List<IndexExpression> clause)
+    {
+        IndexExpression best = null;
+        int bestMeanCount = Integer.MAX_VALUE;
+        for (IndexExpression expression : clause)
+        {
+            // skip columns belonging to a different index type
+            if (!columns.contains(expression.column_name))
+                continue;
+
+            SecondaryIndex index = indexManager.getIndexForColumn(expression.column_name);
+            if (index == null || expression.op != IndexOperator.EQ)
+                continue;
+
+            // Deliberately not named 'columns': that would shadow the field read above.
+            int meanColumns = index.getIndexCfs().getMeanColumns();
+            if (meanColumns < bestMeanCount)
+            {
+                best = expression;
+                bestMeanCount = meanColumns;
+            }
+        }
+        return best;
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a950b925/src/java/org/apache/cassandra/db/index/composites/CompositesIndex.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/index/composites/CompositesIndex.java b/src/java/org/apache/cassandra/db/index/composites/CompositesIndex.java
index 3d10ec5..d7302f7 100644
--- a/src/java/org/apache/cassandra/db/index/composites/CompositesIndex.java
+++ b/src/java/org/apache/cassandra/db/index/composites/CompositesIndex.java
@@ -18,92 +18,150 @@
 package org.apache.cassandra.db.index.composites;
 
 import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.Map;
 import java.util.Set;
 
+import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.config.ColumnDefinition;
+import org.apache.cassandra.cql3.ColumnNameBuilder;
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.index.AbstractSimplePerColumnSecondaryIndex;
 import org.apache.cassandra.db.index.SecondaryIndex;
+import org.apache.cassandra.db.index.SecondaryIndexManager;
 import org.apache.cassandra.db.index.SecondaryIndexSearcher;
 import org.apache.cassandra.db.marshal.*;
 import org.apache.cassandra.exceptions.ConfigurationException;
 
 /**
- * Implements a secondary index for a column family using a second column family
- * in which the row keys are indexed values, and column names are base row keys.
+ * Base class for secondary indexes where composites are involved.
  */
-public class CompositesIndex extends AbstractSimplePerColumnSecondaryIndex
+public abstract class CompositesIndex extends AbstractSimplePerColumnSecondaryIndex
 {
-    public static final String PREFIX_SIZE_OPTION = "prefix_size";
+    private volatile CompositeType indexComparator;
 
-    private CompositeType indexComparator;
-    private int prefixSize;
-
-    public void init(ColumnDefinition columnDef)
+    /**
+     * Returns the comparator of the index table, computing it from the base table metadata and
+     * columnDef on first use and caching it in the volatile indexComparator field.
+     */
+    protected CompositeType getIndexComparator()
     {
-        assert baseCfs.getComparator() instanceof CompositeType;
-
-        try
+        // Yes, this is racy, but computing it more than once is not a big deal; we just want to avoid doing it every time.
+        // More seriously, we should fix the whole SecondaryIndex API so this can be final and avoid all this nonsense.
+        if (indexComparator == null)
         {
-            prefixSize = Integer.parseInt(columnDef.getIndexOptions().get(PREFIX_SIZE_OPTION));
+            assert columnDef != null;
+            indexComparator = getIndexComparator(baseCfs.metadata, columnDef);
         }
-        catch (NumberFormatException e)
+        return indexComparator;
+    }
+
+    /**
+     * Instantiates the COMPOSITES index implementation matching the kind of column being indexed.
+     *
+     * @param cfDef the definition of the indexed column
+     * @return a new index instance for that column
+     * @throws AssertionError if the column kind has no COMPOSITES implementation (e.g. COMPACT_VALUE)
+     */
+    public static CompositesIndex create(ColumnDefinition cfDef)
+    {
+        switch (cfDef.type)
         {
-            // Shouldn't happen since validateOptions must have been called
-            throw new AssertionError(e);
+            case CLUSTERING_KEY:
+                return new CompositesIndexOnClusteringKey();
+            case REGULAR:
+                return new CompositesIndexOnRegular();
+            case PARTITION_KEY:
+                return new CompositesIndexOnPartitionKey();
+            // COMPACT_VALUE has no COMPOSITES index implementation yet; CreateIndexStatement rejects
+            // indexes on non-REGULAR columns of compact tables.
         }
+        throw new AssertionError("No COMPOSITES index implementation for column type " + cfDef.type);
+    }
 
-        indexComparator = (CompositeType)SecondaryIndex.getIndexComparator(baseCfs.metadata, columnDef);
+    /**
+     * Builds the comparator of the index table for an index on {@code cfDef}, delegating to the
+     * implementation matching the kind of the indexed column.
+     * (Check SecondaryIndex.getIndexComparator if you want to know why this is static.)
+     *
+     * @param baseMetadata metadata of the indexed (base) table
+     * @param cfDef the definition of the indexed column
+     * @return the index table comparator
+     * @throws AssertionError if the column kind has no COMPOSITES implementation (e.g. COMPACT_VALUE)
+     */
+    public static CompositeType getIndexComparator(CFMetaData baseMetadata, ColumnDefinition cfDef)
+    {
+        switch (cfDef.type)
+        {
+            case CLUSTERING_KEY:
+                return CompositesIndexOnClusteringKey.buildIndexComparator(baseMetadata, cfDef);
+            case REGULAR:
+                return CompositesIndexOnRegular.buildIndexComparator(baseMetadata, cfDef);
+            case PARTITION_KEY:
+                return CompositesIndexOnPartitionKey.buildIndexComparator(baseMetadata, cfDef);
+            // COMPACT_VALUE has no COMPOSITES index implementation yet.
+        }
+        throw new AssertionError("No COMPOSITES index implementation for column type " + cfDef.type);
     }
 
     protected ByteBuffer makeIndexColumnName(ByteBuffer rowKey, Column column)
     {
-        CompositeType baseComparator = (CompositeType)baseCfs.getComparator();
-        ByteBuffer[] components = baseComparator.split(column.name());
-        CompositeType.Builder builder = new CompositeType.Builder(indexComparator);
-        builder.add(rowKey);
-        for (int i = 0; i < Math.min(prefixSize, components.length); i++)
-            builder.add(components[i]);
-        return builder.build();
+        return makeIndexColumnNameBuilder(rowKey, column.name()).build();
+    }
+
+    protected abstract ColumnNameBuilder makeIndexColumnNameBuilder(ByteBuffer rowKey, ByteBuffer columnName);
+
+    public abstract IndexedEntry decodeEntry(DecoratedKey indexedValue, Column indexEntry);
+
+    public abstract boolean isStale(IndexedEntry entry, ColumnFamily data);
+
+    /**
+     * Removes a single entry from the index table by applying a tombstone for its cell.
+     *
+     * The tombstone carries the entry's own timestamp and the current time (in seconds) as local
+     * deletion time. Index updaters are not triggered (SecondaryIndexManager.nullUpdater).
+     * The debug message suggests this is used to clean up stale entries — NOTE(review): confirm with callers.
+     *
+     * @param entry the decoded index entry to delete
+     */
+    public void delete(IndexedEntry entry)
+    {
+        int localDeletionTime = (int) (System.currentTimeMillis() / 1000);
+        ColumnFamily cfi = ArrayBackedSortedColumns.factory.create(indexCfs.metadata);
+        cfi.addTombstone(entry.indexEntry, localDeletionTime, entry.timestamp);
+        indexCfs.apply(entry.indexValue, cfi, SecondaryIndexManager.nullUpdater);
+        if (logger.isDebugEnabled())
+            logger.debug("removed index entry for cleaned-up value {}:{}", entry.indexValue, cfi);
     }
 
     protected AbstractType getExpressionComparator()
     {
-        CompositeType baseComparator = (CompositeType)baseCfs.getComparator();
-        return baseComparator.types.get(prefixSize);
+        return baseCfs.metadata.getColumnDefinitionComparator(columnDef);
     }
 
-    @Override
-    public boolean indexes(ByteBuffer name)
+    protected CompositeType getBaseComparator()
     {
-        ColumnDefinition columnDef = columnDefs.iterator().next();
-        CompositeType baseComparator = (CompositeType)baseCfs.getComparator();
-        ByteBuffer[] components = baseComparator.split(name);
-        AbstractType<?> comp = baseCfs.metadata.getColumnDefinitionComparator(columnDef);
-        return components.length > columnDef.componentIndex
-            && comp.compare(components[columnDef.componentIndex], columnDef.name) == 0;
+        assert baseCfs.getComparator() instanceof CompositeType;
+        return (CompositeType)baseCfs.getComparator();
     }
 
     public SecondaryIndexSearcher createSecondaryIndexSearcher(Set<ByteBuffer> columns)
     {
-        return new CompositesSearcher(baseCfs.indexManager, columns, prefixSize);
+        return new CompositesSearcher(baseCfs.indexManager, columns);
     }
 
     public void validateOptions() throws ConfigurationException
     {
         ColumnDefinition columnDef = columnDefs.iterator().next();
-        String option = columnDef.getIndexOptions().get(PREFIX_SIZE_OPTION);
+        Map<String, String> options = new HashMap<String, String>(columnDef.getIndexOptions());
+
+        // We take no options though we used to have one called "prefix_size",
+        // so skip it silently for backward compatibility sake.
+        options.remove("prefix_size");
+
+        if (!options.isEmpty())
+            throw new ConfigurationException("Unknown options provided for COMPOSITES index: " + options.keySet());
+    }
+
+    /**
+     * An index table entry as returned by decodeEntry, holding what delete(IndexedEntry) needs to
+     * remove it and what is needed to locate the base-table data it refers to.
+     * NOTE(review): this is an inner (non-static) class although nothing in it reads the enclosing
+     * CompositesIndex; making it static would drop the hidden outer reference, but would change how
+     * code outside CompositesIndex subclasses instantiates it.
+     */
+    public class IndexedEntry
+    {
+        // Key of the index table row holding this entry (delete(IndexedEntry) applies its tombstone there).
+        public final DecoratedKey indexValue;
+        // Cell name of this entry within that index row (the cell tombstoned by delete(IndexedEntry)).
+        public final ByteBuffer indexEntry;
+        // Timestamp given to the tombstone when the entry is deleted; presumably the index cell's timestamp.
+        public final long timestamp;
 
-        if (option == null)
-            throw new ConfigurationException("Missing option " + PREFIX_SIZE_OPTION);
+        // Presumably the partition key of the indexed base-table row — confirm in decodeEntry implementations.
+        public final ByteBuffer indexedKey;
+        // Builder for the base-table cell name prefix this entry points to; see indexedEntryStart/End.
+        public final ColumnNameBuilder indexedEntryNameBuilder;
 
-        try
+        public IndexedEntry(DecoratedKey indexValue, ByteBuffer indexEntry, long timestamp, ByteBuffer indexedKey, ColumnNameBuilder indexedEntryNameBuilder)
         {
-            Integer.parseInt(option);
+            this.indexValue = indexValue;
+            this.indexEntry = indexEntry;
+            this.timestamp = timestamp;
+            this.indexedKey = indexedKey;
+            this.indexedEntryNameBuilder = indexedEntryNameBuilder;
         }
-        catch (NumberFormatException e)
+
+        // Name built from indexedEntryNameBuilder; presumably the start bound of a slice over the entry's cells.
+        public ByteBuffer indexedEntryStart()
+        {
+            return indexedEntryNameBuilder.build();
+        }
+
+        // End-of-range counterpart of indexedEntryStart(), built with buildAsEndOfRange().
+        public ByteBuffer indexedEntryEnd()
         {
-            throw new ConfigurationException(String.format("Invalid non integer value for option %s (got '%s')", PREFIX_SIZE_OPTION, option));
+            return indexedEntryNameBuilder.buildAsEndOfRange();
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a950b925/src/java/org/apache/cassandra/db/index/composites/CompositesIndexOnClusteringKey.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/index/composites/CompositesIndexOnClusteringKey.java b/src/java/org/apache/cassandra/db/index/composites/CompositesIndexOnClusteringKey.java
new file mode 100644
index 0000000..f1df078
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/index/composites/CompositesIndexOnClusteringKey.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.index.composites;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.ColumnDefinition;
+import org.apache.cassandra.cql3.ColumnNameBuilder;
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.index.SecondaryIndex;
+import org.apache.cassandra.db.index.SecondaryIndexSearcher;
+import org.apache.cassandra.db.marshal.*;
+
+/**
+ * Index on a CLUSTERING_KEY column definition, i.e. on one component of the
+ * clustering prefix of the cell names.
+ *
+ * Indexed cells have names of the general form
+ *   ck_0 ... ck_n c_name : v
+ * where the ck_j are the clustering keys, c_name is the last component of
+ * the cell composite name (second to last if collections are in use, which
+ * has no impact here) and v is the cell value.
+ *
+ * Every such cell is indexed (strictly speaking, whenever
+ * n >= columnDef.componentIndex, which always holds in practice). With
+ * i == columnDef.componentIndex, makeIndexColumnNameBuilder() produces an
+ * index entry whose:
+ *   - row key is ck_i (see getIndexedValue());
+ *   - cell name is
+ *       rk ck_0 ... ck_{i-1} ck_{i+1} ... ck_n
+ *     where rk is the row key of the indexed cell.
+ */
+public class CompositesIndexOnClusteringKey extends CompositesIndex
+{
+    /**
+     * Builds the comparator of the index cell names: SecondaryIndex.keyComparator
+     * (for rk) followed by the types of every clustering key but the indexed one.
+     */
+    public static CompositeType buildIndexComparator(CFMetaData baseMetadata, ColumnDefinition columnDef)
+    {
+        int clusteringSize = baseMetadata.clusteringKeyColumns().size();
+        int skipped = columnDef.componentIndex;
+        List<AbstractType<?>> baseTypes = baseMetadata.comparator.getComponents();
+
+        // rk plus all clustering keys but one: clusteringSize components in total.
+        List<AbstractType<?>> indexTypes = new ArrayList<AbstractType<?>>(clusteringSize);
+        indexTypes.add(SecondaryIndex.keyComparator);
+        for (int pos = 0; pos < skipped; pos++)
+            indexTypes.add(baseTypes.get(pos));
+        for (int pos = skipped + 1; pos < clusteringSize; pos++)
+            indexTypes.add(baseTypes.get(pos));
+        return CompositeType.getInstance(indexTypes);
+    }
+
+    /** Returns ck_i, the indexed clustering component of the cell name. */
+    protected ByteBuffer getIndexedValue(ByteBuffer rowKey, Column column)
+    {
+        ByteBuffer[] nameParts = ((CompositeType)baseCfs.getComparator()).split(column.name());
+        return nameParts[columnDef.componentIndex];
+    }
+
+    /** Builds rk followed by every clustering component of columnName except ck_i. */
+    protected ColumnNameBuilder makeIndexColumnNameBuilder(ByteBuffer rowKey, ByteBuffer columnName)
+    {
+        int clusteringSize = baseCfs.metadata.clusteringKeyColumns().size();
+        int skipped = columnDef.componentIndex;
+        ByteBuffer[] nameParts = ((CompositeType)baseCfs.getComparator()).split(columnName);
+
+        CompositeType.Builder indexName = getIndexComparator().builder();
+        indexName.add(rowKey);
+        for (int pos = 0; pos < skipped; pos++)
+            indexName.add(nameParts[pos]);
+        for (int pos = skipped + 1; pos < clusteringSize; pos++)
+            indexName.add(nameParts[pos]);
+        return indexName;
+    }
+
+    /**
+     * Rebuilds, from an index cell, the clustering prefix of the base row it
+     * points to: the components before ck_i, the indexed value ck_i itself,
+     * then the components after it. The leading rk becomes the indexed key.
+     */
+    public IndexedEntry decodeEntry(DecoratedKey indexedValue, Column indexEntry)
+    {
+        int clusteringSize = baseCfs.metadata.clusteringKeyColumns().size();
+        int skipped = columnDef.componentIndex;
+        ByteBuffer[] entryParts = getIndexComparator().split(indexEntry.name());
+
+        // entryParts[0] is rk, so ck_j is found at j + 1 before the indexed
+        // position and at j after it (ck_i itself is not part of the name).
+        ColumnNameBuilder basePrefix = getBaseComparator().builder();
+        for (int pos = 0; pos < skipped; pos++)
+            basePrefix.add(entryParts[pos + 1]);
+        basePrefix.add(indexedValue.key);
+        for (int pos = skipped + 1; pos < clusteringSize; pos++)
+            basePrefix.add(entryParts[pos]);
+
+        return new IndexedEntry(indexedValue, indexEntry.name(), indexEntry.timestamp(), entryParts[0], basePrefix);
+    }
+
+    /**
+     * Always true: this is assumed to only be used from CQL3, where cell names
+     * are known to have enough components.
+     */
+    @Override
+    public boolean indexes(ByteBuffer name)
+    {
+        return true;
+    }
+
+    /** An entry is stale once the base data is missing or holds only tombstones. */
+    public boolean isStale(IndexedEntry entry, ColumnFamily data)
+    {
+        if (data == null)
+            return true;
+        return data.hasOnlyTombstones();
+    }
+}
+

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a950b925/src/java/org/apache/cassandra/db/index/composites/CompositesIndexOnPartitionKey.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/index/composites/CompositesIndexOnPartitionKey.java b/src/java/org/apache/cassandra/db/index/composites/CompositesIndexOnPartitionKey.java
new file mode 100644
index 0000000..fce7700
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/index/composites/CompositesIndexOnPartitionKey.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.index.composites;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.ColumnDefinition;
+import org.apache.cassandra.cql3.ColumnNameBuilder;
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.index.SecondaryIndex;
+import org.apache.cassandra.db.index.SecondaryIndexSearcher;
+import org.apache.cassandra.db.marshal.*;
+
+/**
+ * Index on a PARTITION_KEY column definition.
+ *
+ * This assumes a composite row key:
+ *   rk = rk_0 ... rk_n
+ *
+ * The corresponding index entry is:
+ *   - index row key: rk_i (where i == columnDef.componentIndex);
+ *   - cell name: rk ck
+ *     where rk is the full partition key and ck the clustering keys of the
+ *     original cell name (the last, column name, component is excluded as the
+ *     entry refers to the whole CQL3 row, not just to the cell itself).
+ *
+ * Note that, unlike the other types of index, the indexed value is repeated in
+ * the index cell name (the whole partition key is used). The reason is that
+ * index cell names must be ordered by partitioner first, and skipping a part
+ * of the row key would change that order.
+ */
+public class CompositesIndexOnPartitionKey extends CompositesIndex
+{
+    /**
+     * Builds the comparator of the index cell names: SecondaryIndex.keyComparator
+     * (for rk) followed by every component type of the base comparator. Only
+     * the clustering components are actually filled in by this index.
+     */
+    public static CompositeType buildIndexComparator(CFMetaData baseMetadata, ColumnDefinition columnDef)
+    {
+        int clusteringSize = baseMetadata.clusteringKeyColumns().size();
+        List<AbstractType<?>> indexTypes = new ArrayList<AbstractType<?>>(clusteringSize + 1);
+        indexTypes.add(SecondaryIndex.keyComparator);
+        indexTypes.addAll(baseMetadata.comparator.getComponents());
+        return CompositeType.getInstance(indexTypes);
+    }
+
+    /** Returns rk_i, the indexed component of the composite partition key. */
+    protected ByteBuffer getIndexedValue(ByteBuffer rowKey, Column column)
+    {
+        CompositeType partitionKeyType = (CompositeType)baseCfs.metadata.getKeyValidator();
+        return partitionKeyType.split(rowKey)[columnDef.componentIndex];
+    }
+
+    /** Builds rk followed by the clustering components of columnName. */
+    protected ColumnNameBuilder makeIndexColumnNameBuilder(ByteBuffer rowKey, ByteBuffer columnName)
+    {
+        int clusteringSize = baseCfs.metadata.clusteringKeyColumns().size();
+        ByteBuffer[] nameParts = ((CompositeType)baseCfs.getComparator()).split(columnName);
+
+        CompositeType.Builder indexName = getIndexComparator().builder();
+        indexName.add(rowKey);
+        int pos = 0;
+        while (pos < clusteringSize)
+            indexName.add(nameParts[pos++]);
+        return indexName;
+    }
+
+    /**
+     * Splits an index cell name back into rk (the indexed key) and the
+     * clustering prefix of the base row it points to.
+     */
+    public IndexedEntry decodeEntry(DecoratedKey indexedValue, Column indexEntry)
+    {
+        int clusteringSize = baseCfs.metadata.clusteringKeyColumns().size();
+        ByteBuffer[] entryParts = getIndexComparator().split(indexEntry.name());
+
+        // entryParts[0] is rk, the clustering components directly follow it.
+        ColumnNameBuilder basePrefix = getBaseComparator().builder();
+        for (int pos = 1; pos <= clusteringSize; pos++)
+            basePrefix.add(entryParts[pos]);
+
+        return new IndexedEntry(indexedValue, indexEntry.name(), indexEntry.timestamp(), entryParts[0], basePrefix);
+    }
+
+    /** Always true: a partition key is always complete, so every cell is indexed. */
+    @Override
+    public boolean indexes(ByteBuffer name)
+    {
+        return true;
+    }
+
+    /** An entry is stale once the base data is missing or holds only tombstones. */
+    public boolean isStale(IndexedEntry entry, ColumnFamily data)
+    {
+        if (data == null)
+            return true;
+        return data.hasOnlyTombstones();
+    }
+}
+
+

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a950b925/src/java/org/apache/cassandra/db/index/composites/CompositesIndexOnRegular.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/index/composites/CompositesIndexOnRegular.java b/src/java/org/apache/cassandra/db/index/composites/CompositesIndexOnRegular.java
new file mode 100644
index 0000000..03649c0
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/index/composites/CompositesIndexOnRegular.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.index.composites;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.ColumnDefinition;
+import org.apache.cassandra.cql3.ColumnNameBuilder;
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.index.SecondaryIndex;
+import org.apache.cassandra.db.index.SecondaryIndexSearcher;
+import org.apache.cassandra.db.marshal.*;
+
+/**
+ * Index on a REGULAR column definition on a composite type.
+ *
+ * A cell indexed by this index has the general form:
+ *   ck_0 ... ck_n c_name : v
+ * where ck_i are the clustering keys, c_name the last component of the cell
+ * composite name (or second to last if collections are in use, but this
+ * has no impact) and v the cell value.
+ *
+ * Such a cell is indexed if c_name == columnDef.name, and it generates
+ * (makeIndexColumnNameBuilder()) an index entry whose:
+ *   - row key is the value v (getIndexedValue()).
+ *   - cell name is
+ *       rk ck_0 ... ck_n
+ *     where rk is the row key of the initial cell. I.e. the index entry stores
+ *     all the information required to locate the indexed cell.
+ */
+public class CompositesIndexOnRegular extends CompositesIndex
+{
+    /**
+     * Builds the comparator of the index cell names: SecondaryIndex.keyComparator
+     * (for rk) followed by the types of the first columnDef.componentIndex
+     * components of the base comparator.
+     */
+    public static CompositeType buildIndexComparator(CFMetaData baseMetadata, ColumnDefinition columnDef)
+    {
+        int prefixSize = columnDef.componentIndex;
+        // Same accessor as the other composite indexes, fetched once rather than per iteration.
+        List<AbstractType<?>> baseTypes = baseMetadata.comparator.getComponents();
+        List<AbstractType<?>> types = new ArrayList<AbstractType<?>>(prefixSize + 1);
+        types.add(SecondaryIndex.keyComparator);
+        for (int i = 0; i < prefixSize; i++)
+            types.add(baseTypes.get(i));
+        return CompositeType.getInstance(types);
+    }
+
+    /** The indexed value is the cell value itself. */
+    protected ByteBuffer getIndexedValue(ByteBuffer rowKey, Column column)
+    {
+        return column.value();
+    }
+
+    /** Builds rk followed by the components of columnName preceding c_name. */
+    protected ColumnNameBuilder makeIndexColumnNameBuilder(ByteBuffer rowKey, ByteBuffer columnName)
+    {
+        CompositeType baseComparator = (CompositeType)baseCfs.getComparator();
+        ByteBuffer[] components = baseComparator.split(columnName);
+        CompositeType.Builder builder = getIndexComparator().builder();
+        builder.add(rowKey);
+        // A name with fewer components only contributes the ones it has.
+        for (int i = 0; i < Math.min(columnDef.componentIndex, components.length); i++)
+            builder.add(components[i]);
+        return builder;
+    }
+
+    /**
+     * Splits an index cell name back into rk (the indexed key) and the prefix of
+     * the base cell name it points to (its first columnDef.componentIndex components).
+     */
+    public IndexedEntry decodeEntry(DecoratedKey indexedValue, Column indexEntry)
+    {
+        ByteBuffer[] components = getIndexComparator().split(indexEntry.name());
+        CompositeType.Builder builder = getBaseComparator().builder();
+        for (int i = 0; i < columnDef.componentIndex; i++)
+            builder.add(components[i + 1]);
+        return new IndexedEntry(indexedValue, indexEntry.name(), indexEntry.timestamp(), components[0], builder);
+    }
+
+    /** A cell is indexed when its component at columnDef.componentIndex equals columnDef.name. */
+    @Override
+    public boolean indexes(ByteBuffer name)
+    {
+        ByteBuffer[] components = getBaseComparator().split(name);
+        AbstractType<?> comp = baseCfs.metadata.getColumnDefinitionComparator(columnDef);
+        return components.length > columnDef.componentIndex
+            && comp.compare(components[columnDef.componentIndex], columnDef.name) == 0;
+    }
+
+    /**
+     * An entry is stale when there is no base data, when the base data has no
+     * live cell for the indexed column at the entry's position, or when that
+     * cell's value no longer matches the indexed value.
+     */
+    public boolean isStale(IndexedEntry entry, ColumnFamily data)
+    {
+        // Treat missing base data as stale, like the other composite indexes do,
+        // instead of failing with a NullPointerException on data.getColumn().
+        if (data == null)
+            return true;
+
+        ByteBuffer bb = entry.indexedEntryNameBuilder.copy().add(columnDef.name).build();
+        Column liveColumn = data.getColumn(bb);
+        if (liveColumn == null || liveColumn.isMarkedForDelete())
+            return true;
+
+        ByteBuffer liveValue = liveColumn.value();
+        return columnDef.getValidator().compare(entry.indexValue.key, liveValue) != 0;
+    }
+}


Mime
View raw message