cassandra-commits mailing list archives

From s...@apache.org
Subject [04/15] cassandra git commit: New 2i API and implementations for built in indexes
Date Fri, 21 Aug 2015 23:48:35 GMT
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0626be86/src/java/org/apache/cassandra/db/rows/Cells.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/rows/Cells.java b/src/java/org/apache/cassandra/db/rows/Cells.java
index 080d640..54df26e 100644
--- a/src/java/org/apache/cassandra/db/rows/Cells.java
+++ b/src/java/org/apache/cassandra/db/rows/Cells.java
@@ -22,9 +22,9 @@ import java.util.Comparator;
 import java.util.Iterator;
 
 import org.apache.cassandra.config.ColumnDefinition;
-import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.Conflicts;
+import org.apache.cassandra.db.DeletionTime;
 import org.apache.cassandra.db.partitions.PartitionStatisticsCollector;
-import org.apache.cassandra.db.index.SecondaryIndexManager;
 
 /**
  * Static methods to work on cells.
@@ -58,9 +58,6 @@ public abstract class Cells
      * Also note that which cell is provided as {@code existing} and which is
      * provided as {@code update} matters for index updates.
      *
-     * @param clustering the clustering for the row the cells to merge originate from.
-     * This is only used for index updates, so this can be {@code null} if
-     * {@code indexUpdater == SecondaryIndexManager.nullUpdater}.
      * @param existing the pre-existing cell, the one that is updated. This can be
     * {@code null} if this reconciliation corresponds to an insertion.
      * @param update the newly added cell, the update. This can be {@code null} out
@@ -72,20 +69,15 @@ public abstract class Cells
      * @param nowInSec the current time in seconds (which plays a role during reconciliation
      * because deleted cells always have precedence on timestamp equality and deciding if a
     * cell is live or not depends on the current time due to expiring cells).
-     * @param indexUpdater an index updater to which the result of the reconciliation is
-     * signaled (if relevant, that is if the update is not simply ignored by the reconciliation).
-     * This cannot be {@code null} but {@code SecondaryIndexManager.nullUpdater} can be passed.
      *
      * @return the timestamp delta between existing and update, or {@code Long.MAX_VALUE} if one
     * of them is {@code null} or deleted by {@code deletion}.
      */
-    public static long reconcile(Clustering clustering,
-                                 Cell existing,
+    public static long reconcile(Cell existing,
                                  Cell update,
                                  DeletionTime deletion,
                                  Row.Builder builder,
-                                 int nowInSec,
-                                 SecondaryIndexManager.Updater indexUpdater)
+                                 int nowInSec)
     {
         existing = existing == null || deletion.deletes(existing) ? null : existing;
         update = update == null || deletion.deletes(update) ? null : update;
@@ -93,10 +85,6 @@ public abstract class Cells
         {
             if (update != null)
             {
-                // It's inefficient that we call maybeIndex (which is for primary key indexes) on every cell, but
-                // we'll need to fix that damn 2ndary index API to avoid that.
-                updatePKIndexes(clustering, update, nowInSec, indexUpdater);
-                indexUpdater.insert(clustering, update);
                 builder.addCell(update);
             }
             else if (existing != null)
@@ -109,21 +97,9 @@ public abstract class Cells
         Cell reconciled = reconcile(existing, update, nowInSec);
         builder.addCell(reconciled);
 
-        // Note that this test rely on reconcile returning either 'existing' or 'update'. That's not true for counters but we don't index them
-        if (reconciled == update)
-        {
-            updatePKIndexes(clustering, update, nowInSec, indexUpdater);
-            indexUpdater.update(clustering, existing, reconciled);
-        }
         return Math.abs(existing.timestamp() - update.timestamp());
     }
 
-    private static void updatePKIndexes(Clustering clustering, Cell cell, int nowInSec, SecondaryIndexManager.Updater indexUpdater)
-    {
-        if (indexUpdater != SecondaryIndexManager.nullUpdater && cell.isLive(nowInSec))
-            indexUpdater.maybeIndex(clustering, cell.timestamp(), cell.ttl(), DeletionTime.LIVE);
-    }
-
     /**
      * Reconciles/merge two cells.
      * <p>
@@ -202,9 +178,6 @@ public abstract class Cells
      * Also note that which cells are provided as {@code existing} and which are
      * provided as {@code update} matters for index updates.
      *
-     * @param clustering the clustering for the row the cells to merge originate from.
-     * This is only used for index updates, so this can be {@code null} if
-     * {@code indexUpdater == SecondaryIndexManager.nullUpdater}.
      * @param column the complex column the cells are for.
      * @param existing the pre-existing cells, the ones that are updated. This can be
     * {@code null} if this reconciliation corresponds to an insertion.
@@ -217,9 +190,6 @@ public abstract class Cells
      * @param nowInSec the current time in seconds (which plays a role during reconciliation
      * because deleted cells always have precedence on timestamp equality and deciding if a
     * cell is live or not depends on the current time due to expiring cells).
-     * @param indexUpdater an index updater to which the result of the reconciliation is
-     * signaled (if relevant, that is if the updates are not simply ignored by the reconciliation).
-     * This cannot be {@code null} but {@code SecondaryIndexManager.nullUpdater} can be passed.
      *
      * @return the smallest timestamp delta between corresponding cells from existing and update. A
      * timestamp delta being computed as the difference between a cell from {@code update} and the
@@ -227,14 +197,12 @@ public abstract class Cells
      * of cells from {@code existing} and {@code update} having the same cell path is empty, this
      * returns {@code Long.MAX_VALUE}.
      */
-    public static long reconcileComplex(Clustering clustering,
-                                        ColumnDefinition column,
+    public static long reconcileComplex(ColumnDefinition column,
                                         Iterator<Cell> existing,
                                         Iterator<Cell> update,
                                         DeletionTime deletion,
                                         Row.Builder builder,
-                                        int nowInSec,
-                                        SecondaryIndexManager.Updater indexUpdater)
+                                        int nowInSec)
     {
         Comparator<CellPath> comparator = column.cellPathComparator();
         Cell nextExisting = getNext(existing);
@@ -247,17 +215,17 @@ public abstract class Cells
                      : comparator.compare(nextExisting.path(), nextUpdate.path()));
             if (cmp < 0)
             {
-                reconcile(clustering, nextExisting, null, deletion, builder, nowInSec, indexUpdater);
+                reconcile(nextExisting, null, deletion, builder, nowInSec);
                 nextExisting = getNext(existing);
             }
             else if (cmp > 0)
             {
-                reconcile(clustering, null, nextUpdate, deletion, builder, nowInSec, indexUpdater);
+                reconcile(null, nextUpdate, deletion, builder, nowInSec);
                 nextUpdate = getNext(update);
             }
             else
             {
-                timeDelta = Math.min(timeDelta, reconcile(clustering, nextExisting, nextUpdate, deletion, builder, nowInSec, indexUpdater));
+                timeDelta = Math.min(timeDelta, reconcile(nextExisting, nextUpdate, deletion, builder, nowInSec));
                 nextExisting = getNext(existing);
                 nextUpdate = getNext(update);
             }
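
For illustration, a call to the revised reconcile might look like the following sketch
(existingCell, updateCell, builder and nowInSec are hypothetical locals, not part of this commit):

    // merge two versions of the same cell into builder; Long.MAX_VALUE signals that
    // one side was null or deleted by the supplied DeletionTime
    long timeDelta = Cells.reconcile(existingCell, updateCell, DeletionTime.LIVE, builder, nowInSec);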

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0626be86/src/java/org/apache/cassandra/db/rows/Rows.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/rows/Rows.java b/src/java/org/apache/cassandra/db/rows/Rows.java
index 0b739a8..ce177f2 100644
--- a/src/java/org/apache/cassandra/db/rows/Rows.java
+++ b/src/java/org/apache/cassandra/db/rows/Rows.java
@@ -17,7 +17,9 @@
  */
 package org.apache.cassandra.db.rows;
 
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
 
 import com.google.common.collect.Iterators;
 import com.google.common.collect.PeekingIterator;
@@ -25,7 +27,6 @@ import com.google.common.collect.PeekingIterator;
 import org.apache.cassandra.config.ColumnDefinition;
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.partitions.PartitionStatisticsCollector;
-import org.apache.cassandra.db.index.SecondaryIndexManager;
 import org.apache.cassandra.utils.SearchIterator;
 
 /**
@@ -127,7 +128,7 @@ public abstract class Rows
      * @param diffListener the listener to which to signal the differences between the inputs and the merged
      * result.
      */
-    public static void diff(Row merged, Columns columns, Row[] inputs, RowDiffListener diffListener)
+    public static void diff(RowDiffListener diffListener, Row merged, Columns columns, Row...inputs)
     {
         Clustering clustering = merged.clustering();
         LivenessInfo mergedInfo = merged.primaryKeyLivenessInfo().isEmpty() ? null : merged.primaryKeyLivenessInfo();
@@ -218,23 +219,17 @@ public abstract class Rows
     {
         Columns mergedColumns = row1.columns().mergeTo(row2.columns());
         Row.Builder builder = BTreeRow.sortedBuilder(mergedColumns);
-        merge(row1, row2, mergedColumns, builder, nowInSec, SecondaryIndexManager.nullUpdater);
+        merge(row1, row2, mergedColumns, builder, nowInSec);
         return builder.build();
     }
 
-    public static void merge(Row row1, Row row2, Columns mergedColumns, Row.Builder builder, int nowInSec)
-    {
-        merge(row1, row2, mergedColumns, builder, nowInSec, SecondaryIndexManager.nullUpdater);
-    }
-
     // Merge rows in memtable
     // Return the minimum timestamp delta between existing and update
     public static long merge(Row existing,
                              Row update,
                              Columns mergedColumns,
                              Row.Builder builder,
-                             int nowInSec,
-                             SecondaryIndexManager.Updater indexUpdater)
+                             int nowInSec)
     {
         Clustering clustering = existing.clustering();
         builder.newRow(clustering);
@@ -253,20 +248,16 @@ public abstract class Rows
         builder.addPrimaryKeyLivenessInfo(mergedInfo);
         builder.addRowDeletion(deletion);
 
-        indexUpdater.maybeIndex(clustering, mergedInfo.timestamp(), mergedInfo.ttl(), deletion);
-
         for (int i = 0; i < mergedColumns.simpleColumnCount(); i++)
         {
             ColumnDefinition c = mergedColumns.getSimple(i);
             Cell existingCell = existing.getCell(c);
             Cell updateCell = update.getCell(c);
-            timeDelta = Math.min(timeDelta, Cells.reconcile(clustering,
-                                                            existingCell,
+            timeDelta = Math.min(timeDelta, Cells.reconcile(existingCell,
                                                             updateCell,
                                                             deletion,
                                                             builder,
-                                                            nowInSec,
-                                                            indexUpdater));
+                                                            nowInSec));
         }
 
         for (int i = 0; i < mergedColumns.complexColumnCount(); i++)
@@ -285,7 +276,7 @@ public abstract class Rows
 
             Iterator<Cell> existingCells = existingData == null ? null : existingData.iterator();
             Iterator<Cell> updateCells = updateData == null ? null : updateData.iterator();
-            timeDelta = Math.min(timeDelta, Cells.reconcileComplex(clustering, c, existingCells, updateCells, maxDt, builder, nowInSec, indexUpdater));
+            timeDelta = Math.min(timeDelta, Cells.reconcileComplex(c, existingCells, updateCells, maxDt, builder, nowInSec));
         }
 
         return timeDelta;
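
With the reordered, varargs signature of diff, a call site might look like this sketch
(listener, merged, columns and the input rows are hypothetical):

    // signal differences between each input row and the merged result to the listener
    Rows.diff(listener, merged, columns, input1, input2);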

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0626be86/src/java/org/apache/cassandra/index/Index.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/index/Index.java b/src/java/org/apache/cassandra/index/Index.java
new file mode 100644
index 0000000..8f126fe
--- /dev/null
+++ b/src/java/org/apache/cassandra/index/Index.java
@@ -0,0 +1,388 @@
+package org.apache.cassandra.index;
+
+import java.util.Optional;
+import java.util.concurrent.Callable;
+import java.util.function.BiFunction;
+
+import org.apache.cassandra.config.ColumnDefinition;
+import org.apache.cassandra.cql3.Operator;
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.filter.RowFilter;
+import org.apache.cassandra.db.partitions.PartitionIterator;
+import org.apache.cassandra.db.partitions.PartitionUpdate;
+import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator;
+import org.apache.cassandra.db.rows.Row;
+import org.apache.cassandra.exceptions.InvalidRequestException;
+import org.apache.cassandra.index.transactions.IndexTransaction;
+import org.apache.cassandra.schema.IndexMetadata;
+import org.apache.cassandra.utils.concurrent.OpOrder;
+
+/**
+ * Consisting of a top level Index interface and two sub-interfaces which handle read and write operations,
+ * Searcher and Indexer respectively, this defines a secondary index implementation.
+ * Instantiation is done via reflection and implementations must provide a constructor which takes the base
+ * table's ColumnFamilyStore and the IndexMetadata which defines the Index as arguments, e.g.:
+ *  {@code MyCustomIndex( ColumnFamilyStore baseCfs, IndexMetadata indexDef )}
+ *
+ * The main interface defines methods for index management, index selection at both write and query time,
+ * as well as validation of values that will ultimately be indexed.
+ * Two sub-interfaces are also defined, which represent single use helpers for short lived tasks at read and write time.
+ * Indexer: an event listener which receives notifications at particular points during an update of a single partition
+ *          in the base table.
+ * Searcher: performs queries against the index based on a predicate defined in a RowFilter. An instance
+ *          is expected to be single use, being involved in the execution of a single ReadCommand.
+ *
+ * The main interface includes factory methods for obtaining instances of both of the sub-interfaces.
+ *
+ * The methods defined in the top level interface can be grouped into 3 categories:
+ *
+ * Management Tasks:
+ * This group of methods is primarily concerned with maintenance of secondary indexes and is mainly called from
+ * SecondaryIndexManager. It includes methods for registering and un-registering an index, performing maintenance
+ * tasks such as (re)building an index from SSTable data, flushing, invalidating and so forth, as well as some to
+ * retrieve general metadata about the index (index name, any internal tables used for persistence etc).
+ * Several of these maintenance functions have a return type of Callable<?>; the expectation for these methods is
+ * that any work required to be performed by the method be done inside the Callable so that the responsibility for
+ * scheduling its execution can rest with SecondaryIndexManager. For instance, a task like reloading index metadata
+ * following potential updates caused by modifications to the base table may be performed in a blocking way. In
+ * contrast, adding a new index may require it to be built from existing SSTable data, a potentially expensive task
+ * which should be performed asynchronously.
+ *
+ * Index Selection:
+ * There are two facets to index selection, write time and read time selection. The former is concerned with
+ * identifying whether an index should be informed about a particular write operation. The latter is about providing
+ * means to use the index for search during query execution.
+ *
+ * Validation:
+ * Values that may be written to an index are checked as part of input validation, prior to an update or insert
+ * operation being accepted.
+ *
+ *
+ * Sub-interfaces:
+ *
+ * Update processing:
+ * Indexes are subscribed to the stream of events generated by modifications to the base table. Subscription is
+ * done via first registering the Index with the base table's SecondaryIndexManager. For each partition update, the set
+ * of registered indexes are then filtered based on the properties of the update using the selection methods on the main
+ * interface described above. Each of the indexes in the filtered set then provides an event listener to receive
+ * notifications about the update as it is processed. As such, an event handler instance is scoped to a single
+ * partition update; SecondaryIndexManager obtains a new handler for every update it processes (via a call to the
+ * factory method, indexerFor). That handler will then receive all events for the update, before being
+ * discarded by the SecondaryIndexManager. Indexer instances are never re-used by SecondaryIndexManager and the
+ * expectation is that each call to indexerFor should return a unique instance, or at least if instances can
+ * be recycled, that a given instance is only used to process a single partition update at a time.
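+ *
+ * As an illustrative sketch (not mandated by the interface), a manager might drive an Indexer
+ * for a single partition update roughly as follows:
+ * <pre> {@code
+ * Index.Indexer indexer = index.indexerFor(key, nowInSec, opGroup, IndexTransaction.Type.UPDATE);
+ * indexer.begin();                      // always the first event
+ * for (Row row : rowsInUpdate)          // rowsInUpdate is a hypothetical stand-in for the update's rows
+ *     indexer.insertRow(row);
+ * indexer.finish();                     // always the last event
+ * } </pre>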
+ *
+ * Search:
+ * Each query (i.e. a single ReadCommand) that uses indexes will use a single instance of Index.Searcher. As with
+ * processing of updates, an Index must be registered with the primary table's SecondaryIndexManager to be able to
+ * support queries. During the processing of a ReadCommand, the Expressions in its RowFilter are examined to determine
+ * whether any of them are supported by a registered Index. supportsExpression is used to filter out Indexes which
+ * cannot support a given Expression. After filtering, the set of candidate indexes are ranked according to the result
+ * of getEstimatedResultRows and the most selective (i.e. the one expected to return the smallest number of results) is
+ * chosen. A Searcher instance is then obtained from the searcherFor method & used to perform the actual Index lookup.
+ * Finally, Indexes can define a post processing step to be performed on the coordinator, after results (partitions from
+ * the primary table) have been received from replicas and reconciled. This post processing is defined as a
+ * java.util.function.BiFunction<PartitionIterator, ReadCommand, PartitionIterator>, that is a function which takes as
+ * arguments a PartitionIterator (containing the reconciled result rows) and the ReadCommand being executed, and
+ * returns another iterator of partitions, possibly having transformed the initial results in some way.
+ * The post processing function is obtained from the Index's postProcessorFor method; the built-in indexes which ship
+ * with Cassandra return a no-op function here.
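+ *
+ * As a sketch, the no-op post processing function returned by the built-in indexes amounts to:
+ * <pre> {@code
+ * public BiFunction<PartitionIterator, ReadCommand, PartitionIterator> postProcessorFor(ReadCommand command)
+ * {
+ *     return (partitions, cmd) -> partitions; // pass the reconciled results through unchanged
+ * }
+ * } </pre>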
+ *
+ * An optional static method may be provided to validate custom index options:
+ *
+ * <pre> {@code
+ * public static Map<String, String> validateOptions(Map<String, String> options);
+ * } </pre>
+ *
+ * The input is the map of index options supplied in the WITH clause of a CREATE INDEX statement. The method should
+ * return a map containing any of the supplied options which are not valid for the implementation. If the returned
+ * map is not empty, validation is considered failed and an error is raised. Alternatively, the implementation may
+ * choose to throw an org.apache.cassandra.exceptions.ConfigurationException if invalid options are encountered.
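+ *
+ * For example, an implementation recognizing a single, invented "mode" option might sketch this as:
+ * <pre> {@code
+ * public static Map<String, String> validateOptions(Map<String, String> options)
+ * {
+ *     Map<String, String> unrecognized = new HashMap<>(options);
+ *     unrecognized.remove("mode");  // "mode" is a hypothetical option, for illustration only
+ *     return unrecognized;          // a non-empty result fails validation
+ * }
+ * } </pre>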
+ *
+ */
+public interface Index
+{
+
+    /*
+     * Management functions
+     */
+
+    /**
+     * Return a task to perform any initialization work when a new index instance is created.
+     * This may involve costly operations such as (re)building the index, and is performed asynchronously
+     * by SecondaryIndexManager
+     * @return a task to perform any necessary initialization work
+     */
+    public Callable<?> getInitializationTask();
+
+    /**
+     * Returns the IndexMetadata which configures and defines the index instance. This should be the same
+     * object passed as the argument to setIndexMetadata.
+     * @return the index's metadata
+     */
+    public IndexMetadata getIndexMetadata();
+
+    /**
+     * Return a task to reload the internal metadata of an index.
+     * Called when the base table metadata is modified or when the configuration of the Index is updated
+     * Implementations should return a task which performs any necessary work to be done due to
+     * updating the configuration(s) such as (re)building etc. This task is performed asynchronously
+     * by SecondaryIndexManager
+     * @return task to be executed by the index manager during a reload
+     */
+    public Callable<?> getMetadataReloadTask(IndexMetadata indexMetadata);
+
+    /**
+     * An index must be registered in order to be able to either subscribe to update events on the base
+     * table and/or to provide Searcher functionality for reads. The double dispatch involved here, where
+     * the Index actually performs its own registration by calling back to the supplied IndexRegistry's
+     * own registerIndex method, leaves the decision as to whether or not to register an index with
+     * the implementation, not the manager.
+     * @param registry the index registry to register the instance with
+     */
+    public void register(IndexRegistry registry);
+
+    /**
+     * Return an identifier for the index. This should be unique across all indexes on a given base table
+     * @return the name of the index
+     */
+    public String getIndexName();
+
+    /**
+     * If the index implementation uses a local table to store its index data this method should return a
+     * handle to it. If not, an empty Optional should be returned. Typically, this is useful for the built-in
+     * Index implementations.
+     * @return an Optional referencing the Index's backing storage table if it has one, or Optional.empty() if not.
+     */
+    public Optional<ColumnFamilyStore> getBackingTable();
+
+    /**
+     * Return a task which performs a blocking flush of the index's data to persistent storage.
+     * @return task to be executed by the index manager to perform the flush.
+     */
+    public Callable<?> getBlockingFlushTask();
+
+    /**
+     * Return a task which invalidates the index, indicating it should no longer be considered usable.
+     * This should include any clean up and releasing of resources required when dropping an index.
+     * @return task to be executed by the index manager to invalidate the index.
+     */
+    public Callable<?> getInvalidateTask();
+
+    /**
+     * Return a task to truncate the index with the specified truncation timestamp.
+     * Called when the base table is truncated.
+     * @param truncatedAt timestamp of the truncation operation. This will be the same timestamp used
+     *                    in the truncation of the base table.
+     * @return task to be executed by the index manager when the base table is truncated.
+     */
+    public Callable<?> getTruncateTask(long truncatedAt);
+
+    /*
+     * Index selection
+     */
+
+    /**
+     * Called to determine whether this index should process a particular partition update.
+     * @param columns the columns affected by the partition update
+     * @return true if the index should be informed of the update, false otherwise
+     */
+    public boolean indexes(PartitionColumns columns);
+
+    // TODO : this will change when we decouple indexes from specific columns for real per-row indexes
+    /**
+     * Called to determine whether this index can provide a searcher to execute a query on the
+     * supplied column using the specified operator. This forms part of the query validation done
+     * before a CQL select statement is executed.
+     * @param column the target column of a search query predicate
+     * @param operator the operator of a search query predicate
+     * @return true if this index is capable of supporting such expressions, false otherwise
+     */
+    public boolean supportsExpression(ColumnDefinition column, Operator operator);
+
+    /**
+     * Transform an initial RowFilter into the filter that will still need to be applied
+     * to a set of Rows after the index has performed its initial scan.
+     * Used in ReadCommand#executeLocal to reduce the amount of filtering performed on the
+     * results of the index query.
+     *
+     * @param filter the initial filter belonging to a ReadCommand
+     * @return the (hopefully) reduced filter that would still need to be applied after
+     *         the index was used to narrow the initial result set
+     */
+    public RowFilter getPostIndexQueryFilter(RowFilter filter);
+
+    /**
+     * Return an estimate of the number of results this index is expected to return for any given
+     * query that it can be used to answer. Used in conjunction with indexes() and supportsExpression()
+     * to determine the most selective index for a given ReadCommand. Additionally, this is also used
+     * by StorageProxy.estimateResultsPerRange to calculate the initial concurrency factor for range requests
+     *
+     * @return the estimated average number of results a Searcher may return for any given query
+     */
+    public long getEstimatedResultRows();
+
+    /*
+     * Input validation
+     */
+
+    /**
+     * Called at write time to ensure that values present in the update
+     * are valid according to the rules of all registered indexes which
+     * will process it. The partition key as well as the clustering and
+     * cell values for each row in the update may be checked by index
+     * implementations.
+     * @param update PartitionUpdate containing the values to be validated by registered Index implementations
+     * @throws InvalidRequestException if the update contains values deemed invalid by a registered index
+     */
+    public void validate(PartitionUpdate update) throws InvalidRequestException;
+
+    /*
+     * Update processing
+     */
+
+    /**
+     * Factory method for write time event handlers.
+     * Callers should check the indexes method first and only get a new
+     * handler when the index claims an interest in the specific update;
+     * otherwise, work may be done unnecessarily.
+     *
+     * @param key key of the partition being modified
+     * @param nowInSec current time of the update operation
+     * @param opGroup operation group spanning the update operation
+     * @param transactionType indicates what kind of update is being performed on the base data
+     *                        i.e. a write time insert/update/delete or the result of compaction
+     * @return an Indexer instance to receive notifications about the partition update
+     */
+    public Indexer indexerFor(DecoratedKey key,
+                              int nowInSec,
+                              OpOrder.Group opGroup,
+                              IndexTransaction.Type transactionType);
+
+    /**
+     * Listener for processing events emitted during a single partition update.
+     * Instances of this are responsible for applying modifications to the index in response to a single update
+     * operation on a particular partition of the base table.
+     * That update may be generated by the normal write path, by iterating SSTables during streaming operations or when
+     * building or rebuilding an index from source. Updates also occur during compaction when multiple versions of a
+     * source partition from different SSTables are merged.
+     */
+    public interface Indexer
+    {
+        /**
+         * Notification of the start of a partition update.
+         * This event always occurs before any other during the update.
+         */
+        public void begin();
+
+        /**
+         * Notification of a top level partition delete.
+         * @param deletionTime the time of the partition-level deletion
+         */
+        public void partitionDelete(DeletionTime deletionTime);
+
+        /**
+         * Notification of a RangeTombstone.
+         * An update of a single partition may contain multiple RangeTombstones,
+         * and a notification will be passed for each of them.
+         * @param tombstone the RangeTombstone included in the update
+         */
+        public void rangeTombstone(RangeTombstone tombstone);
+
+        /**
+         * Notification that a new row was inserted into the Memtable holding the partition.
+         * This only implies that the inserted row was not already present in the Memtable,
+         * it *does not* guarantee that the row does not exist in an SSTable, potentially with
+         * additional column data.
+         *
+         * @param row the Row being inserted into the base table's Memtable.
+         */
+        public void insertRow(Row row);
+
+        /**
+         * Notification of a modification to a row in the base table's Memtable.
+         * This is to allow an Index implementation to clean up entries for base data which is
+         * never flushed to disk (and so will not be purged during compaction).
+         * It's important to note that the old & new rows supplied here may not represent
+         * the totality of the data for the Row with this particular Clustering. There may be
+         * additional column data in SSTables which is not present in either the old or new row,
+         * so implementations should be aware of that.
+         * The supplied rows contain only column data which has actually been updated.
+         * oldRowData contains only the columns which have been removed from the Row's
+         * representation in the Memtable, while newRowData includes only new columns
+         * which were not previously present. Any column data which is unchanged by
+         * the update is not included.
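+         * For example (a hypothetical illustration of this contract): if an update changes
+         * column b from 1 to 2 and adds a new column c, oldRowData contains only b=1 while
+         * newRowData contains only b=2 and c.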
+         *
+         * @param oldRowData data that was present in existing row and which has been removed from
+         *                   the base table's Memtable
+         * @param newRowData data that was not present in the existing row and is being inserted
+         *                   into the base table's Memtable
+         */
+        public void updateRow(Row oldRowData, Row newRowData);
+
+        /**
+         * Notification that a row was removed from the partition.
+         * Note that this is only called as part of either a compaction or a cleanup.
+         * This context is indicated by the TransactionType supplied to the indexerFor method.
+         *
+         * As with updateRow, it cannot be guaranteed that all data belonging to the Clustering
+         * of the supplied Row has been removed (although in the case of a cleanup, that is the
+         * ultimate intention).
+         * There may be data for the same row in other SSTables, so in this case Indexer implementations
+         * should *not* assume that all traces of the row have been removed. In particular,
+         * it is not safe to assert that all values associated with the Row's Clustering
+         * have been deleted, so implementations which index primary key columns should not
+         * purge those entries from their indexes.
+         *
+         * @param row data being removed from the base table
+         */
+        public void removeRow(Row row);
+
+        /**
+         * Notification of the end of the partition update.
+         * This event always occurs after all others for the particular update.
+         */
+        public void finish();
+    }
+
+    /*
+     * Querying
+     */
+
+    /**
+     * Return a function which performs post processing on the results of a partition range read command.
+     * In future, this may be used as a generalized mechanism for transforming results on the coordinator prior
+     * to returning them to the caller.
+     *
+     * This is used on the coordinator during execution of a range command to perform post
+     * processing of merged results obtained from the necessary replicas. This is the only way in which results are
+     * transformed in this way but this may change over time as usage is generalized.
+     * See CASSANDRA-8717 for further discussion.
+     *
+     * The function takes a PartitionIterator of the results from the replicas which has already been collated
+     * & reconciled, along with the command being executed. It returns another PartitionIterator containing the results
+     * of the transformation (which may be the same as the input if the transformation is a no-op).
+     */
+    public BiFunction<PartitionIterator, ReadCommand, PartitionIterator> postProcessorFor(ReadCommand command);
+
+    /**
+     * Factory method for query time search helper.
+     * @param command the read command being executed
+     * @return a Searcher with which to perform the supplied command
+     */
+    public Searcher searcherFor(ReadCommand command);
+
+    /**
+     * Performs the actual index lookup during execution of a ReadCommand.
+     * An instance performs its query according to the RowFilter.Expression it was created for (see searcherFor).
+     * An Expression is a predicate of the form [column] [operator] [value].
+     */
+    public interface Searcher
+    {
+        /**
+         * @param orderGroup the collection of OpOrder.Groups which the ReadCommand is being performed under.
+         * @return partitions from the base table matching the criteria of the search.
+         */
+        public UnfilteredPartitionIterator search(ReadOrderGroup orderGroup);
+    }
+}
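
As a sketch only (not part of this commit), a custom implementation of the interface above would
start from the reflection contract described in its javadoc; MyCustomIndex and its method bodies
are invented for illustration, and the class is kept abstract so the remaining methods can be elided:

    public abstract class MyCustomIndex implements Index
    {
        private final ColumnFamilyStore baseCfs;
        private IndexMetadata metadata;

        // constructor signature required for reflective instantiation
        public MyCustomIndex(ColumnFamilyStore baseCfs, IndexMetadata metadata)
        {
            this.baseCfs = baseCfs;
            this.metadata = metadata;
        }

        public IndexMetadata getIndexMetadata() { return metadata; }
        public String getIndexName() { return metadata.name; }
        public void register(IndexRegistry registry) { registry.registerIndex(this); }
        // remaining methods of Index elided in this sketch
    }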

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0626be86/src/java/org/apache/cassandra/index/IndexRegistry.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/index/IndexRegistry.java b/src/java/org/apache/cassandra/index/IndexRegistry.java
new file mode 100644
index 0000000..6a004fb
--- /dev/null
+++ b/src/java/org/apache/cassandra/index/IndexRegistry.java
@@ -0,0 +1,22 @@
+package org.apache.cassandra.index;
+
+import java.util.Collection;
+
+import org.apache.cassandra.schema.IndexMetadata;
+
+/**
+ * The collection of all Index instances for a base table.
+ * The SecondaryIndexManager for a ColumnFamilyStore contains an IndexRegistry
+ * (actually it implements this interface at present) and Index implementations
+ * register in order to:
+ * i) subscribe to the stream of updates being applied to partitions in the base table
+ * ii) provide searchers to support queries with the relevant search predicates
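+ *
+ * A minimal register implementation on the Index side might be sketched as:
+ * <pre> {@code
+ * public void register(IndexRegistry registry)
+ * {
+ *     registry.registerIndex(this); // the index itself decides whether to register
+ * }
+ * } </pre>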
+ */
+public interface IndexRegistry
+{
+    void registerIndex(Index index);
+    void unregisterIndex(Index index);
+
+    Index getIndex(IndexMetadata indexMetadata);
+    Collection<Index> listIndexes();
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0626be86/src/java/org/apache/cassandra/index/SecondaryIndexBuilder.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/index/SecondaryIndexBuilder.java b/src/java/org/apache/cassandra/index/SecondaryIndexBuilder.java
new file mode 100644
index 0000000..d6ae8e2
--- /dev/null
+++ b/src/java/org/apache/cassandra/index/SecondaryIndexBuilder.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.index;
+
+import java.io.IOException;
+import java.util.Set;
+import java.util.UUID;
+
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.db.compaction.CompactionInfo;
+import org.apache.cassandra.db.compaction.CompactionInterruptedException;
+import org.apache.cassandra.db.compaction.OperationType;
+import org.apache.cassandra.io.sstable.ReducingKeyIterator;
+import org.apache.cassandra.utils.UUIDGen;
+
+/**
+ * Manages building an entire index from column family data. Runs on the compaction manager.
+ */
+public class SecondaryIndexBuilder extends CompactionInfo.Holder
+{
+    private final ColumnFamilyStore cfs;
+    private final Set<Index> indexers;
+    private final ReducingKeyIterator iter;
+    private final UUID compactionId;
+
+    public SecondaryIndexBuilder(ColumnFamilyStore cfs, Set<Index> indexers, ReducingKeyIterator iter)
+    {
+        this.cfs = cfs;
+        this.indexers = indexers;
+        this.iter = iter;
+        this.compactionId = UUIDGen.getTimeUUID();
+    }
+
+    public CompactionInfo getCompactionInfo()
+    {
+        return new CompactionInfo(cfs.metadata,
+                                  OperationType.INDEX_BUILD,
+                                  iter.getBytesRead(),
+                                  iter.getTotalBytes(),
+                                  compactionId);
+    }
+
+    public void build()
+    {
+        try
+        {
+            while (iter.hasNext())
+            {
+                if (isStopRequested())
+                    throw new CompactionInterruptedException(getCompactionInfo());
+                DecoratedKey key = iter.next();
+                Keyspace.indexPartition(key, cfs, indexers);
+            }
+        }
+        finally
+        {
+            try
+            {
+                iter.close();
+            }
+            catch (IOException e)
+            {
+                throw new RuntimeException(e);
+            }
+        }
+    }
+}
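
Usage mirrors SecondaryIndexManager.buildIndexesBlocking later in this commit; as a sketch, with
cfs, indexes and sstables assumed to be in scope:

    SecondaryIndexBuilder builder = new SecondaryIndexBuilder(cfs, indexes, new ReducingKeyIterator(sstables));
    Future<?> future = CompactionManager.instance.submitIndexBuild(builder);
    FBUtilities.waitOnFuture(future);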

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0626be86/src/java/org/apache/cassandra/index/SecondaryIndexManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/index/SecondaryIndexManager.java b/src/java/org/apache/cassandra/index/SecondaryIndexManager.java
new file mode 100644
index 0000000..5d32b70
--- /dev/null
+++ b/src/java/org/apache/cassandra/index/SecondaryIndexManager.java
@@ -0,0 +1,900 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.index;
+
+import java.lang.reflect.Constructor;
+import java.util.*;
+import java.util.concurrent.*;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import com.google.common.base.Strings;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Maps;
+import com.google.common.primitives.Longs;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.MoreExecutors;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.concurrent.JMXEnabledThreadPoolExecutor;
+import org.apache.cassandra.concurrent.NamedThreadFactory;
+import org.apache.cassandra.concurrent.StageManager;
+import org.apache.cassandra.config.ColumnDefinition;
+import org.apache.cassandra.cql3.statements.IndexTarget;
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.compaction.CompactionManager;
+import org.apache.cassandra.db.filter.RowFilter;
+import org.apache.cassandra.db.lifecycle.SSTableSet;
+import org.apache.cassandra.db.lifecycle.View;
+import org.apache.cassandra.db.partitions.PartitionUpdate;
+import org.apache.cassandra.db.rows.*;
+import org.apache.cassandra.exceptions.InvalidRequestException;
+import org.apache.cassandra.index.internal.CassandraIndex;
+import org.apache.cassandra.index.transactions.*;
+import org.apache.cassandra.io.sstable.ReducingKeyIterator;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.schema.IndexMetadata;
+import org.apache.cassandra.schema.Indexes;
+import org.apache.cassandra.tracing.Tracing;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.concurrent.OpOrder;
+import org.apache.cassandra.utils.concurrent.Refs;
+
+/**
+ * Handles the core maintenance functionality associated with indexes: adding/removing them to or from
+ * a table, (re)building during bootstrap or other streaming operations, flushing, reloading metadata
+ * and so on.
+ *
+ * The Index interface defines a number of methods which return Callable<?>. These are primarily the
+ * management tasks for an index implementation. Most of them are currently executed in a blocking
+ * fashion via submission to SIM's blockingExecutor. This provides the desired behaviour in pretty
+ * much all cases, as tasks like flushing an index need to be executed synchronously to avoid potentially
+ * deadlocking on the FlushWriter or PostFlusher. Several of these Callable<?> returning methods on Index could
+ * then be defined as void and called directly from SIM (rather than being run via the executor service).
+ * Separating the task definition from execution gives us greater flexibility though, so that in future, for example,
+ * if the flush process allows it, we leave open the possibility of executing more of these tasks asynchronously.
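+ *
+ * A sketch of the blocking execution pattern used for these tasks (the helper's exact shape is
+ * illustrative; blockingExecutor is the executor defined below):
+ * <pre> {@code
+ * private static void executeBlocking(Callable<?> task)
+ * {
+ *     if (null != task)
+ *         FBUtilities.waitOnFuture(blockingExecutor.submit(task));
+ * }
+ * } </pre>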
+ *
+ * The primary exception to the above is the Callable returned from Index#getInitializationTask. This may
+ * involve significant effort, building a new index over any existing data. We perform this task asynchronously,
+ * as it is called as part of a schema update, which we do not want to block for a long period. Building non-custom
+ * indexes is performed on the CompactionManager.
+ *
+ * This class also provides instances of processors which listen to updates to the base table and forward to
+ * registered Indexes the info required to keep those indexes up to date.
+ * There are two variants of these processors, each with a factory method provided by SIM:
+ *      IndexTransaction: deals with updates generated on the regular write path.
+ *      CleanupTransaction: used when partitions are modified during compaction or cleanup operations.
+ * Further details on their usage and lifecycles can be found in the interface definitions below.
+ *
+ * Finally, the getBestIndexFor method is used at query time to identify the most selective index of those able
+ * to satisfy any search predicates defined by a ReadCommand's RowFilter. It returns the selected Index,
+ * which enables the ReadCommand to access the appropriate functions of the Index at various stages in its lifecycle.
+ * e.g. the getEstimatedResultRows is required when StorageProxy calculates the initial concurrency factor for
+ * distributing requests to replicas, whereas a Searcher instance is needed when the ReadCommand is executed locally on
+ * a target replica.
+ */
+public class SecondaryIndexManager implements IndexRegistry
+{
+    private static final Logger logger = LoggerFactory.getLogger(SecondaryIndexManager.class);
+
+
+    private Map<String, Index> indexes = Maps.newConcurrentMap();
+
+    // executes tasks returned by Index#getInitializationTask which may require index(es) to be (re)built
+    private static final ExecutorService asyncExecutor =
+        new JMXEnabledThreadPoolExecutor(1,
+                                         StageManager.KEEPALIVE,
+                                         TimeUnit.SECONDS,
+                                         new LinkedBlockingQueue<>(),
+                                         new NamedThreadFactory("SecondaryIndexManagement"),
+                                         "internal");
+
+    // executes all blocking tasks produced by Indexes e.g. getBlockingFlushTask, getMetadataReloadTask etc
+    private static final ExecutorService blockingExecutor = MoreExecutors.newDirectExecutorService();
+
+    /**
+     * The underlying column family containing the source data for these indexes
+     */
+    public final ColumnFamilyStore baseCfs;
+
+    public SecondaryIndexManager(ColumnFamilyStore baseCfs)
+    {
+        this.baseCfs = baseCfs;
+    }
+
+
+    /**
+     * Drops and adds new indexes associated with the underlying CF
+     */
+    public void reload()
+    {
+        // figure out what needs to be added and dropped.
+        Indexes tableIndexes = baseCfs.metadata.getIndexes();
+        indexes.keySet()
+               .stream()
+               .filter(indexName -> !tableIndexes.has(indexName))
+               .forEach(this::removeIndex);
+
+        // we call add for every index definition in the collection as
+        // some may not have been created here yet, only added to schema
+        for (IndexMetadata tableIndex : tableIndexes)
+            addIndex(tableIndex);
+    }
+
+    private Future<?> reloadIndex(IndexMetadata indexDef)
+    {
+        // if the index metadata has changed, reload the index
+        IndexMetadata registered = indexes.get(indexDef.name).getIndexMetadata();
+        if (!registered.equals(indexDef))
+        {
+            Index index = indexes.remove(registered.name);
+            index.register(this);
+            return blockingExecutor.submit(index.getMetadataReloadTask(indexDef));
+        }
+
+        // otherwise, nothing to do
+        return Futures.immediateFuture(null);
+    }
+
+    private Future<?> createIndex(IndexMetadata indexDef)
+    {
+        Index index = createInstance(indexDef);
+        index.register(this);
+        final Callable<?> initialBuildTask = index.getInitializationTask();
+        return initialBuildTask == null
+               ? Futures.immediateFuture(null)
+               : asyncExecutor.submit(initialBuildTask);
+    }
+
+    /**
+     * Adds and builds an index
+     * @param indexDef the IndexMetadata describing the index
+     */
+    public synchronized Future<?> addIndex(IndexMetadata indexDef)
+    {
+        if (indexes.containsKey(indexDef.name))
+            return reloadIndex(indexDef);
+        else
+            return createIndex(indexDef);
+    }
+
+    public synchronized void removeIndex(String indexName)
+    {
+        Index index = indexes.remove(indexName);
+        if (null != index)
+        {
+            executeBlocking(index.getInvalidateTask());
+            unregisterIndex(index);
+        }
+    }
+
+    /**
+     * Called when dropping a Table
+     */
+    public void markAllIndexesRemoved()
+    {
+       getBuiltIndexNames().forEach(this::markIndexRemoved);
+    }
+
+    /**
+    * Does a full, blocking rebuild of the specified indexes from the sstables.
+    * Caller must acquire and release references to the sstables used here.
+    * Note also that only this method of (re)building indexes:
+    *   a) takes a set of index *names* rather than Index instances
+    *   b) marks existing indexes removed prior to rebuilding
+    *
+    * @param sstables the data to build from
+    * @param indexNames the list of indexes to be rebuilt
+    */
+    public void rebuildIndexesBlocking(Collection<SSTableReader> sstables, Set<String> indexNames)
+    {
+        Set<Index> toRebuild = indexes.values().stream()
+                                               .filter(indexer -> indexNames.contains(indexer.getIndexName()))
+                                               .collect(Collectors.toSet());
+        if (toRebuild.isEmpty())
+        {
+            logger.info("No defined indexes with the supplied names");
+            return;
+        }
+
+        toRebuild.forEach(indexer -> markIndexRemoved(indexer.getIndexName()));
+
+        buildIndexesBlocking(sstables, toRebuild);
+
+        toRebuild.forEach(indexer -> markIndexBuilt(indexer.getIndexName()));
+    }
+
+    public void buildAllIndexesBlocking(Collection<SSTableReader> sstables)
+    {
+        buildIndexesBlocking(sstables, ImmutableSet.copyOf(indexes.values()));
+    }
+
+    // For convenience, may be called directly from Index impls
+    public void buildIndexBlocking(Index index)
+    {
+        try (ColumnFamilyStore.RefViewFragment viewFragment = baseCfs.selectAndReference(View.select(SSTableSet.CANONICAL));
+             Refs<SSTableReader> sstables = viewFragment.refs)
+        {
+            buildIndexesBlocking(sstables, Collections.singleton(index));
+            markIndexBuilt(index.getIndexName());
+        }
+    }
+
+    private void buildIndexesBlocking(Collection<SSTableReader> sstables, Set<Index> indexes)
+    {
+        if (indexes.isEmpty())
+            return;
+
+        logger.info("Submitting index build of {} for data in {}",
+                    indexes.stream().map(Index::getIndexName).collect(Collectors.joining(",")),
+                    sstables.stream().map(SSTableReader::toString).collect(Collectors.joining(",")));
+
+        SecondaryIndexBuilder builder = new SecondaryIndexBuilder(baseCfs,
+                                                                  indexes,
+                                                                  new ReducingKeyIterator(sstables));
+        Future<?> future = CompactionManager.instance.submitIndexBuild(builder);
+        FBUtilities.waitOnFuture(future);
+
+        flushIndexesBlocking(indexes);
+        logger.info("Index build of {} complete",
+                    indexes.stream().map(Index::getIndexName).collect(Collectors.joining(",")));
+    }
+
+    private void markIndexBuilt(String indexName)
+    {
+        SystemKeyspace.setIndexBuilt(baseCfs.name, indexName);
+    }
+
+    private void markIndexRemoved(String indexName)
+    {
+        SystemKeyspace.setIndexRemoved(baseCfs.name, indexName);
+    }
+
+
+    public Index getIndexByName(String indexName)
+    {
+        return indexes.get(indexName);
+    }
+
+    private Index createInstance(IndexMetadata indexDef)
+    {
+        Index newIndex;
+        if (indexDef.isCustom())
+        {
+            assert indexDef.options != null;
+            String className = indexDef.options.get(IndexTarget.CUSTOM_INDEX_OPTION_NAME);
+            assert ! Strings.isNullOrEmpty(className);
+            try
+            {
+                Class<? extends Index> indexClass = FBUtilities.classForName(className, "Index");
+                Constructor ctor = indexClass.getConstructor(ColumnFamilyStore.class, IndexMetadata.class);
+                newIndex = (Index)ctor.newInstance(baseCfs, indexDef);
+            }
+            catch (Exception e)
+            {
+                throw new RuntimeException(e);
+            }
+        }
+        else
+        {
+            newIndex = CassandraIndex.newIndex(baseCfs, indexDef);
+        }
+        return newIndex;
+    }
+
+    /**
+     * Truncate all indexes
+     */
+    public void truncateAllIndexesBlocking(final long truncatedAt)
+    {
+        executeAllBlocking(indexes.values().stream(), (index) -> index.getTruncateTask(truncatedAt));
+    }
+
+    /**
+     * Remove all indexes
+     */
+    public void invalidateAllIndexesBlocking()
+    {
+        executeAllBlocking(indexes.values().stream(), Index::getInvalidateTask);
+    }
+
+    /**
+     * Perform a blocking flush of all indexes
+     */
+    public void flushAllIndexesBlocking()
+    {
+       flushIndexesBlocking(ImmutableSet.copyOf(indexes.values()));
+    }
+
+    /**
+     * Perform a blocking flush of selected indexes
+     */
+    public void flushIndexesBlocking(Set<Index> indexes)
+    {
+        if (indexes.isEmpty())
+            return;
+
+        List<Future<?>> wait = new ArrayList<>();
+        List<Index> nonCfsIndexes = new ArrayList<>();
+
+        // for each CFS backed index, submit a flush task which we'll wait on for completion
+        // for the non-CFS backed indexes, we'll flush those while we wait.
+        synchronized (baseCfs.getTracker())
+        {
+            indexes.forEach(index ->
+                index.getBackingTable()
+                     .map(cfs -> wait.add(cfs.forceFlush()))
+                     // orElseGet evaluates lazily; orElse would add every index to nonCfsIndexes
+                     // because its argument is evaluated regardless of the Optional's state
+                     .orElseGet(() -> nonCfsIndexes.add(index)));
+        }
+        executeAllBlocking(nonCfsIndexes.stream(), Index::getBlockingFlushTask);
+        FBUtilities.waitOnFutures(wait);
+    }
+
+    /**
+     * Performs a blocking flush of all custom indexes
+     */
+    public void flushAllCustomIndexesBlocking()
+    {
+        Set<Index> customIndexers = indexes.values().stream()
+                                             .filter(index -> !(index instanceof CassandraIndex))
+                                             .collect(Collectors.toSet());
+        flushIndexesBlocking(customIndexers);
+    }
+
+    /**
+     * @return all indexes which are marked as built and ready to use
+     */
+    public List<String> getBuiltIndexNames()
+    {
+        Set<String> allIndexNames = new HashSet<>();
+        indexes.values().stream()
+                .map(Index::getIndexName)
+                .forEach(allIndexNames::add);
+        return SystemKeyspace.getBuiltIndexes(baseCfs.keyspace.getName(), allIndexNames);
+    }
+
+    /**
+     * @return all backing Tables used by registered indexes
+     */
+    public Set<ColumnFamilyStore> getAllIndexColumnFamilyStores()
+    {
+        Set<ColumnFamilyStore> backingTables = new HashSet<>();
+        indexes.values().forEach(index -> index.getBackingTable().ifPresent(backingTables::add));
+        return backingTables;
+    }
+
+    /**
+     * @return true if there are ANY indexes registered for this table
+     */
+    public boolean hasIndexes()
+    {
+        return !indexes.isEmpty();
+    }
+
+    /**
+     * When building an index against existing data in sstables, add the given partition to the index
+     */
+    public void indexPartition(UnfilteredRowIterator partition, OpOrder.Group opGroup, Set<Index> indexes, int nowInSec)
+    {
+        if (!indexes.isEmpty())
+        {
+            DecoratedKey key = partition.partitionKey();
+            Set<Index.Indexer> indexers = indexes.stream()
+                                                 .map(index -> index.indexerFor(key,
+                                                                                nowInSec,
+                                                                                opGroup,
+                                                                                IndexTransaction.Type.UPDATE))
+                                                 .collect(Collectors.toSet());
+
+            indexers.forEach(Index.Indexer::begin);
+
+            try (RowIterator filtered = UnfilteredRowIterators.filter(partition, nowInSec))
+            {
+                if (!filtered.staticRow().isEmpty())
+                    indexers.forEach(indexer -> indexer.insertRow(filtered.staticRow()));
+
+                while (filtered.hasNext())
+                {
+                    Row row = filtered.next();
+                    indexers.forEach(indexer -> indexer.insertRow(row));
+                }
+            }
+
+            indexers.forEach(Index.Indexer::finish);
+        }
+    }
+
+    /**
+     * Delete all data from all indexes for this partition.
+     * For when cleanup rips a partition out entirely.
+     *
+     * TODO : improve cleanup transaction to batch updates & perform them async
+     */
+    public void deletePartition(UnfilteredRowIterator partition, int nowInSec)
+    {
+        // we need to acquire memtable lock because secondary index deletion may
+        // cause a race (see CASSANDRA-3712). This is done internally by the
+        // index transaction when it commits
+        CleanupTransaction indexTransaction = newCleanupTransaction(partition.partitionKey(),
+                                                                    partition.columns(),
+                                                                    nowInSec);
+        indexTransaction.start();
+        indexTransaction.onPartitionDeletion(partition.partitionLevelDeletion());
+        indexTransaction.commit();
+
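+        // for now, each surviving row gets its own single-use transaction (see the TODO above about batching)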
+        while (partition.hasNext())
+        {
+            Unfiltered unfiltered = partition.next();
+            if (unfiltered.kind() != Unfiltered.Kind.ROW)
+                continue;
+
+            indexTransaction = newCleanupTransaction(partition.partitionKey(),
+                                                     partition.columns(),
+                                                     nowInSec);
+            indexTransaction.start();
+            indexTransaction.onRowDelete((Row)unfiltered);
+            indexTransaction.commit();
+        }
+    }
+
+    /**
+     * Called at query time to find the most selective of the registered indexes, i.e. the
+     * one likely to return the fewest results.
+     * Implementation-specific validation of the target expression by the selected index
+     * should be performed in its searcherFor method, so that the choice of index does not
+     * depend on the validity of the expression.
+     *
+     * This method is called at various points during the lifecycle of a ReadCommand (to obtain a Searcher,
+     * get the index's underlying CFS for ReadOrderGroup, or an estimate of the result size from an average index
+     * query).
+     *
+     * Ideally, we would do this relatively expensive operation only once, and attach the index to the
+     * ReadCommand for future reference. This requires the index be passed onto additional commands generated
+     * to process subranges etc.
+     *
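+     * An illustrative call sequence (a sketch only; variable names are hypothetical):
+     * <pre>{@code
+     * Index index = cfs.indexManager.getBestIndexFor(command);
+     * Index.Searcher searcher = index == null ? null : index.searcherFor(command);
+     * }</pre>
+     *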
+     * @param command ReadCommand to be executed
+     * @param includeInTrace whether to include index selection messages in any active tracing session
+     * @return an Index instance, ready to use during execution of the command, or null if none
+     * of the registered indexes can support the command.
+     */
+    public Index getBestIndexFor(ReadCommand command, boolean includeInTrace)
+    {
+        if (indexes.isEmpty() || command.rowFilter().isEmpty())
+            return null;
+
+        Set<Index> searchableIndexes = new HashSet<>();
+        for (RowFilter.Expression expression : command.rowFilter())
+        {
+            indexes.values().stream()
+                            .filter(index -> index.supportsExpression(expression.column(), expression.operator()))
+                            .forEach(searchableIndexes::add);
+        }
+
+        if (searchableIndexes.isEmpty())
+        {
+            logger.debug("No applicable indexes found");
+            if (includeInTrace)
+                Tracing.trace("No applicable indexes found");
+            return null;
+        }
+
+        // the most selective index is the one with the lowest estimated result rows
+        Index selected = searchableIndexes.stream()
+                                          .min((a, b) -> Longs.compare(a.getEstimatedResultRows(),
+                                                                       b.getEstimatedResultRows()))
+                                          .orElseThrow(() -> new AssertionError("Could not select most selective index"));
+
+        // pay for an additional threadlocal get() rather than build the strings unnecessarily
+        if (includeInTrace && Tracing.isTracing())
+        {
+            Tracing.trace("Index mean cardinalities are {}. Scanning with {}.",
+                          searchableIndexes.stream().map(i -> i.getIndexName() + ':' + i.getEstimatedResultRows())
+                                           .collect(Collectors.joining(",")),
+                          selected.getIndexName());
+        }
+        return selected;
+    }
+
+    // convenience method which doesn't emit tracing messages
+    public Index getBestIndexFor(ReadCommand command)
+    {
+        return getBestIndexFor(command, false);
+    }
+
+    /**
+     * Called at write time to ensure that values present in the update
+     * are valid according to the rules of all registered indexes which
+     * will process it. The partition key as well as the clustering and
+     * cell values for each row in the update may be checked by index
+     * implementations.
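+     *
+     * For example (a sketch; the surrounding write path is elided):
+     * <pre>{@code
+     * cfs.indexManager.validate(update); // throws InvalidRequestException if any index rejects a value
+     * }</pre>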
+     * @param update PartitionUpdate containing the values to be validated by registered Index implementations
+     * @throws InvalidRequestException if any registered index rejects a value in the update
+     */
+    public void validate(PartitionUpdate update) throws InvalidRequestException
+    {
+        indexes.values()
+               .stream()
+               .filter(i -> i.indexes(update.columns()))
+               .forEach(i -> i.validate(update));
+    }
+
+    /**
+     * IndexRegistry methods
+     */
+    public void registerIndex(Index index)
+    {
+        indexes.put(index.getIndexMetadata().name, index);
+        logger.debug("Registered index {}", index.getIndexMetadata().name);
+    }
+
+    public void unregisterIndex(Index index)
+    {
+        Index removed = indexes.remove(index.getIndexMetadata().name);
+        logger.debug(removed == null ? "Index {} was not registered" : "Removed index {} from registry",
+                     index.getIndexMetadata().name);
+    }
+
+    public Index getIndex(IndexMetadata metadata)
+    {
+        return indexes.get(metadata.name);
+    }
+
+    public Collection<Index> listIndexes()
+    {
+        return ImmutableSet.copyOf(indexes.values());
+    }
+
+    /**
+     * Handling of index updates.
+     * Implementations of the various IndexTransaction interfaces, for keeping indexes in sync with base data
+     * during updates, compaction and cleanup. Plus factory methods for obtaining transaction instances.
+     */
+
+    /**
+     * Transaction for updates on the write path.
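+     *
+     * Illustrative lifecycle (a sketch; the real caller is the write path, and the
+     * variable names here are hypothetical):
+     * <pre>{@code
+     * UpdateTransaction txn = indexManager.newUpdateTransaction(update, opGroup, nowInSec);
+     * txn.start();
+     * txn.onInserted(row); // called for each new row in the update
+     * txn.commit();
+     * }</pre>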
+     */
+    public UpdateTransaction newUpdateTransaction(PartitionUpdate update, OpOrder.Group opGroup, int nowInSec)
+    {
+        if (!hasIndexes())
+            return UpdateTransaction.NO_OP;
+
+        // TODO : optimize lookup, we can probably cache quite a bit of stuff, rather than doing
+        // a linear scan every time. Holding off that though until CASSANDRA-7771 to figure out
+        // exactly how indexes are to be identified & associated with a given partition update
+        Index.Indexer[] indexers = indexes.values().stream()
+                                          .filter(i -> i.indexes(update.columns()))
+                                          .map(i -> i.indexerFor(update.partitionKey(),
+                                                                 nowInSec,
+                                                                 opGroup,
+                                                                 IndexTransaction.Type.UPDATE))
+                                          .toArray(Index.Indexer[]::new);
+
+        return indexers.length == 0 ? UpdateTransaction.NO_OP : new WriteTimeTransaction(indexers);
+    }
+
+    /**
+     * Transaction for use when merging rows during compaction
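+     *
+     * @param key the partition key of the rows being merged
+     * @param partitionColumns the regular and static columns of the partition being compacted
+     * @param versions the number of row versions being merged; used to size the per-version
+     *                 buffers of rows to be cleaned up from the indexes
+     * @param nowInSec the current time in seconds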
+     */
+    public CompactionTransaction newCompactionTransaction(DecoratedKey key,
+                                                          PartitionColumns partitionColumns,
+                                                          int versions,
+                                                          int nowInSec)
+    {
+        // the check for whether there are any registered indexes is already done in CompactionIterator
+
+        Index[] interestedIndexes = indexes.values().stream()
+                                           .filter(i -> i.indexes(partitionColumns))
+                                           .toArray(Index[]::new);
+
+        return interestedIndexes.length == 0
+               ? CompactionTransaction.NO_OP
+               : new IndexGCTransaction(key, versions, nowInSec, interestedIndexes);
+    }
+
+    /**
+     * Transaction for use when removing partitions during cleanup
+     */
+    public CleanupTransaction newCleanupTransaction(DecoratedKey key,
+                                                    PartitionColumns partitionColumns,
+                                                    int nowInSec)
+    {
+        if (!hasIndexes())
+            return CleanupTransaction.NO_OP;
+
+        Index[] interestedIndexes = indexes.values().stream()
+                                           .filter(i -> i.indexes(partitionColumns))
+                                           .toArray(Index[]::new);
+
+        return interestedIndexes.length == 0
+               ? CleanupTransaction.NO_OP
+               : new CleanupGCTransaction(key, nowInSec, interestedIndexes);
+    }
+
+    /**
+     * A single use transaction for processing a partition update on the regular write path
+     */
+    private static final class WriteTimeTransaction implements UpdateTransaction
+    {
+        private final Index.Indexer[] indexers;
+
+        private WriteTimeTransaction(Index.Indexer...indexers)
+        {
+            // don't allow null indexers; when none are needed, UpdateTransaction.NO_OP is used instead
+            for (Index.Indexer indexer : indexers) assert indexer != null;
+            this.indexers = indexers;
+        }
+
+        public void start()
+        {
+            Arrays.stream(indexers).forEach(Index.Indexer::begin);
+        }
+
+        public void onPartitionDeletion(DeletionTime deletionTime)
+        {
+            Arrays.stream(indexers).forEach(h -> h.partitionDelete(deletionTime));
+        }
+
+        public void onRangeTombstone(RangeTombstone tombstone)
+        {
+            Arrays.stream(indexers).forEach(h -> h.rangeTombstone(tombstone));
+        }
+
+        public void onInserted(Row row)
+        {
+            Arrays.stream(indexers).forEach(h -> h.insertRow(row));
+        }
+
+        public void onUpdated(Row existing, Row updated)
+        {
+            final Row.Builder toRemove = BTreeRow.sortedBuilder(existing.columns());
+            toRemove.newRow(existing.clustering());
+            final Row.Builder toInsert = BTreeRow.sortedBuilder(updated.columns());
+            toInsert.newRow(updated.clustering());
+            // diff listener collates the columns to be added & removed from the indexes
+            RowDiffListener diffListener = new RowDiffListener()
+            {
+                public void onPrimaryKeyLivenessInfo(int i, Clustering clustering, LivenessInfo merged, LivenessInfo original)
+                {
+                    if (merged != null && merged != original)
+                        toInsert.addPrimaryKeyLivenessInfo(merged);
+                }
+
+                public void onDeletion(int i, Clustering clustering, DeletionTime merged, DeletionTime original)
+                {
+                }
+
+                public void onComplexDeletion(int i, Clustering clustering, ColumnDefinition column, DeletionTime merged, DeletionTime original)
+                {
+                }
+
+                public void onCell(int i, Clustering clustering, Cell merged, Cell original)
+                {
+                    if (merged != null && merged != original)
+                        toInsert.addCell(merged);
+
+                    if (merged == null || (original != null && shouldCleanupOldValue(original, merged)))
+                        toRemove.addCell(original);
+                }
+            };
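+            // diff against the union of both rows' columns so every changed cell reaches the listener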
+            Rows.diff(diffListener, updated, updated.columns().mergeTo(existing.columns()), existing);
+            Row oldRow = toRemove.build();
+            Row newRow = toInsert.build();
+            Arrays.stream(indexers).forEach(i -> i.updateRow(oldRow, newRow));
+        }
+
+        public void commit()
+        {
+            Arrays.stream(indexers).forEach(Index.Indexer::finish);
+        }
+
+        private boolean shouldCleanupOldValue(Cell oldCell, Cell newCell)
+        {
+            // If either the value or timestamp is different, then we
+            // should delete from the index. If not, then we can infer that
+            // at least one of the cells is an ExpiringColumn and that the
+            // difference is in the expiry time. In this case, we don't want to
+            // delete the old value from the index as the tombstone we insert
+            // will just hide the inserted value.
+            // Completely identical cells (including expiring columns with
+            // identical ttl & localExpirationTime) will not get this far due
+            // to the oldCell.equals(newCell) in StandardUpdater.update
+            return !oldCell.value().equals(newCell.value()) || oldCell.timestamp() != newCell.timestamp();
+        }
+    }
+
+    /**
+     * A single-use transaction for updating indexes for a single partition during compaction where the only
+     * operation is to merge rows
+     * TODO : make this smarter at batching updates so we can use a single transaction to process multiple rows in
+     * a single partition
+     */
+    private static final class IndexGCTransaction implements CompactionTransaction
+    {
+        private final DecoratedKey key;
+        private final int versions;
+        private final int nowInSec;
+        private final Index[] indexes;
+
+        private Row[] rows;
+
+        private IndexGCTransaction(DecoratedKey key,
+                                   int versions,
+                                   int nowInSec,
+                                   Index...indexes)
+        {
+            // don't allow null indexes; when there are none, a no-op transaction is used instead
+            for (Index index : indexes) assert index != null;
+
+            this.key = key;
+            this.versions = versions;
+            this.indexes = indexes;
+            this.nowInSec = nowInSec;
+        }
+
+        public void start()
+        {
+            if (versions > 0)
+                rows = new Row[versions];
+        }
+
+        public void onRowMerge(Columns columns, Row merged, Row...versions)
+        {
+            // Diff listener constructs rows representing deltas between the merged and original versions
+            // These delta rows are then passed to registered indexes for removal processing
+            final Row.Builder[] builders = new Row.Builder[versions.length];
+            RowDiffListener diffListener = new RowDiffListener()
+            {
+                public void onPrimaryKeyLivenessInfo(int i, Clustering clustering, LivenessInfo merged, LivenessInfo original)
+                {
+                }
+
+                public void onDeletion(int i, Clustering clustering, DeletionTime merged, DeletionTime original)
+                {
+                }
+
+                public void onComplexDeletion(int i, Clustering clustering, ColumnDefinition column, DeletionTime merged, DeletionTime original)
+                {
+                }
+
+                public void onCell(int i, Clustering clustering, Cell merged, Cell original)
+                {
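+                    // a cell present in an input version but absent from the merged row is
+                    // obsolete, so buffer it for removal from the indexes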
+                    if (original != null && merged == null)
+                    {
+                        if (builders[i] == null)
+                        {
+                            builders[i] = BTreeRow.sortedBuilder(columns);
+                            builders[i].newRow(clustering);
+                        }
+                        builders[i].addCell(original);
+                    }
+                }
+            };
+
+            Rows.diff(diffListener, merged, columns, versions);
+
+            for (int i = 0; i < builders.length; i++)
+                if (builders[i] != null)
+                    rows[i] = builders[i].build();
+        }
+
+        public void commit()
+        {
+            if (rows == null)
+                return;
+
+            try (OpOrder.Group opGroup = Keyspace.writeOrder.start())
+            {
+                Index.Indexer[] indexers = Arrays.stream(indexes)
+                                                 .map(i -> i.indexerFor(key, nowInSec, opGroup, Type.COMPACTION))
+                                                 .toArray(Index.Indexer[]::new);
+
+                Arrays.stream(indexers).forEach(Index.Indexer::begin);
+
+                for (Row row : rows)
+                    if (row != null)
+                        Arrays.stream(indexers).forEach(indexer -> indexer.removeRow(row));
+
+                Arrays.stream(indexers).forEach(Index.Indexer::finish);
+            }
+        }
+    }
+
+    /**
+     * A single-use transaction for updating indexes for a single partition during cleanup, where
+     * partitions and rows are only removed
+     * TODO : make this smarter at batching updates so we can use a single transaction to process multiple rows in
+     * a single partition
+     */
+    private static final class CleanupGCTransaction implements CleanupTransaction
+    {
+        private final DecoratedKey key;
+        private final int nowInSec;
+        private final Index[] indexes;
+
+        private Row row;
+        private DeletionTime partitionDelete;
+
+        private CleanupGCTransaction(DecoratedKey key,
+                                     int nowInSec,
+                                     Index...indexes)
+        {
+            // don't allow null indexes; when there are none, a no-op transaction is used instead
+            for (Index index : indexes) assert index != null;
+
+            this.key = key;
+            this.indexes = indexes;
+            this.nowInSec = nowInSec;
+        }
+
+        public void start()
+        {
+        }
+
+        public void onPartitionDeletion(DeletionTime deletionTime)
+        {
+            partitionDelete = deletionTime;
+        }
+
+        public void onRowDelete(Row row)
+        {
+            this.row = row;
+        }
+
+        public void commit()
+        {
+            if (row == null && partitionDelete == null)
+                return;
+
+            try (OpOrder.Group opGroup = Keyspace.writeOrder.start())
+            {
+                Index.Indexer[] indexers = Arrays.stream(indexes)
+                                                 .map(i -> i.indexerFor(key, nowInSec, opGroup, Type.CLEANUP))
+                                                 .toArray(Index.Indexer[]::new);
+
+                Arrays.stream(indexers).forEach(Index.Indexer::begin);
+
+                if (partitionDelete != null)
+                    Arrays.stream(indexers).forEach(indexer -> indexer.partitionDelete(partitionDelete));
+
+                if (row != null)
+                    Arrays.stream(indexers).forEach(indexer -> indexer.removeRow(row));
+
+                Arrays.stream(indexers).forEach(Index.Indexer::finish);
+            }
+        }
+    }
+
+    private static void executeBlocking(Callable<?> task)
+    {
+        if (null != task)
+            FBUtilities.waitOnFuture(blockingExecutor.submit(task));
+    }
+
+    private static void executeAllBlocking(Stream<Index> indexers, Function<Index, Callable<?>> function)
+    {
+        List<Future<?>> waitFor = new ArrayList<>();
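+        // an index returns a null task when it has no work to do for this operation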
+        indexers.forEach(indexer -> {
+            Callable<?> task = function.apply(indexer);
+            if (null != task)
+                waitFor.add(blockingExecutor.submit(task));
+        });
+        FBUtilities.waitOnFutures(waitFor);
+    }
+}

