cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From slebre...@apache.org
Subject [2/2] Composite secondary indexes (for CQL3)
Date Thu, 09 Aug 2012 17:07:04 GMT
http://git-wip-us.apache.org/repos/asf/cassandra/blob/80ea03f5/src/java/org/apache/cassandra/db/index/composites/CompositesSearcher.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/index/composites/CompositesSearcher.java b/src/java/org/apache/cassandra/db/index/composites/CompositesSearcher.java
new file mode 100644
index 0000000..6ffb63e
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/index/composites/CompositesSearcher.java
@@ -0,0 +1,318 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.index.composites;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.*;
+
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.filter.*;
+import org.apache.cassandra.db.index.SecondaryIndex;
+import org.apache.cassandra.db.index.SecondaryIndexManager;
+import org.apache.cassandra.db.index.SecondaryIndexSearcher;
+import org.apache.cassandra.db.marshal.CompositeType;
+import org.apache.cassandra.dht.AbstractBounds;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.thrift.IndexExpression;
+import org.apache.cassandra.thrift.IndexOperator;
+import org.apache.cassandra.utils.ByteBufferUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class CompositesSearcher extends SecondaryIndexSearcher
+{
+    private static final Logger logger = LoggerFactory.getLogger(CompositesSearcher.class);
+
+    private final int prefixSize;
+
+    public CompositesSearcher(SecondaryIndexManager indexManager, Set<ByteBuffer> columns,
int prefixSize)
+    {
+        super(indexManager, columns);
+        this.prefixSize = prefixSize;
+    }
+
+    private IndexExpression highestSelectivityPredicate(List<IndexExpression> clause)
+    {
+        IndexExpression best = null;
+        int bestMeanCount = Integer.MAX_VALUE;
+        for (IndexExpression expression : clause)
+        {
+            //skip columns belonging to a different index type
+            if(!columns.contains(expression.column_name))
+                continue;
+
+            SecondaryIndex index = indexManager.getIndexForColumn(expression.column_name);
+            if (index == null || (expression.op != IndexOperator.EQ))
+                continue;
+            int columns = index.getIndexCfs().getMeanColumns();
+            if (columns < bestMeanCount)
+            {
+                best = expression;
+                bestMeanCount = columns;
+            }
+        }
+        return best;
+    }
+
+    public boolean isIndexing(List<IndexExpression> clause)
+    {
+        return highestSelectivityPredicate(clause) != null;
+    }
+
+    @Override
+    public List<Row> search(List<IndexExpression> clause, AbstractBounds<RowPosition>
range, int maxResults, IFilter dataFilter, boolean maxIsColumns)
+    {
+        assert clause != null && !clause.isEmpty();
+        ExtendedFilter filter = ExtendedFilter.create(baseCfs, dataFilter, clause, maxResults,
maxIsColumns, false);
+        return baseCfs.filter(getIndexedIterator(range, filter), filter);
+    }
+
+    public ColumnFamilyStore.AbstractScanIterator getIndexedIterator(final AbstractBounds<RowPosition>
range, final ExtendedFilter filter)
+    {
+        // Start with the most-restrictive indexed clause, then apply remaining clauses
+        // to each row matching that clause.
+        // TODO: allow merge join instead of just one index + loop
+        final IndexExpression primary = highestSelectivityPredicate(filter.getClause());
+        final SecondaryIndex index = indexManager.getIndexForColumn(primary.column_name);
+        assert index != null;
+        final DecoratedKey indexKey = index.getIndexKeyFor(primary.value);
+
+        /*
+         * XXX: If the range requested is a token range, we'll have to start at the beginning
(and stop at the end) of
+         * the indexed row unfortunately (which will be inefficient), because we have not
way to intuit the small
+         * possible key having a given token. A fix would be to actually store the token
along the key in the
+         * indexed row.
+         */
+        ByteBuffer startKey = range.left instanceof DecoratedKey ? ((DecoratedKey)range.left).key
: ByteBufferUtil.EMPTY_BYTE_BUFFER;
+        ByteBuffer endKey = range.right instanceof DecoratedKey ? ((DecoratedKey)range.right).key
: ByteBufferUtil.EMPTY_BYTE_BUFFER;
+
+        final CompositeType baseComparator = (CompositeType)baseCfs.getComparator();
+        final CompositeType indexComparator = (CompositeType)index.getIndexCfs().getComparator();
+
+        CompositeType.Builder builder = null;
+        if (startKey.remaining() > 0)
+        {
+            builder = indexComparator.builder().add(startKey);
+            // For names filter, we have no choice but to query from the beginning of the
key. This can be highly inefficient however.
+            if (filter.originalFilter() instanceof SliceQueryFilter)
+            {
+                ByteBuffer[] components = baseComparator.split(((SliceQueryFilter)filter.originalFilter()).start());
+                for (int i = 0; i < Math.min(prefixSize, components.length); ++i)
+                    builder.add(components[i]);
+            }
+        }
+        final ByteBuffer startPrefix = startKey.remaining() == 0 ? ByteBufferUtil.EMPTY_BYTE_BUFFER
: builder.build();
+
+        if (endKey.remaining() > 0)
+        {
+            builder = indexComparator.builder().add(endKey);
+            // For names filter, we have no choice but to query until the end of the key.
This can be highly inefficient however.
+            if (filter.originalFilter() instanceof SliceQueryFilter)
+            {
+                ByteBuffer[] components = baseComparator.split(((SliceQueryFilter)filter.originalFilter()).finish());
+                for (int i = 0; i < Math.min(prefixSize, components.length); ++i)
+                    builder.add(components[i]);
+            }
+        }
+        final ByteBuffer endPrefix = endKey.remaining() == 0 ? ByteBufferUtil.EMPTY_BYTE_BUFFER
: builder.buildAsEndOfRange();
+
+        // We will need to filter clustering keys based on the user filter. If
+        // it is a names filter, we are really interested on the clustering
+        // part, not the actual column name (NOTE: this is a hack that assumes CQL3).
+        final SliceQueryFilter originalFilter;
+        if (filter.originalFilter() instanceof SliceQueryFilter)
+        {
+            originalFilter = (SliceQueryFilter)filter.originalFilter();
+        }
+        else
+        {
+            ByteBuffer first = ((NamesQueryFilter)filter.originalFilter()).columns.iterator().next();
+            ByteBuffer[] components = baseComparator.split(first);
+            builder = baseComparator.builder();
+            // All all except the last component, since it's the column name
+            for (int i = 0; i < components.length - 1; i++)
+                builder.add(components[i]);
+            originalFilter = new SliceQueryFilter(builder.copy().build(), builder.copy().buildAsEndOfRange(),
false, Integer.MAX_VALUE);
+        }
+
+        return new ColumnFamilyStore.AbstractScanIterator()
+        {
+            private ByteBuffer lastSeenPrefix = startPrefix;
+            private Deque<IColumn> indexColumns;
+            private final QueryPath path = new QueryPath(baseCfs.columnFamily);
+            private int columnsRead = Integer.MAX_VALUE;
+
+            private final int meanColumns = Math.max(index.getIndexCfs().getMeanColumns(),
1);
+            // We shouldn't fetch only 1 row as this provides buggy paging in case the first
row doesn't satisfy all clauses
+            private final int rowsPerQuery = Math.max(Math.min(filter.maxRows(), filter.maxColumns()
/ meanColumns), 2);
+
+            public boolean needsFiltering()
+            {
+                return false;
+            }
+
+            private Row makeReturn(DecoratedKey key, ColumnFamily data)
+            {
+                if (data == null)
+                {
+                    return endOfData();
+                }
+                else
+                {
+                    assert key != null;
+                    return new Row(key, data);
+                }
+            }
+
+            protected Row computeNext()
+            {
+                /*
+                 * Our internal index code is wired toward internal rows. So we need to acumulate
all results for a given
+                 * row before returning from this method. Which unfortunately means that
this method has to do what
+                 * CFS.filter does for KeysIndex.
+                 */
+                DecoratedKey currentKey = null;
+                ColumnFamily data = null;
+                int columnsCount = 0;
+                int limit = ((SliceQueryFilter)filter.initialFilter()).count;
+
+                while (true)
+                {
+                    // Did we got more columns that needed to respect the user limit?
+                    // (but we still need to return was fetch already)
+                    if (columnsCount > limit)
+                        return makeReturn(currentKey, data);
+
+                    if (indexColumns == null || indexColumns.isEmpty())
+                    {
+                        if (columnsRead < rowsPerQuery)
+                        {
+                            logger.debug("Read only {} (< {}) last page through, must
be done", columnsRead, rowsPerQuery);
+                            return makeReturn(currentKey, data);
+                        }
+
+                        // TODO: broken because we need to extract the component comparator
rather than the whole name comparator
+                        // if (logger.isDebugEnabled())
+                        //     logger.debug("Scanning index {} starting with {}",
+                        //                  expressionString(primary), indexComparator.getString(startPrefix));
+
+                        QueryFilter indexFilter = QueryFilter.getSliceFilter(indexKey,
+                                                                             new QueryPath(index.getIndexCfs().getColumnFamilyName()),
+                                                                             lastSeenPrefix,
+                                                                             endPrefix,
+                                                                             false,
+                                                                             rowsPerQuery);
+                        ColumnFamily indexRow = index.getIndexCfs().getColumnFamily(indexFilter);
+                        if (indexRow == null)
+                            return makeReturn(currentKey, data);
+
+                        Collection<IColumn> sortedColumns = indexRow.getSortedColumns();
+                        columnsRead = sortedColumns.size();
+                        indexColumns = new ArrayDeque(sortedColumns);
+                        IColumn firstColumn = sortedColumns.iterator().next();
+
+                        // Paging is racy, so it is possible the first column of a page is
not the last seen one.
+                        if (lastSeenPrefix != startPrefix && lastSeenPrefix.equals(firstColumn.name()))
+                        {
+                            // skip the row we already saw w/ the last page of results
+                            indexColumns.poll();
+                            logger.debug("Skipping {}", indexComparator.getString(firstColumn.name()));
+                        }
+                        else if (range instanceof Range && !indexColumns.isEmpty()
&& firstColumn.name().equals(startPrefix))
+                        {
+                            // skip key excluded by range
+                            indexColumns.poll();
+                            logger.debug("Skipping first key as range excludes it");
+                        }
+                    }
+
+                    while (!indexColumns.isEmpty() && columnsCount <= limit)
+                    {
+                        IColumn column = indexColumns.poll();
+                        lastSeenPrefix = column.name();
+                        if (column.isMarkedForDelete())
+                        {
+                            logger.debug("skipping {}", column.name());
+                            continue;
+                        }
+
+                        ByteBuffer[] components = indexComparator.split(lastSeenPrefix);
+                        DecoratedKey dk = baseCfs.partitioner.decorateKey(components[0]);
+
+                        // Are we done for this row?
+                        if (currentKey == null)
+                        {
+                            currentKey = dk;
+                        }
+                        else if (!currentKey.equals(dk))
+                        {
+                            DecoratedKey previousKey = currentKey;
+                            currentKey = dk;
+
+                            // We're done with the previous row, return it if it had data,
continue otherwise
+                            indexColumns.addFirst(column);
+                            if (data == null)
+                                continue;
+                            else
+                                return makeReturn(previousKey, data);
+                        }
+
+                        if (!range.right.isMinimum(baseCfs.partitioner) && range.right.compareTo(dk)
< 0)
+                        {
+                            logger.debug("Reached end of assigned scan range");
+                            return endOfData();
+                        }
+                        if (!range.contains(dk))
+                        {
+                            logger.debug("Skipping entry {} outside of assigned scan range",
dk.token);
+                            continue;
+                        }
+
+                        logger.debug("Adding index hit to current row for {}", indexComparator.getString(lastSeenPrefix));
+                        // For sparse composites, we're good querying the whole logical row
+                        // Obviously if this index is used for other usage, that might be
inefficient
+                        CompositeType.Builder builder = baseComparator.builder();
+                        for (int i = 0; i < prefixSize; i++)
+                            builder.add(components[i + 1]);
+
+                        // Does this "row" match the user original filter
+                        ByteBuffer start = builder.copy().build();
+                        if (!originalFilter.includes(baseComparator, start))
+                            continue;
+
+                        SliceQueryFilter dataFilter = new SliceQueryFilter(start, builder.copy().buildAsEndOfRange(),
false, Integer.MAX_VALUE);
+                        ColumnFamily newData = baseCfs.getColumnFamily(new QueryFilter(dk,
path, dataFilter));
+                        if (newData != null)
+                        {
+                            if (!filter.isSatisfiedBy(newData, builder))
+                                continue;
+
+                            if (data == null)
+                                data = ColumnFamily.create(baseCfs.metadata);
+                            data.resolve(newData);
+                            columnsCount += newData.getLiveColumnCount();
+                        }
+                    }
+                 }
+             }
+
+            public void close() throws IOException {}
+        };
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/80ea03f5/src/java/org/apache/cassandra/db/index/keys/KeysIndex.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/index/keys/KeysIndex.java b/src/java/org/apache/cassandra/db/index/keys/KeysIndex.java
index 7147bed..e9c805d 100644
--- a/src/java/org/apache/cassandra/db/index/keys/KeysIndex.java
+++ b/src/java/org/apache/cassandra/db/index/keys/KeysIndex.java
@@ -21,14 +21,11 @@ import java.nio.ByteBuffer;
 import java.util.Set;
 import java.util.concurrent.ExecutionException;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.config.ColumnDefinition;
 import org.apache.cassandra.config.ConfigurationException;
 import org.apache.cassandra.db.*;
-import org.apache.cassandra.db.index.PerColumnSecondaryIndex;
+import org.apache.cassandra.db.index.AbstractSimplePerColumnSecondaryIndex;
 import org.apache.cassandra.db.index.SecondaryIndexSearcher;
 import org.apache.cassandra.db.marshal.AbstractType;
 import org.apache.cassandra.db.marshal.BytesType;
@@ -41,124 +38,16 @@ import org.apache.cassandra.utils.ByteBufferUtil;
  * Implements a secondary index for a column family using a second column family
  * in which the row keys are indexed values, and column names are base row keys.
  */
-public class KeysIndex extends PerColumnSecondaryIndex
+public class KeysIndex extends AbstractSimplePerColumnSecondaryIndex
 {
-    private static final Logger logger = LoggerFactory.getLogger(KeysIndex.class);
-    private ColumnFamilyStore indexCfs;
-
-    public KeysIndex()
-    {
-    }
-
-    public void init()
-    {
-        assert baseCfs != null && columnDefs != null;
-
-        ColumnDefinition columnDef = columnDefs.iterator().next();
-        CFMetaData indexedCfMetadata = CFMetaData.newIndexMetadata(baseCfs.metadata, columnDef,
indexComparator());
-        indexCfs = ColumnFamilyStore.createColumnFamilyStore(baseCfs.table,
-                                                             indexedCfMetadata.cfName,
-                                                             new LocalPartitioner(columnDef.getValidator()),
-                                                             indexedCfMetadata);
-
-        // enable and initialize row cache based on parent's setting and indexed column's
cardinality
-        CFMetaData.Caching baseCaching = baseCfs.metadata.getCaching();
-        if (baseCaching == CFMetaData.Caching.ALL || baseCaching == CFMetaData.Caching.ROWS_ONLY)
-        {
-            /*
-             * # of index CF's key = cardinality of indexed column.
-             * if # of keys stored in index CF is more than average column counts (means
tall table),
-             * then consider it as high cardinality.
-             */
-            double estimatedKeys = indexCfs.estimateKeys();
-            double averageColumnCount = indexCfs.getMeanColumns();
-            if (averageColumnCount > 0 && estimatedKeys / averageColumnCount >
1)
-            {
-                logger.debug("turning row cache on for " + indexCfs.getColumnFamilyName());
-                indexCfs.metadata.caching(baseCaching);
-                indexCfs.initRowCache();
-            }
-        }
-    }
-
-    public static AbstractType<?> indexComparator()
+    public void init(ColumnDefinition columnDef)
     {
-        IPartitioner rowPartitioner = StorageService.getPartitioner();
-        return (rowPartitioner instanceof OrderPreservingPartitioner || rowPartitioner instanceof
ByteOrderedPartitioner)
-               ? BytesType.instance
-               : new LocalByPartionerType(StorageService.getPartitioner());
+        // Nothing specific
     }
 
-    public void deleteColumn(DecoratedKey valueKey, ByteBuffer rowKey, IColumn column)
+    protected ByteBuffer makeIndexColumnName(ByteBuffer rowKey, IColumn column)
     {
-        if (column.isMarkedForDelete())
-            return;
-
-        int localDeletionTime = (int) (System.currentTimeMillis() / 1000);
-        ColumnFamily cfi = ColumnFamily.create(indexCfs.metadata);
-        cfi.addTombstone(rowKey, localDeletionTime, column.timestamp());
-        indexCfs.apply(valueKey, cfi);
-        if (logger.isDebugEnabled())
-            logger.debug("removed index entry for cleaned-up value {}:{}", valueKey, cfi);
-    }
-
-    public void insertColumn(DecoratedKey valueKey, ByteBuffer rowKey, IColumn column)
-    {
-        ColumnFamily cfi = ColumnFamily.create(indexCfs.metadata);
-        if (column instanceof ExpiringColumn)
-        {
-            ExpiringColumn ec = (ExpiringColumn)column;
-            cfi.addColumn(new ExpiringColumn(rowKey, ByteBufferUtil.EMPTY_BYTE_BUFFER, ec.timestamp(),
ec.getTimeToLive(), ec.getLocalDeletionTime()));
-        }
-        else
-        {
-            cfi.addColumn(new Column(rowKey, ByteBufferUtil.EMPTY_BYTE_BUFFER, column.timestamp()));
-        }
-        if (logger.isDebugEnabled())
-            logger.debug("applying index row {} in {}", indexCfs.metadata.getKeyValidator().getString(valueKey.key),
cfi);
-
-        indexCfs.apply(valueKey, cfi);
-    }
-
-    public void updateColumn(DecoratedKey valueKey, ByteBuffer rowKey, IColumn col)
-    {
-        insertColumn(valueKey, rowKey, col);
-    }
-
-    public void removeIndex(ByteBuffer columnName)
-    {
-        indexCfs.invalidate();
-    }
-
-    public void forceBlockingFlush()
-    {
-        try
-        {
-            indexCfs.forceBlockingFlush();
-        }
-        catch (ExecutionException e)
-        {
-            throw new RuntimeException(e);
-        }
-        catch (InterruptedException e)
-        {
-            throw new AssertionError(e);
-        }
-    }
-
-    public void invalidate()
-    {
-        indexCfs.invalidate();
-    }
-
-    public void truncate(long truncatedAt)
-    {
-        indexCfs.discardSSTables(truncatedAt);
-    }
-
-    public ColumnFamilyStore getIndexCfs()
-    {
-       return indexCfs;
+        return rowKey;
     }
 
     public SecondaryIndexSearcher createSecondaryIndexSearcher(Set<ByteBuffer> columns)
@@ -166,18 +55,8 @@ public class KeysIndex extends PerColumnSecondaryIndex
         return new KeysSearcher(baseCfs.indexManager, columns);
     }
 
-    public String getIndexName()
-    {
-        return indexCfs.columnFamily;
-    }
-
     public void validateOptions() throws ConfigurationException
     {
         // no options used
     }
-
-    public long getLiveSize()
-    {
-        return indexCfs.getMemtableDataSize();
-    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/80ea03f5/src/java/org/apache/cassandra/db/index/keys/KeysSearcher.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/index/keys/KeysSearcher.java b/src/java/org/apache/cassandra/db/index/keys/KeysSearcher.java
index b95fdd5..f18e041 100644
--- a/src/java/org/apache/cassandra/db/index/keys/KeysSearcher.java
+++ b/src/java/org/apache/cassandra/db/index/keys/KeysSearcher.java
@@ -98,7 +98,7 @@ public class KeysSearcher extends SecondaryIndexSearcher
         if (logger.isDebugEnabled())
             logger.debug("Primary scan clause is " + baseCfs.getComparator().getString(primary.column_name));
         assert index != null;
-        final DecoratedKey indexKey = indexManager.getIndexKeyFor(primary.column_name, primary.value);
+        final DecoratedKey indexKey = index.getIndexKeyFor(primary.value);
 
         /*
          * XXX: If the range requested is a token range, we'll have to start at the beginning
(and stop at the end) of

http://git-wip-us.apache.org/repos/asf/cassandra/blob/80ea03f5/src/java/org/apache/cassandra/db/marshal/CompositeType.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/marshal/CompositeType.java b/src/java/org/apache/cassandra/db/marshal/CompositeType.java
index a834d94..83603ea 100644
--- a/src/java/org/apache/cassandra/db/marshal/CompositeType.java
+++ b/src/java/org/apache/cassandra/db/marshal/CompositeType.java
@@ -116,23 +116,13 @@ public class CompositeType extends AbstractCompositeType
     {
         assert objects.length == types.size();
 
-        List<ByteBuffer> serialized = new ArrayList<ByteBuffer>(objects.length);
-        int totalLength = 0;
+        ByteBuffer[] serialized = new ByteBuffer[objects.length];
         for (int i = 0; i < objects.length; i++)
         {
             ByteBuffer buffer = ((AbstractType) types.get(i)).decompose(objects[i]);
-            serialized.add(buffer);
-            totalLength += 2 + buffer.remaining() + 1;
-        }
-        ByteBuffer out = ByteBuffer.allocate(totalLength);
-        for (ByteBuffer bb : serialized)
-        {
-            putShortLength(out, bb.remaining());
-            out.put(bb);
-            out.put((byte) 0);
+            serialized[i] = buffer;
         }
-        out.flip();
-        return out;
+        return build(serialized);
     }
 
     @Override
@@ -194,6 +184,28 @@ public class CompositeType extends AbstractCompositeType
         return getClass().getName() + TypeParser.stringifyTypeParameters(types);
     }
 
+    public Builder builder()
+    {
+        return new Builder(this);
+    }
+
+    public ByteBuffer build(ByteBuffer... buffers)
+    {
+        int totalLength = 0;
+        for (ByteBuffer bb : buffers)
+            totalLength += 2 + bb.remaining() + 1;
+
+        ByteBuffer out = ByteBuffer.allocate(totalLength);
+        for (ByteBuffer bb : buffers)
+        {
+            putShortLength(out, bb.remaining());
+            out.put(bb);
+            out.put((byte) 0);
+        }
+        out.flip();
+        return out;
+    }
+
     public static class Builder implements ColumnNameBuilder
     {
         private final CompositeType composite;
@@ -298,9 +310,6 @@ public class CompositeType extends AbstractCompositeType
 
         public ByteBuffer buildAsEndOfRange()
         {
-            if (components.size() >= composite.types.size())
-                throw new IllegalStateException("Composite column is already fully constructed");
-
             if (components.isEmpty())
                 return ByteBufferUtil.EMPTY_BYTE_BUFFER;
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/80ea03f5/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableReader.java b/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
index 381b934..c578892 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
@@ -36,6 +36,7 @@ import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.config.Schema;
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.commitlog.ReplayPosition;
+import org.apache.cassandra.db.index.SecondaryIndex;
 import org.apache.cassandra.db.filter.QueryFilter;
 import org.apache.cassandra.db.index.keys.KeysIndex;
 import org.apache.cassandra.dht.AbstractBounds;
@@ -121,7 +122,7 @@ public class SSTableReader extends SSTable
             String parentName = descriptor.cfname.substring(0, i);
             CFMetaData parent = Schema.instance.getCFMetaData(descriptor.ksname, parentName);
             ColumnDefinition def = parent.getColumnDefinitionForIndex(descriptor.cfname.substring(i
+ 1));
-            metadata = CFMetaData.newIndexMetadata(parent, def, KeysIndex.indexComparator());
+            metadata = CFMetaData.newIndexMetadata(parent, def, SecondaryIndex.getIndexComparator(parent,
def));
         }
         else
         {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/80ea03f5/src/java/org/apache/cassandra/streaming/StreamInSession.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamInSession.java b/src/java/org/apache/cassandra/streaming/StreamInSession.java
index 2c812e5..cfa7d0b 100644
--- a/src/java/org/apache/cassandra/streaming/StreamInSession.java
+++ b/src/java/org/apache/cassandra/streaming/StreamInSession.java
@@ -203,7 +203,7 @@ public class StreamInSession extends AbstractStreamSession
                     if (entry.getKey() != null)
                     {
                         entry.getKey().addSSTables(entry.getValue());
-                        entry.getKey().indexManager.maybeBuildSecondaryIndexes(entry.getValue(),
entry.getKey().indexManager.getIndexedColumns());
+                        entry.getKey().indexManager.maybeBuildSecondaryIndexes(entry.getValue(),
entry.getKey().indexManager.allIndexesNames());
                     }
                 }
             }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/80ea03f5/src/java/org/apache/cassandra/thrift/ThriftValidation.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/thrift/ThriftValidation.java b/src/java/org/apache/cassandra/thrift/ThriftValidation.java
index 527d02e..0b16eef 100644
--- a/src/java/org/apache/cassandra/thrift/ThriftValidation.java
+++ b/src/java/org/apache/cassandra/thrift/ThriftValidation.java
@@ -28,6 +28,7 @@ import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.filter.IFilter;
 import org.apache.cassandra.db.filter.NamesQueryFilter;
 import org.apache.cassandra.db.filter.SliceQueryFilter;
+import org.apache.cassandra.db.index.SecondaryIndexManager;
 import org.apache.cassandra.db.marshal.AbstractType;
 import org.apache.cassandra.db.marshal.MarshalException;
 import org.apache.cassandra.dht.IPartitioner;
@@ -536,7 +537,7 @@ public class ThriftValidation
             // no filter to apply
             return false;
 
-        Set<ByteBuffer> indexedColumns = Table.open(metadata.ksName).getColumnFamilyStore(metadata.cfName).indexManager.getIndexedColumns();
+        SecondaryIndexManager idxManager = Table.open(metadata.ksName).getColumnFamilyStore(metadata.cfName).indexManager;
         AbstractType<?> nameValidator =  ColumnFamily.getComparatorFor(metadata.ksName,
metadata.cfName, null);
 
         boolean isIndexed = false;
@@ -567,7 +568,7 @@ public class ThriftValidation
                                                                 me.getMessage()));
             }
 
-            isIndexed |= (expression.op == IndexOperator.EQ) && indexedColumns.contains(expression.column_name);
+            isIndexed |= (expression.op == IndexOperator.EQ) && idxManager.indexes(expression.column_name);
         }
 
         return isIndexed;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/80ea03f5/test/unit/org/apache/cassandra/config/DefsTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/config/DefsTest.java b/test/unit/org/apache/cassandra/config/DefsTest.java
index ba4c20f..17595aa 100644
--- a/test/unit/org/apache/cassandra/config/DefsTest.java
+++ b/test/unit/org/apache/cassandra/config/DefsTest.java
@@ -523,7 +523,7 @@ public class DefsTest extends SchemaLoader
         rm.apply();
         ColumnFamilyStore cfs = Table.open("Keyspace6").getColumnFamilyStore("Indexed1");
         cfs.forceBlockingFlush();
-        ColumnFamilyStore indexedCfs = cfs.indexManager.getIndexForColumn(cfs.indexManager.getIndexedColumns().iterator().next()).getIndexCfs();
+        ColumnFamilyStore indexedCfs = cfs.indexManager.getIndexForColumn(ByteBufferUtil.bytes("birthdate")).getIndexCfs();
         Descriptor desc = indexedCfs.getSSTables().iterator().next().descriptor;
 
         // drop the index
@@ -534,7 +534,7 @@ public class DefsTest extends SchemaLoader
         MigrationManager.announceColumnFamilyUpdate(meta);
 
         // check
-        assert cfs.indexManager.getIndexedColumns().isEmpty();
+        assert cfs.indexManager.getIndexes().isEmpty();
         SSTableDeletingTask.waitForDeletions();
         assert !new File(desc.filenameFor(Component.DATA)).exists();
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/80ea03f5/test/unit/org/apache/cassandra/db/CleanupTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/CleanupTest.java b/test/unit/org/apache/cassandra/db/CleanupTest.java
index 94e28c9..ec74ae3 100644
--- a/test/unit/org/apache/cassandra/db/CleanupTest.java
+++ b/test/unit/org/apache/cassandra/db/CleanupTest.java
@@ -96,7 +96,6 @@ public class CleanupTest extends SchemaLoader
     {
         Table table = Table.open(TABLE1);
         ColumnFamilyStore cfs = table.getColumnFamilyStore(CF1);
-        assertEquals(cfs.indexManager.getIndexedColumns().iterator().next(), COLUMN);
 
         List<Row> rows;
 


Mime
View raw message