cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From xe...@apache.org
Subject svn commit: r1169861 - in /cassandra/branches/cassandra-1.0.0: ./ src/java/org/apache/cassandra/db/ src/java/org/apache/cassandra/db/compaction/ src/java/org/apache/cassandra/db/index/ src/java/org/apache/cassandra/db/index/keys/ src/java/org/apache/ca...
Date Mon, 12 Sep 2011 18:41:31 GMT
Author: xedin
Date: Mon Sep 12 18:41:31 2011
New Revision: 1169861

URL: http://svn.apache.org/viewvc?rev=1169861&view=rev
Log:
Add RowLevel support to secondary index API
patch by Jason Rutherglen; reviewed by Pavel Yaskevich for CASSANDRA-3147

Added:
    cassandra/branches/cassandra-1.0.0/src/java/org/apache/cassandra/db/index/PerColumnSecondaryIndex.java
    cassandra/branches/cassandra-1.0.0/src/java/org/apache/cassandra/db/index/PerRowSecondaryIndex.java
Modified:
    cassandra/branches/cassandra-1.0.0/CHANGES.txt
    cassandra/branches/cassandra-1.0.0/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
    cassandra/branches/cassandra-1.0.0/src/java/org/apache/cassandra/db/Table.java
    cassandra/branches/cassandra-1.0.0/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
    cassandra/branches/cassandra-1.0.0/src/java/org/apache/cassandra/db/index/SecondaryIndex.java
    cassandra/branches/cassandra-1.0.0/src/java/org/apache/cassandra/db/index/SecondaryIndexManager.java
    cassandra/branches/cassandra-1.0.0/src/java/org/apache/cassandra/db/index/SecondaryIndexSearcher.java
    cassandra/branches/cassandra-1.0.0/src/java/org/apache/cassandra/db/index/keys/KeysIndex.java
    cassandra/branches/cassandra-1.0.0/src/java/org/apache/cassandra/db/index/keys/KeysSearcher.java
    cassandra/branches/cassandra-1.0.0/src/java/org/apache/cassandra/thrift/ThriftValidation.java
    cassandra/branches/cassandra-1.0.0/test/unit/org/apache/cassandra/db/CleanupTest.java
    cassandra/branches/cassandra-1.0.0/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java

Modified: cassandra/branches/cassandra-1.0.0/CHANGES.txt
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-1.0.0/CHANGES.txt?rev=1169861&r1=1169860&r2=1169861&view=diff
==============================================================================
--- cassandra/branches/cassandra-1.0.0/CHANGES.txt (original)
+++ cassandra/branches/cassandra-1.0.0/CHANGES.txt Mon Sep 12 18:41:31 2011
@@ -69,7 +69,7 @@
  * rename CQL type names to match expected SQL behavior (CASSANDRA-3149, 3031)
  * Arena-based allocation for memtables (CASSANDRA-2252, 3162, 3163, 3168)
  * Default RR chance to 0.1 (CASSANDRA-3169)
-
+ * Add RowLevel support to secondary index API (CASSANDRA-3147)
 
 0.8.6
  * avoid trying to watch cassandra-topology.properties when loaded from jar

Modified: cassandra/branches/cassandra-1.0.0/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-1.0.0/src/java/org/apache/cassandra/db/ColumnFamilyStore.java?rev=1169861&r1=1169860&r2=1169861&view=diff
==============================================================================
--- cassandra/branches/cassandra-1.0.0/src/java/org/apache/cassandra/db/ColumnFamilyStore.java (original)
+++ cassandra/branches/cassandra-1.0.0/src/java/org/apache/cassandra/db/ColumnFamilyStore.java Mon Sep 12 18:41:31 2011
@@ -159,7 +159,7 @@ public class ColumnFamilyStore implement
     /** ops count last time we computed liveRatio */
     private final AtomicLong liveRatioComputedAt = new AtomicLong(32);
 
-    public void reload()
+    public void reload() throws IOException
     {
         // metadata object has been mutated directly. make all the members jibe with new settings.
 
@@ -488,7 +488,7 @@ public class ColumnFamilyStore implement
      * @param ksName The keyspace name
      * @param cfName The columnFamily name
      */
-    public static synchronized void loadNewSSTables(String ksName, String cfName)
+    public static synchronized void loadNewSSTables(String ksName, String cfName) 
     {
         /** ks/cf existence checks will be done by open and getCFS methods for us */
         Table table = Table.open(ksName);
@@ -565,7 +565,14 @@ public class ColumnFamilyStore implement
         data.addSSTables(sstables); // this will call updateCacheSizes() for us
 
         logger.info("Requesting a full secondary index re-build for " + table.name + "/" + columnFamily);
-        indexManager.maybeBuildSecondaryIndexes(sstables, indexManager.getIndexedColumns());
+        try
+        {
+            indexManager.maybeBuildSecondaryIndexes(sstables, indexManager.getIndexedColumns());
+        }
+        catch (IOException e)
+        {
+           throw new IOError(e);
+        }
 
         logger.info("Setting up new generation: " + generation);
         fileIndexGenerator.set(generation);
@@ -956,7 +963,7 @@ public class ColumnFamilyStore implement
         return invalid;
     }
 
-    public void removeAllSSTables()
+    public void removeAllSSTables() throws IOException
     {
         data.removeAllSSTables();
         indexManager.removeAllIndexes();

Modified: cassandra/branches/cassandra-1.0.0/src/java/org/apache/cassandra/db/Table.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-1.0.0/src/java/org/apache/cassandra/db/Table.java?rev=1169861&r1=1169860&r2=1169861&view=diff
==============================================================================
--- cassandra/branches/cassandra-1.0.0/src/java/org/apache/cassandra/db/Table.java (original)
+++ cassandra/branches/cassandra-1.0.0/src/java/org/apache/cassandra/db/Table.java Mon Sep 12 18:41:31 2011
@@ -534,7 +534,14 @@ public class Table
             {
                 ColumnFamily cf = readCurrentIndexedColumns(key, cfs, indexedColumns);
                 if (cf != null)
-                    cfs.indexManager.applyIndexUpdates(key.key, cf, cf.getColumnNames(), null);
+                    try
+                    {
+                        cfs.indexManager.applyIndexUpdates(key.key, cf, cf.getColumnNames(), null);
+                    }
+                    catch (IOException e)
+                    {
+                        throw new IOError(e);
+                    }
             }
         }
         finally

Modified: cassandra/branches/cassandra-1.0.0/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-1.0.0/src/java/org/apache/cassandra/db/compaction/CompactionManager.java?rev=1169861&r1=1169860&r2=1169861&view=diff
==============================================================================
--- cassandra/branches/cassandra-1.0.0/src/java/org/apache/cassandra/db/compaction/CompactionManager.java (original)
+++ cassandra/branches/cassandra-1.0.0/src/java/org/apache/cassandra/db/compaction/CompactionManager.java Mon Sep 12 18:41:31 2011
@@ -45,13 +45,12 @@ import org.apache.cassandra.io.util.Rand
 import org.apache.cassandra.service.AntiEntropyService;
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.utils.*;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import com.google.common.base.Predicates;
 import com.google.common.collect.Iterators;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 /**
  * A singleton which manages a private executor of ongoing compactions. A readwrite lock
  * controls whether compactions can proceed: an external consumer can completely stop
@@ -701,6 +700,8 @@ public class CompactionManager implement
 
             SSTableScanner scanner = sstable.getDirectScanner();
             Collection<ByteBuffer> indexedColumns = cfs.indexManager.getIndexedColumns();
+            List<IColumn> indexedColumnsInRow = null;
+
             CleanupInfo ci = new CleanupInfo(sstable, scanner);
             executor.beginCompaction(ci);
             try
@@ -719,17 +720,30 @@ public class CompactionManager implement
                     }
                     else
                     {
+                                              
                         cfs.invalidateCachedRow(row.getKey());
+                                                
                         if (!indexedColumns.isEmpty() || isCommutative)
                         {
+                            if (indexedColumnsInRow != null)
+                                indexedColumnsInRow.clear();
+                            
                             while (row.hasNext())
                             {
                                 IColumn column = row.next();
                                 if (column instanceof CounterColumn)
                                     renewer.maybeRenew((CounterColumn) column);
                                 if (indexedColumns.contains(column.name()))
-                                    cfs.indexManager.deleteFromIndex(row.getKey().key, column);
+                                {
+                                    if (indexedColumnsInRow == null)
+                                        indexedColumnsInRow = new ArrayList<IColumn>();
+                                    
+                                    indexedColumnsInRow.add(column);
+                                }
                             }
+                            
+                            if (indexedColumnsInRow != null && !indexedColumnsInRow.isEmpty())
+                                cfs.indexManager.deleteFromIndexes(row.getKey(), indexedColumnsInRow);
                         }
                     }
                 }
@@ -758,19 +772,9 @@ public class CompactionManager implement
                 logger.info(String.format(format, writer.getFilename(), startsize, endsize, (int)(ratio*100), totalkeysWritten, dTime));
             }
 
-            // flush to ensure we don't lose the tombstones on a restart, since they are not commitlog'd
-            try
-            {
-                cfs.indexManager.flushIndexes();
-            }
-            catch (ExecutionException e)
-            {
-               throw new IOException(e);
-            }
-            catch (InterruptedException e)
-            {
-               throw new IOException(e);
-            }
+            // flush to ensure we don't lose the tombstones on a restart, since they are not commitlog'd         
+            cfs.indexManager.flushIndexes();
+           
 
             cfs.replaceCompactedSSTables(Arrays.asList(sstable), results);
         }

Added: cassandra/branches/cassandra-1.0.0/src/java/org/apache/cassandra/db/index/PerColumnSecondaryIndex.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-1.0.0/src/java/org/apache/cassandra/db/index/PerColumnSecondaryIndex.java?rev=1169861&view=auto
==============================================================================
--- cassandra/branches/cassandra-1.0.0/src/java/org/apache/cassandra/db/index/PerColumnSecondaryIndex.java (added)
+++ cassandra/branches/cassandra-1.0.0/src/java/org/apache/cassandra/db/index/PerColumnSecondaryIndex.java Mon Sep 12 18:41:31 2011
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.index;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.IColumn;
+
+/**
+ * Base class for Secondary indexes that implement a unique index per column
+ *
+ */
+public abstract class PerColumnSecondaryIndex extends SecondaryIndex
+{
+    /**
+     * Delete a column from the index
+     * 
+     * @param valueKey the column value which is used as the index key
+     * @param rowKey the underlying row key which is indexed
+     * @param col all the column info
+     */
+    public abstract void deleteColumn(DecoratedKey<?> valueKey, ByteBuffer rowKey, IColumn col) throws IOException;
+    
+    /**
+     * insert a column to the index
+     * 
+     * @param valueKey the column value which is used as the index key
+     * @param rowKey the underlying row key which is indexed
+     * @param col all the column info
+     */
+    public abstract void insertColumn(DecoratedKey<?> valueKey, ByteBuffer rowKey, IColumn col) throws IOException;
+    
+    /**
+     * update a column from the index
+     * 
+     * @param valueKey the column value which is used as the index key
+     * @param rowKey the underlying row key which is indexed
+     * @param col all the column info
+     */
+    public abstract void updateColumn(DecoratedKey<?> valueKey, ByteBuffer rowKey, IColumn col) throws IOException;
+    
+    @Override
+    public String getNameForSystemTable(ByteBuffer column)
+    {
+        return getIndexName();   
+    }
+}

Added: cassandra/branches/cassandra-1.0.0/src/java/org/apache/cassandra/db/index/PerRowSecondaryIndex.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-1.0.0/src/java/org/apache/cassandra/db/index/PerRowSecondaryIndex.java?rev=1169861&view=auto
==============================================================================
--- cassandra/branches/cassandra-1.0.0/src/java/org/apache/cassandra/db/index/PerRowSecondaryIndex.java (added)
+++ cassandra/branches/cassandra-1.0.0/src/java/org/apache/cassandra/db/index/PerRowSecondaryIndex.java Mon Sep 12 18:41:31 2011
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.index;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.charset.CharacterCodingException;
+import java.util.List;
+import java.util.SortedSet;
+
+import org.apache.cassandra.db.ColumnFamily;
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.IColumn;
+import org.apache.cassandra.utils.ByteBufferUtil;
+
+/**
+ *  Base class for Secondary indexes that implement a unique index per row
+ */
+public abstract class PerRowSecondaryIndex extends SecondaryIndex
+{
+    /**
+     * Removes obsolete index entries and creates new ones for the given row key
+     * and mutated columns.
+     * 
+     * @param rowKey the row key
+     * @param cf the current rows data
+     * @param mutatedIndexedColumns the set of columns that were changed or added
+     * @param oldIndexedColumns the columns which were deleted
+     * @throws IOException 
+     */
+    public abstract void applyIndexUpdates(ByteBuffer rowKey,
+                                           ColumnFamily cf,
+                                           SortedSet<ByteBuffer> mutatedIndexedColumns,
+                                           ColumnFamily oldIndexedColumns) throws IOException;
+    
+    
+    /**
+     * cleans up deleted columns from cassandra cleanup compaction
+     * 
+     * @param key
+     * @param indexedColumnsInRow
+     */
+    public abstract void deleteFromIndex(DecoratedKey<?> key, List<IColumn> indexedColumnsInRow);
+   
+    
+    @Override
+    public String getNameForSystemTable(ByteBuffer columnName)
+    {
+        try
+        {
+            return getIndexName()+ByteBufferUtil.string(columnName);
+        } 
+        catch (CharacterCodingException e)
+        {
+            throw new RuntimeException(e);
+        }
+    }
+}

Modified: cassandra/branches/cassandra-1.0.0/src/java/org/apache/cassandra/db/index/SecondaryIndex.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-1.0.0/src/java/org/apache/cassandra/db/index/SecondaryIndex.java?rev=1169861&r1=1169860&r2=1169861&view=diff
==============================================================================
--- cassandra/branches/cassandra-1.0.0/src/java/org/apache/cassandra/db/index/SecondaryIndex.java (original)
+++ cassandra/branches/cassandra-1.0.0/src/java/org/apache/cassandra/db/index/SecondaryIndex.java Mon Sep 12 18:41:31 2011
@@ -20,19 +20,17 @@ package org.apache.cassandra.db.index;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
-import java.util.Collections;
-import java.util.Set;
-import java.util.TreeSet;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Future;
-import java.util.concurrent.FutureTask;
+import java.util.*;
+import java.util.concurrent.*;
 
 import org.apache.cassandra.config.ColumnDefinition;
 import org.apache.cassandra.config.ConfigurationException;
-import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.SystemTable;
 import org.apache.cassandra.db.compaction.CompactionManager;
 import org.apache.cassandra.db.index.keys.KeysIndex;
 import org.apache.cassandra.io.sstable.ReducingKeyIterator;
+import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.commons.lang.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -40,7 +38,7 @@ import org.slf4j.LoggerFactory;
 /**
  * Abstract base class for different types of secondary indexes.
  * 
- * This abstraction assumes there is one column per index instance
+ * Do not extend this directly, please pick from PerColumnSecondaryIndex or PerRowSecondaryIndex
  */
 public abstract class SecondaryIndex
 {
@@ -56,9 +54,9 @@ public abstract class SecondaryIndex
     
     
     /**
-     * The column definition which this index is responsible for
+     * The column definitions which this index is responsible for
      */
-    protected ColumnDefinition columnDef;
+    protected Set<ColumnDefinition> columnDefs = Collections.newSetFromMap(new ConcurrentHashMap<ColumnDefinition,Boolean>());
     
     /**
      * Perform any initialization work
@@ -77,44 +75,28 @@ public abstract class SecondaryIndex
      */
     abstract public String getIndexName();
    
-        
-    /**
-     * Delete a column from the index
-     * 
-     * @param valueKey the column value which is used as the index key
-     * @param rowKey the underlying row key which is indexed
-     * @param col all the column info
-     */
-    public abstract void deleteColumn(DecoratedKey<?> valueKey, ByteBuffer rowKey, IColumn col);
     
     /**
-     * insert a column to the index
+     * Return the unique name for this index and column
+     * to be stored in the SystemTable that tracks if each column is built
      * 
-     * @param valueKey the column value which is used as the index key
-     * @param rowKey the underlying row key which is indexed
-     * @param col all the column info
+     * @param columnName the name of the column
+     * @return the unique name
      */
-    public abstract void insertColumn(DecoratedKey<?> valueKey, ByteBuffer rowKey, IColumn col);
-    
+    abstract public String getNameForSystemTable(ByteBuffer columnName);
+      
     /**
-     * update a column from the index
+     * Checks if the index for specified column is fully built
      * 
-     * @param valueKey the column value which is used as the index key
-     * @param rowKey the underlying row key which is indexed
-     * @param col all the column info
+     * @param columnName the column
+     * @return true if the index is fully built
      */
-    public abstract void updateColumn(DecoratedKey<?> valueKey, ByteBuffer rowKey, IColumn col);
-    
+    public boolean isIndexBuilt(ByteBuffer columnName)
+    {
+        return SystemTable.isIndexBuilt(baseCfs.table.name, getNameForSystemTable(columnName));
+    }
     
     /**
-     * Called after a number of insert/delete calls
-     * this is required for indexes that keep many columns per row
-     * depends on the underlying impl
-     * @param rowKey the row identifier that was completed
-     */
-    public abstract void commitRow(ByteBuffer rowKey);
-      
-    /**
      * Called at query time
      * Creates a implementation specific searcher instance for this index type
      * @param columns the list of columns which belong to this index type
@@ -124,31 +106,22 @@ public abstract class SecondaryIndex
         
     /**
      * Forces this indexes in memory data to disk
-     * @throws ExecutionException
-     * @throws InterruptedException
+     * @throws IOException
      */
-    public abstract void forceBlockingFlush() throws ExecutionException, InterruptedException;
+    public abstract void forceBlockingFlush() throws IOException;
 
     /**
      * Allow access to the underlying column family store if there is one
      * @return the underlying column family store or null
      */
     public abstract ColumnFamilyStore getUnderlyingCfs();
-    
-    
-    /**
-     * Check if index is already built for current store
-     * @return true if built, false otherwise
-     */
-    public boolean isIndexBuilt()
-    {
-        return SystemTable.isIndexBuilt(baseCfs.table.name, getIndexName());
-    }
+   
     
     /**
      * Delete all files and references to this index
+     * @param columnName the indexed column to remove
      */
-    public abstract void removeIndex();
+    public abstract void removeIndex(ByteBuffer columnName) throws IOException;
     
     /**
      * Renames the underlying index files to reflect the new CF name
@@ -167,13 +140,18 @@ public abstract class SecondaryIndex
      * Builds the index using the data in the underlying CFS
      * Blocks till it's complete
      */
-    protected void buildIndexBlocking()
+    protected void buildIndexBlocking() throws IOException
     {
         logger.info(String.format("Submitting index build of %s for data in %s",
-                baseCfs.metadata.comparator.getString(columnDef.name), StringUtils.join(baseCfs.getSSTables(), ", ")));
+                getIndexName(), StringUtils.join(baseCfs.getSSTables(), ", ")));
 
+        SortedSet<ByteBuffer> columnNames = new TreeSet<ByteBuffer>();
+        
+        for (ColumnDefinition cdef : columnDefs)
+            columnNames.add(cdef.name);
+        
         SecondaryIndexBuilder builder = new SecondaryIndexBuilder(baseCfs,
-                                                                  new TreeSet<ByteBuffer>(Collections.singleton(columnDef.name)),
+                                                                  columnNames,
                                                                   new ReducingKeyIterator(baseCfs.getSSTables()));
 
         Future<?> future = CompactionManager.instance.submitIndexBuild(builder);
@@ -181,7 +159,17 @@ public abstract class SecondaryIndex
         {
             future.get();
             forceBlockingFlush();
-            SystemTable.setIndexBuilt(baseCfs.table.name, getIndexName());
+            
+            // Mark all indexed columns as built
+            if (this instanceof PerRowSecondaryIndex)
+            {
+                for (ByteBuffer columnName : columnNames)
+                    SystemTable.setIndexBuilt(baseCfs.table.name, getIndexName()+ByteBufferUtil.string(columnName));
+            }
+            else
+            {
+                SystemTable.setIndexBuilt(baseCfs.table.name, getIndexName());
+            }
         }
         catch (InterruptedException e)
         {
@@ -189,22 +177,34 @@ public abstract class SecondaryIndex
         }
         catch (ExecutionException e)
         {
-            throw new RuntimeException(e);
+            throw new IOException(e);
         }
-        logger.info("Index build of " + baseCfs.metadata.comparator.getString(columnDef.name) + " complete");
+        logger.info("Index build of " + getIndexName() + " complete");
     }
 
     
     /**
      * Builds the index using the data in the underlying CF, non blocking
+     * 
+     * 
      * @return A future object which the caller can block on (optional)
      */
     public Future<?> buildIndexAsync()
     {
         // if we're just linking in the index to indexedColumns on an already-built index post-restart, we're done
-        if (isIndexBuilt())
+        boolean allAreBuilt = true;
+        for (ColumnDefinition cdef : columnDefs)
+        {
+            if (!SystemTable.isIndexBuilt(baseCfs.table.name, getNameForSystemTable(cdef.name)))
+            {
+                allAreBuilt = false;
+                break;
+            }
+        }
+        
+        if (allAreBuilt)
             return null;
-
+        
         // build it asynchronously; addIndex gets called by CFS open and schema update, neither of which
         // we want to block for a long period.  (actual build is serialized on CompactionManager.)
         Runnable runnable = new Runnable()
@@ -224,33 +224,50 @@ public abstract class SecondaryIndex
                     throw new AssertionError(e);
                 }
                 
-                buildIndexBlocking();
+                try
+                {
+                    buildIndexBlocking();
+                } 
+                catch (IOException e)
+                {
+                    throw new RuntimeException(e);
+                }
             }
         };
         FutureTask<?> f = new FutureTask<Object>(runnable, null);
        
-        new Thread(f, "Creating index: " + columnDef.getIndexName()).start();
+        new Thread(f, "Creating index: " + getIndexName()).start();
         return f;
     }
     
-    public ColumnFamilyStore getBaseCfs()
+    public ColumnFamilyStore getBaseCFStore()
     {
         return baseCfs;
     }
 
-    private void setBaseCfs(ColumnFamilyStore baseCfs)
+    private void setBaseCfStore(ColumnFamilyStore baseCfs)
     {
         this.baseCfs = baseCfs;
     }
 
-    public ColumnDefinition getColumnDef()
+    Set<ColumnDefinition> getColumnDefs()
     {
-        return columnDef;
+        return columnDefs;
     }
 
-    private void setColumnDef(ColumnDefinition columnDef)
+    void addColumnDef(ColumnDefinition columnDef)
     {
-        this.columnDef = columnDef;
+       columnDefs.add(columnDef);
+    }
+    
+    void removeColumnDef(ByteBuffer name)
+    {
+        Iterator<ColumnDefinition> it = columnDefs.iterator();
+        while (it.hasNext())
+        {
+            if (it.next().name.equals(name))
+                it.remove();
+        }
     }
     
     /**
@@ -259,13 +276,13 @@ public abstract class SecondaryIndex
      * 
      * @param baseCfs the source of data for the Index
      * @param cdef the meta information about this column (index_type, index_options, name, etc...)
-     * @param init specify if this index should be initialized after allocated
+     *
      * @return The secondary index instance for this column
      * @throws ConfigurationException
      */
-    public static SecondaryIndex createInstance(ColumnFamilyStore baseCfs, ColumnDefinition cdef, boolean init) throws ConfigurationException
+    public static SecondaryIndex createInstance(ColumnFamilyStore baseCfs, ColumnDefinition cdef) throws ConfigurationException
     {
-        SecondaryIndex index = null;
+        SecondaryIndex index;
         
         switch (cdef.getIndexType())
         {
@@ -287,14 +304,11 @@ public abstract class SecondaryIndex
             break;
             default:
                 throw new RuntimeException("Unknown index type: " + cdef.getIndexName());
-        };
+        }
         
-        index.setColumnDef(cdef);
+        index.addColumnDef(cdef);
         index.validateOptions();
-        index.setBaseCfs(baseCfs);
-        
-        if (init)
-            index.init();
+        index.setBaseCfStore(baseCfs);
         
         return index;
     }

Modified: cassandra/branches/cassandra-1.0.0/src/java/org/apache/cassandra/db/index/SecondaryIndexManager.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-1.0.0/src/java/org/apache/cassandra/db/index/SecondaryIndexManager.java?rev=1169861&r1=1169860&r2=1169861&view=diff
==============================================================================
--- cassandra/branches/cassandra-1.0.0/src/java/org/apache/cassandra/db/index/SecondaryIndexManager.java (original)
+++ cassandra/branches/cassandra-1.0.0/src/java/org/apache/cassandra/db/index/SecondaryIndexManager.java Mon Sep 12 18:41:31 2011
@@ -18,29 +18,22 @@
 package org.apache.cassandra.db.index;
 
 import java.io.IOException;
-import java.lang.reflect.InvocationTargetException;
 import java.nio.ByteBuffer;
 import java.util.*;
-import java.util.concurrent.ConcurrentNavigableMap;
-import java.util.concurrent.ConcurrentSkipListMap;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Future;
+import java.util.concurrent.*;
 
 import org.apache.cassandra.config.ColumnDefinition;
 import org.apache.cassandra.config.ConfigurationException;
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.compaction.CompactionManager;
 import org.apache.cassandra.db.filter.IFilter;
-import org.apache.cassandra.db.index.keys.KeysIndex;
 import org.apache.cassandra.dht.AbstractBounds;
 import org.apache.cassandra.dht.LocalToken;
 import org.apache.cassandra.io.sstable.ReducingKeyIterator;
 import org.apache.cassandra.io.sstable.SSTableReader;
 import org.apache.cassandra.thrift.IndexClause;
 import org.apache.cassandra.thrift.IndexExpression;
-import org.apache.cassandra.thrift.IndexType;
 import org.apache.commons.lang.StringUtils;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -53,9 +46,19 @@ public class SecondaryIndexManager
     private static final Logger logger = LoggerFactory.getLogger(SecondaryIndexManager.class);
 
     /**
-     * Organized the indexes by column name
+     * Organizes the indexes by column name
      */
     private final ConcurrentNavigableMap<ByteBuffer, SecondaryIndex> indexesByColumn;
+    
+    
+    /**
+     * Keeps a single instance of a SecondaryIndex for many columns when the index type
+     * has isRowLevelIndex() == true
+     * 
+     * This allows updates to happen to an entire row at once
+     */
+    private final Map<Class<? extends SecondaryIndex>,SecondaryIndex> rowLevelIndexMap;
+    
 
     /**
      * The underlying column family containing the source data for these indexes
@@ -65,14 +68,16 @@ public class SecondaryIndexManager
     public SecondaryIndexManager(ColumnFamilyStore baseCfs)
     {
         indexesByColumn = new ConcurrentSkipListMap<ByteBuffer, SecondaryIndex>();
-
+        rowLevelIndexMap = new HashMap<Class<? extends SecondaryIndex>, SecondaryIndex>();
+        
         this.baseCfs = baseCfs;
     }
 
     /**
      * Drops and adds new indexes associated with the underlying CF
+     * @throws IOException 
      */
-    public void reload()
+    public void reload() throws IOException
     {
         // figure out what needs to be added and dropped.
         // future: if/when we have modifiable settings for secondary indexes,
@@ -96,8 +101,9 @@ public class SecondaryIndexManager
      * Does nothing if columns is empty.
      * @param sstables the data to build from
      * @param columns the list of columns to index
+     * @throws IOException 
      */
-    public void maybeBuildSecondaryIndexes(Collection<SSTableReader> sstables, SortedSet<ByteBuffer> columns)
+    public void maybeBuildSecondaryIndexes(Collection<SSTableReader> sstables, SortedSet<ByteBuffer> columns) throws IOException
     {
         if (columns.isEmpty())
             return;
@@ -135,17 +141,29 @@ public class SecondaryIndexManager
     /**
      * Removes a existing index
      * @param column the indexed column to remove
+     * @throws IOException 
      */
-    public void removeIndexedColumn(ByteBuffer column)
+    public void removeIndexedColumn(ByteBuffer column) throws IOException
     {
         SecondaryIndex index = indexesByColumn.remove(column);
         
-        if(index == null)
+        if (index == null)
             return;
-               
-        SystemTable.setIndexRemoved(baseCfs.metadata.ksName, index.getIndexName());              
         
-        index.removeIndex();      
+        SystemTable.setIndexRemoved(baseCfs.metadata.ksName, index.getNameForSystemTable(column));
+        
+        index.removeIndex(column);
+        
+        // Remove this column from from row level index map
+        if (index instanceof PerRowSecondaryIndex)
+        {
+            index.removeColumnDef(column);
+            
+            //If now columns left on this CF remove from row level lookup
+            if (index.getColumnDefs().isEmpty())
+                rowLevelIndexMap.remove(index.getClass());
+        }
+        
     }
 
     /**
@@ -165,24 +183,44 @@ public class SecondaryIndexManager
         SecondaryIndex index;
         try
         {
-            index = SecondaryIndex.createInstance(baseCfs, cdef, true);
+            index = SecondaryIndex.createInstance(baseCfs, cdef);
         } catch (ConfigurationException e)
         {
             throw new RuntimeException(e);
-        }               
-
+        }      
+        
+        // Keep a single instance of the index per-cf for row level indexes
+        // since we want all columns to be under the index
+        if (index instanceof PerRowSecondaryIndex)
+        {
+            SecondaryIndex currentIndex = rowLevelIndexMap.get(index.getClass());
+            
+            if (currentIndex == null)
+            {
+                rowLevelIndexMap.put(index.getClass(), index);
+                index.init();
+            }
+            else
+            {
+                index = currentIndex;
+                index.addColumnDef(cdef);
+            }         
+        }
+        else
+        {
+            index.init();
+        }
+             
         // link in indexedColumns. this means that writes will add new data to
         // the index immediately,
         // so we don't have to lock everything while we do the build. it's up to
         // the operator to wait
         // until the index is actually built before using in queries.
-        if (indexesByColumn.putIfAbsent(cdef.name, index) != null)
-            return null;       
-        
+        indexesByColumn.put(cdef.name, index);
         
         // if we're just linking in the index to indexedColumns on an
         // already-built index post-restart, we're done
-        if (index.isIndexBuilt())
+        if (index.isIndexBuilt(cdef.name))
             return null;
 
         return index.buildIndexAsync();
@@ -203,17 +241,18 @@ public class SecondaryIndexManager
      */
     public void unregisterMBeans()
     {
-        for(Map.Entry<ByteBuffer, SecondaryIndex> entry : indexesByColumn.entrySet())
+        for (Map.Entry<ByteBuffer, SecondaryIndex> entry : indexesByColumn.entrySet())
             entry.getValue().unregisterMbean();
     }
     
     /**
      * Remove all underlying index data
+     * @throws IOException 
      */
-    public void removeAllIndexes()
+    public void removeAllIndexes() throws IOException
     {
-        for(Map.Entry<ByteBuffer, SecondaryIndex> entry : indexesByColumn.entrySet())
-            entry.getValue().removeIndex();
+        for (Map.Entry<ByteBuffer, SecondaryIndex> entry : indexesByColumn.entrySet())
+            entry.getValue().removeIndex(entry.getKey());
     }
     
     /**
@@ -222,7 +261,7 @@ public class SecondaryIndexManager
      */
     public void renameIndexes(String newCfName) throws IOException
     {
-        for(Map.Entry<ByteBuffer, SecondaryIndex> entry : indexesByColumn.entrySet())
+        for (Map.Entry<ByteBuffer, SecondaryIndex> entry : indexesByColumn.entrySet())
             entry.getValue().renameIndex(newCfName);
     }
     
@@ -231,9 +270,9 @@ public class SecondaryIndexManager
      * @throws ExecutionException
      * @throws InterruptedException
      */
-    public void flushIndexes() throws ExecutionException, InterruptedException
+    public void flushIndexes() throws IOException
     {
-        for(Map.Entry<ByteBuffer, SecondaryIndex> entry : indexesByColumn.entrySet())
+        for (Map.Entry<ByteBuffer, SecondaryIndex> entry : indexesByColumn.entrySet())
             entry.getValue().forceBlockingFlush();
     }
     
@@ -248,22 +287,6 @@ public class SecondaryIndexManager
         return new DecoratedKey<LocalToken>(new LocalToken(baseCfs.metadata.getColumnDefinition(name).getValidator(), value), value);
     }
     
-    /**
-     * Deletes data from one of the indexes
-     * @param rowKey the row identifier to delete
-     * @param column the column data to delete
-     */
-    public void deleteFromIndex(ByteBuffer rowKey, IColumn column)
-    {
-       SecondaryIndex index = indexesByColumn.get(column.name());
-       
-       if(index == null)
-           return;
-            
-       DecoratedKey<LocalToken> valueKey = getIndexKeyFor(column.name(), column.value());
-
-       index.deleteColumn(valueKey, rowKey, column);
-    }
     
     /**
      * @return all built indexes (ready to use)
@@ -272,9 +295,11 @@ public class SecondaryIndexManager
     {
         List<String> indexList = new ArrayList<String>();
         
-        for(Map.Entry<ByteBuffer, SecondaryIndex> entry : indexesByColumn.entrySet())
+        for (Map.Entry<ByteBuffer, SecondaryIndex> entry : indexesByColumn.entrySet())
         {
-            if(entry.getValue().isIndexBuilt())
+            SecondaryIndex index = entry.getValue();
+            
+            if (index.isIndexBuilt(entry.getKey())) 
             {
                 indexList.add(entry.getValue().getIndexName());
             }
@@ -290,31 +315,41 @@ public class SecondaryIndexManager
     {
         ArrayList<ColumnFamilyStore> cfsList = new ArrayList<ColumnFamilyStore>();
 
-        for(Map.Entry<ByteBuffer, SecondaryIndex> entry : indexesByColumn.entrySet())
+        for (Map.Entry<ByteBuffer, SecondaryIndex> entry : indexesByColumn.entrySet())
         {
             ColumnFamilyStore cfs = entry.getValue().getUnderlyingCfs();
             
-            if(cfs != null)
+            if (cfs != null)
                 cfsList.add(cfs);        
         }
         
         return cfsList;
     }
         
+   
     /**
-     * removes obsolete index entries and creates new ones for the given row key
+     * Removes obsolete index entries and creates new ones for the given row key
      * and mutated columns.
      * 
-     * @return list of full (index CF) memtables
+     * For columns whos underlying index type has the isRowLevelIndex() flag set to true this function will
+     * call the 
+     * 
+     * @param rowKey the row key
+     * @param cf the current rows data
+     * @param mutatedIndexedColumns the set of columns that were changed or added
+     * @param oldIndexedColumns the columns what were deleted
+     * @throws IOException 
      */
     public void applyIndexUpdates(ByteBuffer rowKey,
-                                                 ColumnFamily cf,
-                                                 SortedSet<ByteBuffer> mutatedIndexedColumns,
-                                                 ColumnFamily oldIndexedColumns)
+                                  ColumnFamily cf,
+                                  SortedSet<ByteBuffer> mutatedIndexedColumns,
+                                  ColumnFamily oldIndexedColumns) throws IOException
     {
-        //Track the indexes we touch so we can commit the row across them
-        Set<SecondaryIndex> indexesTouched = new HashSet<SecondaryIndex>(indexesByColumn.size());
-
+        
+        // Identify the columns with PerRowSecondaryIndexes
+        // we need to make sure this is only called once
+        Set<Class<? extends SecondaryIndex>> appliedRowLevelIndexes = null;       
+      
         // remove the old index entries
         if (oldIndexedColumns != null)
         {
@@ -328,17 +363,27 @@ public class SecondaryIndexManager
                 
                 //this was previously deleted so should not be in index
                 if (column.isMarkedForDelete())
-                    continue;
-           
+                    continue;              
+                
                 SecondaryIndex index = getIndexForColumn(columnName);
-                assert index != null;
+                assert index != null;               
 
-                indexesTouched.add(index);
+                //Update entire row if we encounter a row level index
+                if (index instanceof PerRowSecondaryIndex)
+                {
+                    if (appliedRowLevelIndexes == null)
+                        appliedRowLevelIndexes = new HashSet<Class<? extends SecondaryIndex>>();
+                    else
+                        if (appliedRowLevelIndexes.add(index.getClass()))
+                            ((PerRowSecondaryIndex)index).applyIndexUpdates(rowKey, cf, mutatedIndexedColumns, oldIndexedColumns);
+                }
+                else
+                {
+                    DecoratedKey<LocalToken> valueKey = getIndexKeyFor(columnName, column.value());
 
-                DecoratedKey<LocalToken> valueKey = getIndexKeyFor(columnName, column.value());
-
-                index.deleteColumn(valueKey, rowKey, column);
-            }           
+                    ((PerColumnSecondaryIndex)index).deleteColumn(valueKey, rowKey, column);
+                }
+            }
         }
         
         //insert new columns
@@ -351,20 +396,62 @@ public class SecondaryIndexManager
             SecondaryIndex index = getIndexForColumn(columnName);
             assert index != null;
 
-            indexesTouched.add(index);
-
-            DecoratedKey<LocalToken> valueKey = getIndexKeyFor(columnName, column.value());
-                        
-            index.insertColumn(valueKey, rowKey, column);         
+            //Update entire row if we encounter a row level index
+            if (index instanceof PerRowSecondaryIndex)
+            {
+                if (appliedRowLevelIndexes == null)
+                    appliedRowLevelIndexes = new HashSet<Class<? extends SecondaryIndex>>();
+                else
+                    if (appliedRowLevelIndexes.add(index.getClass()))
+                        ((PerRowSecondaryIndex)index).applyIndexUpdates(rowKey, cf, mutatedIndexedColumns, oldIndexedColumns);
+            }
+            else
+            {
+                DecoratedKey<LocalToken> valueKey = getIndexKeyFor(columnName, column.value());
+                
+                ((PerColumnSecondaryIndex)index).insertColumn(valueKey, rowKey, column);         
+            }
         }
+    }
+    
+    /**
+     * Delete all columns from all indexes for this row
+     * @param key the row key
+     * @param indexedColumnsInRow all column names in row
+     */
+    public void deleteFromIndexes(DecoratedKey<?> key, List<IColumn> indexedColumnsInRow) throws IOException
+    {
         
-        //Commit the row across all indexes
-        for (SecondaryIndex index : indexesTouched)
+        // Identify the columns with isRowLevelIndex == true 
+        // we need to make sure this is only called once
+        Set<Class<? extends SecondaryIndex>> cleanedRowLevelIndexes = null;       
+        
+        for (IColumn column : indexedColumnsInRow)
         {
-            index.commitRow(rowKey);
-        }
+            SecondaryIndex index = indexesByColumn.get(column.name());
+            
+            if (index == null)
+                continue;
+            
+            //Update entire row if we encounter a row level index
+            if (index instanceof PerRowSecondaryIndex)
+            {
+                if (cleanedRowLevelIndexes == null)
+                    cleanedRowLevelIndexes = new HashSet<Class<? extends SecondaryIndex>>();
+                else
+                    if (cleanedRowLevelIndexes.add(index.getClass()))
+                        ((PerRowSecondaryIndex)index).deleteFromIndex(key, indexedColumnsInRow);             
+            }
+            else
+            {
+                DecoratedKey<LocalToken> valueKey = getIndexKeyFor(column.name(), column.value());
+
+                ((PerColumnSecondaryIndex)index).deleteColumn(valueKey, key.key, column);
+            }
+        }       
     }
-     
+    
+    
     /**
      * Get a list of IndexSearchers from the union of expression index types
      * @param clause the query clause
@@ -378,11 +465,11 @@ public class SecondaryIndexManager
  
         
         //Group columns by type
-        for(IndexExpression ix : clause.expressions)
+        for (IndexExpression ix : clause.expressions)
         {
             SecondaryIndex index = getIndexForColumn(ix.column_name);
             
-            if(index == null)
+            if (index == null)
                 continue;
             
             Set<ByteBuffer> columns = groupByIndexType.get(index.getClass().getCanonicalName());
@@ -418,16 +505,14 @@ public class SecondaryIndexManager
     {
         List<SecondaryIndexSearcher> indexSearchers = getIndexSearchersForQuery(clause);
                
-        if(indexSearchers.isEmpty())
+        if (indexSearchers.isEmpty())
             return Collections.emptyList();
        
         //We currently don't support searching across multiple index types
-        if(indexSearchers.size() > 1)
+        if (indexSearchers.size() > 1)
             throw new RuntimeException("Unable to search across multiple secondary index types");
         
         
         return indexSearchers.get(0).search(clause, range, dataFilter);
     }
-        
-    
 }

Modified: cassandra/branches/cassandra-1.0.0/src/java/org/apache/cassandra/db/index/SecondaryIndexSearcher.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-1.0.0/src/java/org/apache/cassandra/db/index/SecondaryIndexSearcher.java?rev=1169861&r1=1169860&r2=1169861&view=diff
==============================================================================
--- cassandra/branches/cassandra-1.0.0/src/java/org/apache/cassandra/db/index/SecondaryIndexSearcher.java (original)
+++ cassandra/branches/cassandra-1.0.0/src/java/org/apache/cassandra/db/index/SecondaryIndexSearcher.java Mon Sep 12 18:41:31 2011
@@ -18,15 +18,9 @@
 package org.apache.cassandra.db.index;
 
 import java.nio.ByteBuffer;
-import java.util.List;
-import java.util.Set;
-import java.util.SortedSet;
-import java.util.TreeSet;
+import java.util.*;
 
-import org.apache.cassandra.db.ColumnFamily;
-import org.apache.cassandra.db.ColumnFamilyStore;
-import org.apache.cassandra.db.IColumn;
-import org.apache.cassandra.db.Row;
+import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.filter.IFilter;
 import org.apache.cassandra.db.filter.NamesQueryFilter;
 import org.apache.cassandra.dht.AbstractBounds;

Modified: cassandra/branches/cassandra-1.0.0/src/java/org/apache/cassandra/db/index/keys/KeysIndex.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-1.0.0/src/java/org/apache/cassandra/db/index/keys/KeysIndex.java?rev=1169861&r1=1169860&r2=1169861&view=diff
==============================================================================
--- cassandra/branches/cassandra-1.0.0/src/java/org/apache/cassandra/db/index/keys/KeysIndex.java (original)
+++ cassandra/branches/cassandra-1.0.0/src/java/org/apache/cassandra/db/index/keys/KeysIndex.java Mon Sep 12 18:41:31 2011
@@ -20,32 +20,20 @@ package org.apache.cassandra.db.index.ke
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
-import java.util.Iterator;
 import java.util.Set;
-import java.util.concurrent.ConcurrentSkipListSet;
 import java.util.concurrent.ExecutionException;
 
 import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.config.ColumnDefinition;
 import org.apache.cassandra.config.ConfigurationException;
-import org.apache.cassandra.db.Column;
-import org.apache.cassandra.db.ColumnFamily;
-import org.apache.cassandra.db.ColumnFamilyStore;
-import org.apache.cassandra.db.DecoratedKey;
-import org.apache.cassandra.db.ExpiringColumn;
-import org.apache.cassandra.db.IColumn;
-import org.apache.cassandra.db.Memtable;
-import org.apache.cassandra.db.index.SecondaryIndex;
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.index.PerColumnSecondaryIndex;
 import org.apache.cassandra.db.index.SecondaryIndexSearcher;
 import org.apache.cassandra.db.marshal.AbstractType;
 import org.apache.cassandra.db.marshal.BytesType;
 import org.apache.cassandra.db.marshal.LocalByPartionerType;
-import org.apache.cassandra.dht.ByteOrderedPartitioner;
-import org.apache.cassandra.dht.IPartitioner;
-import org.apache.cassandra.dht.LocalPartitioner;
-import org.apache.cassandra.dht.OrderPreservingPartitioner;
+import org.apache.cassandra.dht.*;
 import org.apache.cassandra.service.StorageService;
-import org.apache.cassandra.thrift.IndexType;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -54,18 +42,21 @@ import org.slf4j.LoggerFactory;
  * Implements a secondary index for a column family using a second column family
  * in which the row keys are indexed values, and column names are base row keys.
  */
-public class KeysIndex extends SecondaryIndex
+public class KeysIndex extends PerColumnSecondaryIndex
 {
     private static final Logger logger = LoggerFactory.getLogger(KeysIndex.class);
     private ColumnFamilyStore indexedCfs;
-
+    private ColumnDefinition  columnDef;
+    
     public KeysIndex() 
     {
     }
     
     public void init()
     {
-        assert baseCfs != null && columnDef != null;
+        assert baseCfs != null && columnDefs != null;
+        
+        columnDef = columnDefs.iterator().next();
         
         CFMetaData indexedCfMetadata = CFMetaData.newIndexMetadata(baseCfs.metadata, columnDef, indexComparator());
         indexedCfs = ColumnFamilyStore.createColumnFamilyStore(baseCfs.table,
@@ -122,22 +113,27 @@ public class KeysIndex extends Secondary
     }
 
     @Override
-    public void commitRow(ByteBuffer rowKey)
-    {
-       //nothing required in this impl since indexes are per column 
-    }
-
-    @Override
-    public void removeIndex()
-    {
+    public void removeIndex(ByteBuffer columnName) throws IOException
+    {        
         indexedCfs.removeAllSSTables();
-        indexedCfs.unregisterMBean();
+        indexedCfs.unregisterMBean();     
     }
 
     @Override
-    public void forceBlockingFlush() throws ExecutionException, InterruptedException
-    {
-        indexedCfs.forceBlockingFlush();
+    public void forceBlockingFlush() throws IOException
+    {       
+        try
+        {
+            indexedCfs.forceBlockingFlush();
+        } 
+        catch (ExecutionException e)
+        {
+            throw new IOException(e);
+        } 
+        catch (InterruptedException e)
+        {
+            throw new IOException(e);
+        }
     }
 
     @Override

Modified: cassandra/branches/cassandra-1.0.0/src/java/org/apache/cassandra/db/index/keys/KeysSearcher.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-1.0.0/src/java/org/apache/cassandra/db/index/keys/KeysSearcher.java?rev=1169861&r1=1169860&r2=1169861&view=diff
==============================================================================
--- cassandra/branches/cassandra-1.0.0/src/java/org/apache/cassandra/db/index/keys/KeysSearcher.java (original)
+++ cassandra/branches/cassandra-1.0.0/src/java/org/apache/cassandra/db/index/keys/KeysSearcher.java Mon Sep 12 18:41:31 2011
@@ -18,24 +18,12 @@
 package org.apache.cassandra.db.index.keys;
 
 import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.Set;
-import java.util.SortedSet;
-import java.util.TreeSet;
+import java.util.*;
 
 import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.db.ColumnFamily;
-import org.apache.cassandra.db.DecoratedKey;
-import org.apache.cassandra.db.IColumn;
-import org.apache.cassandra.db.Row;
+import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.columniterator.IColumnIterator;
-import org.apache.cassandra.db.filter.IFilter;
-import org.apache.cassandra.db.filter.NamesQueryFilter;
-import org.apache.cassandra.db.filter.QueryFilter;
-import org.apache.cassandra.db.filter.QueryPath;
-import org.apache.cassandra.db.filter.SliceQueryFilter;
+import org.apache.cassandra.db.filter.*;
 import org.apache.cassandra.db.index.SecondaryIndex;
 import org.apache.cassandra.db.index.SecondaryIndexManager;
 import org.apache.cassandra.db.index.SecondaryIndexSearcher;
@@ -45,7 +33,6 @@ import org.apache.cassandra.thrift.Index
 import org.apache.cassandra.thrift.IndexOperator;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.HeapAllocator;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 

Modified: cassandra/branches/cassandra-1.0.0/src/java/org/apache/cassandra/thrift/ThriftValidation.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-1.0.0/src/java/org/apache/cassandra/thrift/ThriftValidation.java?rev=1169861&r1=1169860&r2=1169861&view=diff
==============================================================================
--- cassandra/branches/cassandra-1.0.0/src/java/org/apache/cassandra/thrift/ThriftValidation.java (original)
+++ cassandra/branches/cassandra-1.0.0/src/java/org/apache/cassandra/thrift/ThriftValidation.java Mon Sep 12 18:41:31 2011
@@ -649,7 +649,7 @@ public class ThriftValidation
                     ColumnDefinition cdef = ColumnDefinition.fromThrift(c);
                    
                     // This method validates the column metadata but does not intialize the index
-                    SecondaryIndex.createInstance(null, cdef, false);
+                    SecondaryIndex.createInstance(null, cdef);
                 }
             }
             validateMinMaxCompactionThresholds(cf_def);

Modified: cassandra/branches/cassandra-1.0.0/test/unit/org/apache/cassandra/db/CleanupTest.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-1.0.0/test/unit/org/apache/cassandra/db/CleanupTest.java?rev=1169861&r1=1169860&r2=1169861&view=diff
==============================================================================
--- cassandra/branches/cassandra-1.0.0/test/unit/org/apache/cassandra/db/CleanupTest.java (original)
+++ cassandra/branches/cassandra-1.0.0/test/unit/org/apache/cassandra/db/CleanupTest.java Mon Sep 12 18:41:31 2011
@@ -109,7 +109,7 @@ public class CleanupTest extends Cleanup
         assertEquals(LOOPS, rows.size());
 
         SecondaryIndex index = cfs.indexManager.getIndexForColumn(COLUMN);
-        assertTrue(index.isIndexBuilt());
+        assertTrue(index.isIndexBuilt(COLUMN));
 
         // verify we get it back w/ index query too
         IndexExpression expr = new IndexExpression(COLUMN, IndexOperator.EQ, VALUE);

Modified: cassandra/branches/cassandra-1.0.0/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-1.0.0/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java?rev=1169861&r1=1169860&r2=1169861&view=diff
==============================================================================
--- cassandra/branches/cassandra-1.0.0/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java (original)
+++ cassandra/branches/cassandra-1.0.0/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java Mon Sep 12 18:41:31 2011
@@ -467,7 +467,7 @@ public class ColumnFamilyStoreTest exten
         // validate that drop clears it out & rebuild works (CASSANDRA-2320)
         SecondaryIndex indexedCfs = cfs.indexManager.getIndexForColumn(ByteBufferUtil.bytes("birthdate"));
         cfs.indexManager.removeIndexedColumn(ByteBufferUtil.bytes("birthdate"));
-        assert !indexedCfs.isIndexBuilt();
+        assert !indexedCfs.isIndexBuilt(ByteBufferUtil.bytes("birthdate"));
 
         // rebuild & re-query
         future = cfs.indexManager.addIndexedColumn(cd);



Mime
View raw message