From: samt@apache.org To: commits@cassandra.apache.org Date: Fri, 21 Aug 2015 23:48:42 -0000 Subject: [11/15] cassandra git commit: New 2i API and implementations for built in indexes http://git-wip-us.apache.org/repos/asf/cassandra/blob/0626be86/src/java/org/apache/cassandra/db/rows/Cells.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/rows/Cells.java b/src/java/org/apache/cassandra/db/rows/Cells.java index 080d640..54df26e 100644 --- a/src/java/org/apache/cassandra/db/rows/Cells.java +++ b/src/java/org/apache/cassandra/db/rows/Cells.java @@ -22,9 +22,9 @@ import java.util.Comparator; import java.util.Iterator; import org.apache.cassandra.config.ColumnDefinition; -import org.apache.cassandra.db.*; +import org.apache.cassandra.db.Conflicts; +import org.apache.cassandra.db.DeletionTime; import org.apache.cassandra.db.partitions.PartitionStatisticsCollector; -import org.apache.cassandra.db.index.SecondaryIndexManager; /** * Static methods to work on cells. @@ -58,9 +58,6 @@ public abstract class Cells * Also note that which cell is provided as {@code existing} and which is * provided as {@code update} matters for index updates. * - * @param clustering the clustering for the row the cells to merge originate from. - * This is only used for index updates, so this can be {@code null} if - * {@code indexUpdater == SecondaryIndexManager.nullUpdater}. * @param existing the pre-existing cell, the one that is updated. This can be * {@code null} if this reconciliation correspond to an insertion. * @param update the newly added cell, the update. This can be {@code null} out @@ -72,20 +69,15 @@ * @param nowInSec the current time in seconds (which plays a role during reconciliation * because deleted cells always have precedence on timestamp equality and deciding if a * cell is a live or not depends on the current time due to expiring cells).
- * @param indexUpdater an index updater to which the result of the reconciliation is - * signaled (if relevant, that is if the update is not simply ignored by the reconciliation). - * This cannot be {@code null} but {@code SecondaryIndexManager.nullUpdater} can be passed. * * @return the timestamp delta between existing and update, or {@code Long.MAX_VALUE} if one * of them is {@code null} or deleted by {@code deletion}). */ - public static long reconcile(Clustering clustering, - Cell existing, + public static long reconcile(Cell existing, Cell update, DeletionTime deletion, Row.Builder builder, - int nowInSec, - SecondaryIndexManager.Updater indexUpdater) + int nowInSec) { existing = existing == null || deletion.deletes(existing) ? null : existing; update = update == null || deletion.deletes(update) ? null : update; @@ -93,10 +85,6 @@ public abstract class Cells { if (update != null) { - // It's inefficient that we call maybeIndex (which is for primary key indexes) on every cell, but - // we'll need to fix that damn 2ndary index API to avoid that. - updatePKIndexes(clustering, update, nowInSec, indexUpdater); - indexUpdater.insert(clustering, update); builder.addCell(update); } else if (existing != null) @@ -109,21 +97,9 @@ public abstract class Cells Cell reconciled = reconcile(existing, update, nowInSec); builder.addCell(reconciled); - // Note that this test rely on reconcile returning either 'existing' or 'update'. That's not true for counters but we don't index them - if (reconciled == update) - { - updatePKIndexes(clustering, update, nowInSec, indexUpdater); - indexUpdater.update(clustering, existing, reconciled); - } return Math.abs(existing.timestamp() - update.timestamp()); } - private static void updatePKIndexes(Clustering clustering, Cell cell, int nowInSec, SecondaryIndexManager.Updater indexUpdater) - { - if (indexUpdater != SecondaryIndexManager.nullUpdater && cell.isLive(nowInSec)) - indexUpdater.maybeIndex(clustering, cell.timestamp(), cell.ttl(), DeletionTime.LIVE); - } - /** * Reconciles/merge two cells. *

@@ -202,9 +178,6 @@ public abstract class Cells * Also note that which cells is provided as {@code existing} and which are * provided as {@code update} matters for index updates. * - * @param clustering the clustering for the row the cells to merge originate from. - * This is only used for index updates, so this can be {@code null} if - * {@code indexUpdater == SecondaryIndexManager.nullUpdater}. * @param column the complex column the cells are for. * @param existing the pre-existing cells, the ones that are updated. This can be * {@code null} if this reconciliation correspond to an insertion. @@ -217,9 +190,6 @@ public abstract class Cells * @param nowInSec the current time in seconds (which plays a role during reconciliation * because deleted cells always have precedence on timestamp equality and deciding if a * cell is a live or not depends on the current time due to expiring cells). - * @param indexUpdater an index updater to which the result of the reconciliation is - * signaled (if relevant, that is if the updates are not simply ignored by the reconciliation). - * This cannot be {@code null} but {@code SecondaryIndexManager.nullUpdater} can be passed. * * @return the smallest timestamp delta between corresponding cells from existing and update. A * timestamp delta being computed as the difference between a cell from {@code update} and the @@ -227,14 +197,12 @@ public abstract class Cells * of cells from {@code existing} and {@code update} having the same cell path is empty, this * returns {@code Long.MAX_VALUE}. */ - public static long reconcileComplex(Clustering clustering, - ColumnDefinition column, + public static long reconcileComplex(ColumnDefinition column, Iterator existing, Iterator update, DeletionTime deletion, Row.Builder builder, - int nowInSec, - SecondaryIndexManager.Updater indexUpdater) + int nowInSec) { Comparator comparator = column.cellPathComparator(); Cell nextExisting = getNext(existing); @@ -247,17 +215,17 @@ public abstract class Cells : comparator.compare(nextExisting.path(), nextUpdate.path())); if (cmp < 0) { - reconcile(clustering, nextExisting, null, deletion, builder, nowInSec, indexUpdater); + reconcile(nextExisting, null, deletion, builder, nowInSec); nextExisting = getNext(existing); } else if (cmp > 0) { - reconcile(clustering, null, nextUpdate, deletion, builder, nowInSec, indexUpdater); + reconcile(null, nextUpdate, deletion, builder, nowInSec); nextUpdate = getNext(update); } else { - timeDelta = Math.min(timeDelta, reconcile(clustering, nextExisting, nextUpdate, deletion, builder, nowInSec, indexUpdater)); + timeDelta = Math.min(timeDelta, reconcile(nextExisting, nextUpdate, deletion, builder, nowInSec)); nextExisting = getNext(existing); nextUpdate = getNext(update); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/0626be86/src/java/org/apache/cassandra/db/rows/Rows.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/rows/Rows.java b/src/java/org/apache/cassandra/db/rows/Rows.java index 0b739a8..ce177f2 100644 --- a/src/java/org/apache/cassandra/db/rows/Rows.java +++ b/src/java/org/apache/cassandra/db/rows/Rows.java @@ -17,7 +17,9 @@ */ package org.apache.cassandra.db.rows; -import java.util.*; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; import com.google.common.collect.Iterators; import com.google.common.collect.PeekingIterator; @@ -25,7 +27,6 @@ import com.google.common.collect.PeekingIterator; import 
org.apache.cassandra.config.ColumnDefinition; import org.apache.cassandra.db.*; import org.apache.cassandra.db.partitions.PartitionStatisticsCollector; -import org.apache.cassandra.db.index.SecondaryIndexManager; import org.apache.cassandra.utils.SearchIterator; /** @@ -127,7 +128,7 @@ public abstract class Rows * @param diffListener the listener to which to signal the differences between the inputs and the merged * result. */ - public static void diff(Row merged, Columns columns, Row[] inputs, RowDiffListener diffListener) + public static void diff(RowDiffListener diffListener, Row merged, Columns columns, Row...inputs) { Clustering clustering = merged.clustering(); LivenessInfo mergedInfo = merged.primaryKeyLivenessInfo().isEmpty() ? null : merged.primaryKeyLivenessInfo(); @@ -218,23 +219,17 @@ public abstract class Rows { Columns mergedColumns = row1.columns().mergeTo(row2.columns()); Row.Builder builder = BTreeRow.sortedBuilder(mergedColumns); - merge(row1, row2, mergedColumns, builder, nowInSec, SecondaryIndexManager.nullUpdater); + merge(row1, row2, mergedColumns, builder, nowInSec); return builder.build(); } - public static void merge(Row row1, Row row2, Columns mergedColumns, Row.Builder builder, int nowInSec) - { - merge(row1, row2, mergedColumns, builder, nowInSec, SecondaryIndexManager.nullUpdater); - } - // Merge rows in memtable // Return the minimum timestamp delta between existing and update public static long merge(Row existing, Row update, Columns mergedColumns, Row.Builder builder, - int nowInSec, - SecondaryIndexManager.Updater indexUpdater) + int nowInSec) { Clustering clustering = existing.clustering(); builder.newRow(clustering); @@ -253,20 +248,16 @@ public abstract class Rows builder.addPrimaryKeyLivenessInfo(mergedInfo); builder.addRowDeletion(deletion); - indexUpdater.maybeIndex(clustering, mergedInfo.timestamp(), mergedInfo.ttl(), deletion); - for (int i = 0; i < mergedColumns.simpleColumnCount(); i++) { ColumnDefinition c = mergedColumns.getSimple(i); Cell existingCell = existing.getCell(c); Cell updateCell = update.getCell(c); - timeDelta = Math.min(timeDelta, Cells.reconcile(clustering, - existingCell, + timeDelta = Math.min(timeDelta, Cells.reconcile(existingCell, updateCell, deletion, builder, - nowInSec, - indexUpdater)); + nowInSec)); } for (int i = 0; i < mergedColumns.complexColumnCount(); i++) @@ -285,7 +276,7 @@ public abstract class Rows Iterator existingCells = existingData == null ? null : existingData.iterator(); Iterator updateCells = updateData == null ? 
null : updateData.iterator(); - timeDelta = Math.min(timeDelta, Cells.reconcileComplex(clustering, c, existingCells, updateCells, maxDt, builder, nowInSec, indexUpdater)); + timeDelta = Math.min(timeDelta, Cells.reconcileComplex(c, existingCells, updateCells, maxDt, builder, nowInSec)); } return timeDelta; http://git-wip-us.apache.org/repos/asf/cassandra/blob/0626be86/src/java/org/apache/cassandra/index/Index.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/index/Index.java b/src/java/org/apache/cassandra/index/Index.java new file mode 100644 index 0000000..8f126fe --- /dev/null +++ b/src/java/org/apache/cassandra/index/Index.java @@ -0,0 +1,388 @@ +package org.apache.cassandra.index; + +import java.util.Optional; +import java.util.concurrent.Callable; +import java.util.function.BiFunction; + +import org.apache.cassandra.config.ColumnDefinition; +import org.apache.cassandra.cql3.Operator; +import org.apache.cassandra.db.*; +import org.apache.cassandra.db.filter.RowFilter; +import org.apache.cassandra.db.partitions.PartitionIterator; +import org.apache.cassandra.db.partitions.PartitionUpdate; +import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator; +import org.apache.cassandra.db.rows.Row; +import org.apache.cassandra.exceptions.InvalidRequestException; +import org.apache.cassandra.index.transactions.IndexTransaction; +import org.apache.cassandra.schema.IndexMetadata; +import org.apache.cassandra.utils.concurrent.OpOrder; + +/** + * Consisting of a top level Index interface and two sub-interfaces which handle read and write operations, + * Searcher and Indexer respectively, this defines a secondary index implementation. + * Instantiation is done via reflection and implementations must provide a constructor which takes the base + * table's ColumnFamilyStore and the IndexMetadata which defines the Index as arguments. e.g: + * {@code MyCustomIndex( ColumnFamilyStore baseCfs, IndexMetadata indexDef )} + * + * The main interface defines methods for index management, index selection at both write and query time, + * as well as validation of values that will ultimately be indexed. + * Two sub-interfaces are also defined, which represent single use helpers for short lived tasks at read and write time. + * Indexer: an event listener which receives notifications at particular points during an update of a single partition + * in the base table. + * Searcher: performs queries against the index based on a predicate defined in a RowFilter. An instance + * is expected to be single use, being involved in the execution of a single ReadCommand. + * + * The main interface includes factory methods for obtaining instances of both of the sub-interfaces. + * + * The methods defined in the top level interface can be grouped into 3 categories: + * + * Management Tasks: + * This group of methods is primarily concerned with maintenance of secondary indexes and are mainly called from + * SecondaryIndexManager. It includes methods for registering and un-registering an index, performing maintenance + * tasks such as (re)building an index from SSTable data, flushing, invalidating and so forth, as well as some to + * retrieve general metadata about the index (index name, any internal tables used for persistence etc).
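+ * For illustration, a hypothetical implementation might package its flush work as a task in this way
+ * (the method body and flushInMemoryState() below are invented purely for this example):
+ * {@code
+ * public Callable<?> getBlockingFlushTask()
+ * {
+ *     // flushInMemoryState() stands in for whatever work the implementation needs to do
+ *     return () -> { flushInMemoryState(); return null; };
+ * }
+ * }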
+ * Several of these maintenance functions have a return type of Callable<?>; the expectation for these methods is + * that any work required to be performed by the method be done inside the Callable so that the responsibility for + * scheduling its execution can rest with SecondaryIndexManager. For instance, a task like reloading index metadata + * following potential updates caused by modifications to the base table may be performed in a blocking way. In + * contrast, adding a new index may require it to be built from existing SSTable data, a potentially expensive task + * which should be performed asynchronously. + * + * Index Selection: + * There are two facets to index selection, write time and read time selection. The former is concerned with + * identifying whether an index should be informed about a particular write operation. The latter is about providing + * means to use the index for search during query execution. + * + * Validation: + * Values that may be written to an index are checked as part of input validation, prior to an update or insert + * operation being accepted. + * + * + * Sub-interfaces: + * + * Update processing: + * Indexes are subscribed to the stream of events generated by modifications to the base table. Subscription is + * done via first registering the Index with the base table's SecondaryIndexManager. For each partition update, the set + * of registered indexes are then filtered based on the properties of the update using the selection methods on the main + * interface described above. Each of the indexes in the filtered set then provides an event listener to receive + * notifications about the update as it is processed. As such then, an event handler instance is scoped to a single + * partition update; SecondaryIndexManager obtains a new handler for every update it processes (via a call to the + * factory method, indexerFor). That handler will then receive all events for the update, before being + * discarded by the SecondaryIndexManager. Indexer instances are never re-used by SecondaryIndexManager and the + * expectation is that each call to indexerFor should return a unique instance, or at least if instances can + * be recycled, that a given instance is only used to process a single partition update at a time. + * + * Search: + * Each query (i.e. a single ReadCommand) that uses indexes will use a single instance of Index.Searcher. As with + * processing of updates, an Index must be registered with the primary table's SecondaryIndexManager to be able to + * support queries. During the processing of a ReadCommand, the Expressions in its RowFilter are examined to determine + * whether any of them are supported by a registered Index. supportsExpression is used to filter out Indexes which + * cannot support a given Expression. After filtering, the set of candidate indexes are ranked according to the result + * of getEstimatedResultRows and the most selective (i.e. the one expected to return the smallest number of results) is + * chosen. A Searcher instance is then obtained from the searcherFor method & used to perform the actual Index lookup. + * Finally, Indexes can define a post processing step to be performed on the coordinator, after results (partitions from + * the primary table) have been received from replicas and reconciled.
This post processing is defined as a + * java.util.function.BiFunction<PartitionIterator, RowFilter, PartitionIterator>, that is a function which takes as + * arguments a PartitionIterator (containing the reconciled result rows) and a RowFilter (from the ReadCommand being + * executed) and returns another iterator of partitions, possibly having transformed the initial results in some way. + * The post processing function is obtained from the Index's postProcessorFor method; the built-in indexes which ship + * with Cassandra return a no-op function here. + * + * An optional static method may be provided to validate custom index options: + *

 {@code
+ * public static Map<String, String> validateOptions(Map<String, String> options);
+ * } 
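+ * By way of example, a hypothetical implementation recognising a single option might look like this
+ * (the option name "mode" below is invented purely for illustration):
+ * {@code
+ * public static Map<String, String> validateOptions(Map<String, String> options)
+ * {
+ *     Map<String, String> unrecognised = new java.util.HashMap<>(options);
+ *     unrecognised.remove("mode");
+ *     return unrecognised; // anything left over is reported back as invalid
+ * }
+ * }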
+ * + * The input is the map of index options supplied in the WITH clause of a CREATE INDEX statement. The method should + * return a map containing any of the supplied options which are not valid for the implementation. If the returned + * map is not empty, validation is considered failed and an error is raised. Alternatively, the implementation may + * choose to throw an org.apache.cassandra.exceptions.ConfigurationException if invalid options are encountered. + * + */ +public interface Index +{ + + /* + * Management functions + */ + + /** + * Return a task to perform any initialization work when a new index instance is created. + * This may involve costly operations such as (re)building the index, and is performed asynchronously + * by SecondaryIndexManager + * @return a task to perform any necessary initialization work + */ + public Callable<?> getInitializationTask(); + + /** + * Returns the IndexMetadata which configures and defines the index instance. This should be the same + * object passed as the argument to setIndexMetadata. + * @return the index's metadata + */ + public IndexMetadata getIndexMetadata(); + + /** + * Return a task to reload the internal metadata of an index. + * Called when the base table metadata is modified or when the configuration of the Index is updated + * Implementations should return a task which performs any necessary work to be done due to + * updating the configuration(s) such as (re)building etc. This task is performed asynchronously + * by SecondaryIndexManager + * @return task to be executed by the index manager during a reload + */ + public Callable<?> getMetadataReloadTask(IndexMetadata indexMetadata); + + /** + * An index must be registered in order to be able to either subscribe to update events on the base + * table and/or to provide Searcher functionality for reads. The double dispatch involved here, where + * the Index actually performs its own registration by calling back to the supplied IndexRegistry's + * own registerIndex method, is to make the decision as to whether or not to register an index belong + * to the implementation, not the manager. + * @param registry the index registry to register the instance with + */ + public void register(IndexRegistry registry); + + /** + * Return an identifier for the index. This should be unique across all indexes on a given base table + * @return the name of the index + */ + public String getIndexName(); + + /** + * If the index implementation uses a local table to store its index data this method should return a + * handle to it. If not, an empty Optional should be returned. Typically, this is useful for the built-in + * Index implementations. + * @return an Optional referencing the Index's backing storage table if it has one, or Optional.empty() if not. + */ + public Optional<ColumnFamilyStore> getBackingTable(); + + /** + * Return a task which performs a blocking flush of the index's data to persistent storage. + * @return task to be executed by the index manager to perform the flush. + */ + public Callable<?> getBlockingFlushTask(); + + /** + * Return a task which invalidates the index, indicating it should no longer be considered usable. + * This should include any clean up and releasing of resources required when dropping an index. + * @return task to be executed by the index manager to invalidate the index. + */ + public Callable<?> getInvalidateTask(); + + /** + * Return a task to truncate the index with the specified truncation timestamp. + * Called when the base table is truncated.
+ * @param truncatedAt timestamp of the truncation operation. This will be the same timestamp used + * in the truncation of the base table. + * @return task to be executed by the index manager when the base table is truncated. + */ + public Callable<?> getTruncateTask(long truncatedAt); + + /* + * Index selection + */ + + /** + * Called to determine whether this index should process a particular partition update. + * @param columns + * @return + */ + public boolean indexes(PartitionColumns columns); + + // TODO : this will change when we decouple indexes from specific columns for real per-row indexes + /** + * Called to determine whether this index can provide a searcher to execute a query on the + * supplied column using the specified operator. This forms part of the query validation done + * before a CQL select statement is executed. + * @param column the target column of a search query predicate + * @param operator the operator of a search query predicate + * @return true if this index is capable of supporting such expressions, false otherwise + */ + public boolean supportsExpression(ColumnDefinition column, Operator operator); + + /** + * Transform an initial RowFilter into the filter that will still need to be applied + * to a set of Rows after the index has performed its initial scan. + * Used in ReadCommand#executeLocal to reduce the amount of filtering performed on the + * results of the index query. + * + * @param filter the initial filter belonging to a ReadCommand + * @return the (hopefully) reduced filter that would still need to be applied after + * the index was used to narrow the initial result set + */ + public RowFilter getPostIndexQueryFilter(RowFilter filter); + + /** + * Return an estimate of the number of results this index is expected to return for any given + * query that it can be used to answer. Used in conjunction with indexes() and supportsExpression() + * to determine the most selective index for a given ReadCommand. Additionally, this is also used + * by StorageProxy.estimateResultsPerRange to calculate the initial concurrency factor for range requests + * + * @return the estimated average number of results a Searcher may return for any given query + */ + public long getEstimatedResultRows(); + + /* + * Input validation + */ + + /** + * Called at write time to ensure that values present in the update + * are valid according to the rules of all registered indexes which + * will process it. The partition key as well as the clustering and + * cell values for each row in the update may be checked by index + * implementations + * @param update PartitionUpdate containing the values to be validated by registered Index implementations + * @throws InvalidRequestException + */ + public void validate(PartitionUpdate update) throws InvalidRequestException; + + /* + * Update processing + */ + + /** + * Factory method for write time event handlers. + * Callers should check the indexes method first and only get a new + * handler when the index claims an interest in the specific update, + * otherwise work may be done unnecessarily + * + * @param key key of the partition being modified + * @param nowInSec current time of the update operation + * @param opGroup operation group spanning the update operation + * @param transactionType indicates what kind of update is being performed on the base data + * i.e.
a write time insert/update/delete or the result of compaction + * @return + */ + public Indexer indexerFor(DecoratedKey key, + int nowInSec, + OpOrder.Group opGroup, + IndexTransaction.Type transactionType); + + /** + * Listener for processing events emitted during a single partition update. + * Instances of this are responsible for applying modifications to the index in response to a single update + * operation on a particular partition of the base table. + * That update may be generated by the normal write path, by iterating SSTables during streaming operations or when + * building or rebuilding an index from source. Updates also occur during compaction when multiple versions of a + * source partition from different SSTables are merged. + */ + public interface Indexer + { + /** + * Notification of the start of a partition update. + * This event always occurs before any other during the update. + */ + public void begin(); + + /** + * Notification of a top level partition delete. + * @param deletionTime + */ + public void partitionDelete(DeletionTime deletionTime); + + /** + * Notification of a RangeTombstone. + * An update of a single partition may contain multiple RangeTombstones, + * and a notification will be passed for each of them. + * @param tombstone + */ + public void rangeTombstone(RangeTombstone tombstone); + + /** + * Notification that a new row was inserted into the Memtable holding the partition. + * This only implies that the inserted row was not already present in the Memtable, + * it *does not* guarantee that the row does not exist in an SSTable, potentially with + * additional column data. + * + * @param row the Row being inserted into the base table's Memtable. + */ + public void insertRow(Row row); + + /** + * Notification of a modification to a row in the base table's Memtable. + * This is to allow an Index implementation to clean up entries for base data which is + * never flushed to disk (and so will not be purged during compaction). + * It's important to note that the old & new rows supplied here may not represent + * the totality of the data for the Row with this particular Clustering. There may be + * additional column data in SSTables which is not present in either the old or new row, + * so implementations should be aware of that. + * The supplied rows contain only column data which has actually been updated. + * oldRowData contains only the columns which have been removed from the Row's + * representation in the Memtable, while newRowData includes only new columns + * which were not previously present. Any column data which is unchanged by + * the update is not included. + * + * @param oldRowData data that was present in existing row and which has been removed from + * the base table's Memtable + * @param newRowData data that was not present in the existing row and is being inserted + * into the base table's Memtable + */ + public void updateRow(Row oldRowData, Row newRowData); + + /** + * Notification that a row was removed from the partition. + * Note that this is only called as part of either a compaction or a cleanup. + * This context is indicated by the TransactionType supplied to the indexerFor method. + * + * As with updateRow, it cannot be guaranteed that all data belonging to the Clustering + * of the supplied Row has been removed (although in the case of a cleanup, that is the + * ultimate intention).
+ * There may be data for the same row in other SSTables, so in this case Indexer implementations + * should *not* assume that all traces of the row have been removed. In particular, + * it is not safe to assert that all values associated with the Row's Clustering + * have been deleted, so implementations which index primary key columns should not + * purge those entries from their indexes. + * + * @param row data being removed from the base table + */ + public void removeRow(Row row); + + /** + * Notification of the end of the partition update. + * This event always occurs after all others for the particular update. + */ + public void finish(); + } + + /* + * Querying + */ + + /** + * Return a function which performs post processing on the results of a partition range read command. + * In future, this may be used as a generalized mechanism for transforming results on the coordinator prior + * to returning them to the caller. + * + * This is used on the coordinator during execution of a range command to perform post + * processing of merged results obtained from the necessary replicas. This is currently the only place where + * results are transformed in this way, but that may change over time as usage is generalized. + * See CASSANDRA-8717 for further discussion. + * + * The function takes a PartitionIterator of the results from the replicas which has already been collated + * & reconciled, along with the command being executed. It returns another PartitionIterator containing the results + * of the transformation (which may be the same as the input if the transformation is a no-op). + */ + public BiFunction<PartitionIterator, ReadCommand, PartitionIterator> postProcessorFor(ReadCommand command); + + /** + * Factory method for query time search helper. + * @param command the read command being executed + * @return a Searcher with which to perform the supplied command + */ + public Searcher searcherFor(ReadCommand command); + + /** + * Performs the actual index lookup during execution of a ReadCommand. + * An instance performs its query according to the RowFilter.Expression it was created for (see searcherFor) + * An Expression is a predicate of the form [column] [operator] [value]. + */ + public interface Searcher + { + /** + * @param orderGroup the collection of OpOrder.Groups which the ReadCommand is being performed under. + * @return partitions from the base table matching the criteria of the search. + */ + public UnfilteredPartitionIterator search(ReadOrderGroup orderGroup); + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/0626be86/src/java/org/apache/cassandra/index/IndexRegistry.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/index/IndexRegistry.java b/src/java/org/apache/cassandra/index/IndexRegistry.java new file mode 100644 index 0000000..6a004fb --- /dev/null +++ b/src/java/org/apache/cassandra/index/IndexRegistry.java @@ -0,0 +1,22 @@ +package org.apache.cassandra.index; + +import java.util.Collection; + +import org.apache.cassandra.schema.IndexMetadata; + +/** + * The collection of all Index instances for a base table.
+ * The SecondaryIndexManager for a ColumnFamilyStore contains an IndexRegistry + * (actually it implements this interface at present) and Index implementations + * register in order to: + * i) subscribe to the stream of updates being applied to partitions in the base table + * ii) provide searchers to support queries with the relevant search predicates + */ +public interface IndexRegistry +{ + void registerIndex(Index index); + void unregisterIndex(Index index); + + Index getIndex(IndexMetadata indexMetadata); + Collection<Index> listIndexes(); +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/0626be86/src/java/org/apache/cassandra/index/SecondaryIndexBuilder.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/index/SecondaryIndexBuilder.java b/src/java/org/apache/cassandra/index/SecondaryIndexBuilder.java new file mode 100644 index 0000000..d6ae8e2 --- /dev/null +++ b/src/java/org/apache/cassandra/index/SecondaryIndexBuilder.java @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.index; + +import java.io.IOException; +import java.util.Set; +import java.util.UUID; + +import org.apache.cassandra.db.ColumnFamilyStore; +import org.apache.cassandra.db.DecoratedKey; +import org.apache.cassandra.db.Keyspace; +import org.apache.cassandra.db.compaction.CompactionInfo; +import org.apache.cassandra.db.compaction.CompactionInterruptedException; +import org.apache.cassandra.db.compaction.OperationType; +import org.apache.cassandra.io.sstable.ReducingKeyIterator; +import org.apache.cassandra.utils.UUIDGen; + +/** + * Manages building an entire index from column family data. Runs on the compaction manager.
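+ * Typical usage, as in SecondaryIndexManager.buildIndexesBlocking later in this patch, is along these lines
+ * (baseCfs, indexes and sstables stand for the caller's table, index set and source data):
+ * {@code
+ * SecondaryIndexBuilder builder = new SecondaryIndexBuilder(baseCfs, indexes, new ReducingKeyIterator(sstables));
+ * Future<?> future = CompactionManager.instance.submitIndexBuild(builder);
+ * FBUtilities.waitOnFuture(future);
+ * }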
+ */ +public class SecondaryIndexBuilder extends CompactionInfo.Holder +{ + private final ColumnFamilyStore cfs; + private final Set indexers; + private final ReducingKeyIterator iter; + private final UUID compactionId; + + public SecondaryIndexBuilder(ColumnFamilyStore cfs, Set indexers, ReducingKeyIterator iter) + { + this.cfs = cfs; + this.indexers = indexers; + this.iter = iter; + this.compactionId = UUIDGen.getTimeUUID(); + } + + public CompactionInfo getCompactionInfo() + { + return new CompactionInfo(cfs.metadata, + OperationType.INDEX_BUILD, + iter.getBytesRead(), + iter.getTotalBytes(), + compactionId); + } + + public void build() + { + try + { + while (iter.hasNext()) + { + if (isStopRequested()) + throw new CompactionInterruptedException(getCompactionInfo()); + DecoratedKey key = iter.next(); + Keyspace.indexPartition(key, cfs, indexers); + } + } + finally + { + try + { + iter.close(); + } + catch (IOException e) + { + throw new RuntimeException(e); + } + } + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/0626be86/src/java/org/apache/cassandra/index/SecondaryIndexManager.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/index/SecondaryIndexManager.java b/src/java/org/apache/cassandra/index/SecondaryIndexManager.java new file mode 100644 index 0000000..5d32b70 --- /dev/null +++ b/src/java/org/apache/cassandra/index/SecondaryIndexManager.java @@ -0,0 +1,900 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.cassandra.index; + +import java.lang.reflect.Constructor; +import java.util.*; +import java.util.concurrent.*; +import java.util.function.Function; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import com.google.common.base.Strings; +import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Maps; +import com.google.common.primitives.Longs; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.MoreExecutors; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.cassandra.concurrent.JMXEnabledThreadPoolExecutor; +import org.apache.cassandra.concurrent.NamedThreadFactory; +import org.apache.cassandra.concurrent.StageManager; +import org.apache.cassandra.config.ColumnDefinition; +import org.apache.cassandra.cql3.statements.IndexTarget; +import org.apache.cassandra.db.*; +import org.apache.cassandra.db.compaction.CompactionManager; +import org.apache.cassandra.db.filter.RowFilter; +import org.apache.cassandra.db.lifecycle.SSTableSet; +import org.apache.cassandra.db.lifecycle.View; +import org.apache.cassandra.db.partitions.PartitionUpdate; +import org.apache.cassandra.db.rows.*; +import org.apache.cassandra.exceptions.InvalidRequestException; +import org.apache.cassandra.index.internal.CassandraIndex; +import org.apache.cassandra.index.transactions.*; +import org.apache.cassandra.io.sstable.ReducingKeyIterator; +import org.apache.cassandra.io.sstable.format.SSTableReader; +import org.apache.cassandra.schema.IndexMetadata; +import org.apache.cassandra.schema.Indexes; +import org.apache.cassandra.tracing.Tracing; +import org.apache.cassandra.utils.FBUtilities; +import org.apache.cassandra.utils.concurrent.OpOrder; +import org.apache.cassandra.utils.concurrent.Refs; + +/** + * Handles the core maintenance functionality associated with indexes: adding/removing them to or from + * a table, (re)building during bootstrap or other streaming operations, flushing, reloading metadata + * and so on. + * + * The Index interface defines a number of methods which return Callable<?>. These are primarily the + * management tasks for an index implementation. Most of them are currently executed in a blocking + * fashion via submission to SIM's blockingExecutor. This provides the desired behaviour in pretty + * much all cases, as tasks like flushing an index need to be executed synchronously to avoid potentially + * deadlocking on the FlushWriter or PostFlusher. Several of these Callable-returning methods on Index could + * then be defined as void and called directly from SIM (rather than being run via the executor service). + * Separating the task definition from execution gives us greater flexibility though, so that in future, for example, + * if the flush process allows it we leave open the possibility of executing more of these tasks asynchronously. + * + * The primary exception to the above is the Callable returned from Index#addIndexedColumn. This may + * involve a significant effort, building a new index over any existing data. We perform this task asynchronously, + * as it is called as part of a schema update, which we do not want to block for a long period. Building non-custom + * indexes is performed on the CompactionManager. + * + * This class also provides instances of processors which listen to updates to the base table and forward to + * registered Indexes the info required to keep those indexes up to date.
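+ * As a rough sketch of the write path usage (indexManager here stands for the base table's
+ * SecondaryIndexManager; the factory methods appear further down in this class):
+ * {@code
+ * UpdateTransaction txn = indexManager.newUpdateTransaction(update, opGroup, nowInSec);
+ * txn.start();
+ * // ... notify txn of the partition deletion, range tombstones and rows in the update ...
+ * txn.commit();
+ * }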
+ * There are two variants of these processors, each with a factory method provided by SIM: + * IndexTransaction: deals with updates generated on the regular write path. + * CleanupTransaction: used when partitions are modified during compaction or cleanup operations. + * Further details on their usage and lifecycles can be found in the interface definitions below. + * + * Finally, the bestIndexFor method is used at query time to identify the most selective index of those able + * to satisfy any search predicates defined by a ReadCommand's RowFilter. It returns a thin IndexAccessor object + * which enables the ReadCommand to access the appropriate functions of the Index at various stages in its lifecycle. + * e.g. the getEstimatedResultRows is required when StorageProxy calculates the initial concurrency factor for + * distributing requests to replicas, whereas a Searcher instance is needed when the ReadCommand is executed locally on + * a target replica. + */ +public class SecondaryIndexManager implements IndexRegistry +{ + private static final Logger logger = LoggerFactory.getLogger(SecondaryIndexManager.class); + + + private Map indexes = Maps.newConcurrentMap(); + + // executes tasks returned by Indexer#addIndexColumn which may require index(es) to be (re)built + private static final ExecutorService asyncExecutor = + new JMXEnabledThreadPoolExecutor(1, + StageManager.KEEPALIVE, + TimeUnit.SECONDS, + new LinkedBlockingQueue<>(), + new NamedThreadFactory("SecondaryIndexManagement"), + "internal"); + + // executes all blocking tasks produced by Indexers e.g. getFlushTask, getMetadataReloadTask etc + private static final ExecutorService blockingExecutor = MoreExecutors.newDirectExecutorService(); + + /** + * The underlying column family containing the source data for these indexes + */ + public final ColumnFamilyStore baseCfs; + + public SecondaryIndexManager(ColumnFamilyStore baseCfs) + { + this.baseCfs = baseCfs; + } + + + /** + * Drops and adds new indexes associated with the underlying CF + */ + public void reload() + { + // figure out what needs to be added and dropped. + Indexes tableIndexes = baseCfs.metadata.getIndexes(); + indexes.keySet() + .stream() + .filter(indexName -> !tableIndexes.has(indexName)) + .forEach(this::removeIndex); + + // we call add for every index definition in the collection as + // some may not have been created here yet, only added to schema + for (IndexMetadata tableIndex : tableIndexes) + addIndex(tableIndex); + } + + private Future reloadIndex(IndexMetadata indexDef) + { + // if the index metadata has changed, reload the index + IndexMetadata registered = indexes.get(indexDef.name).getIndexMetadata(); + if (!registered.equals(indexDef)) + { + Index index = indexes.remove(registered.name); + index.register(this); + return blockingExecutor.submit(index.getMetadataReloadTask(indexDef)); + } + + // otherwise, nothing to do + return Futures.immediateFuture(null); + } + + private Future createIndex(IndexMetadata indexDef) + { + Index index = createInstance(indexDef); + index.register(this); + final Callable initialBuildTask = index.getInitializationTask(); + return initialBuildTask == null + ? 
Futures.immediateFuture(null) + : asyncExecutor.submit(initialBuildTask); + } + + /** + * Adds and builds an index + * @param indexDef the IndexMetadata describing the index + */ + public synchronized Future<?> addIndex(IndexMetadata indexDef) + { + if (indexes.containsKey(indexDef.name)) + return reloadIndex(indexDef); + else + return createIndex(indexDef); + } + + public synchronized void removeIndex(String indexName) + { + Index index = indexes.remove(indexName); + if (null != index) + { + executeBlocking(index.getInvalidateTask()); + unregisterIndex(index); + } + } + + /** + * Called when dropping a Table + */ + public void markAllIndexesRemoved() + { + getBuiltIndexNames().forEach(this::markIndexRemoved); + } + + /** + * Does a full, blocking rebuild of the indexes specified by columns from the sstables. + * Caller must acquire and release references to the sstables used here. + * Note also that only this method of (re)building indexes: + * a) takes a set of index *names* rather than Indexers + * b) marks existing indexes removed prior to rebuilding + * + * @param sstables the data to build from + * @param indexNames the list of indexes to be rebuilt + */ + public void rebuildIndexesBlocking(Collection<SSTableReader> sstables, Set<String> indexNames) + { + Set<Index> toRebuild = indexes.values().stream() + .filter(indexer -> indexNames.contains(indexer.getIndexName())) + .collect(Collectors.toSet()); + if (toRebuild.isEmpty()) + { + logger.info("No defined indexes with the supplied names"); + return; + } + + toRebuild.forEach(indexer -> markIndexRemoved(indexer.getIndexName())); + + buildIndexesBlocking(sstables, toRebuild); + + toRebuild.forEach(indexer -> markIndexBuilt(indexer.getIndexName())); + } + + public void buildAllIndexesBlocking(Collection<SSTableReader> sstables) + { + buildIndexesBlocking(sstables, ImmutableSet.copyOf(indexes.values())); + } + + // For convenience, may be called directly from Index impls + public void buildIndexBlocking(Index index) + { + try (ColumnFamilyStore.RefViewFragment viewFragment = baseCfs.selectAndReference(View.select(SSTableSet.CANONICAL)); + Refs<SSTableReader> sstables = viewFragment.refs) + { + buildIndexesBlocking(sstables, Collections.singleton(index)); + markIndexBuilt(index.getIndexName()); + } + } + + private void buildIndexesBlocking(Collection<SSTableReader> sstables, Set<Index> indexes) + { + if (indexes.isEmpty()) + return; + + logger.info("Submitting index build of {} for data in {}", + indexes.stream().map(Index::getIndexName).collect(Collectors.joining(",")), + sstables.stream().map(SSTableReader::toString).collect(Collectors.joining(","))); + + SecondaryIndexBuilder builder = new SecondaryIndexBuilder(baseCfs, + indexes, + new ReducingKeyIterator(sstables)); + Future<?> future = CompactionManager.instance.submitIndexBuild(builder); + FBUtilities.waitOnFuture(future); + + flushIndexesBlocking(indexes); + logger.info("Index build of {} complete", + indexes.stream().map(Index::getIndexName).collect(Collectors.joining(","))); + } + + private void markIndexBuilt(String indexName) + { + SystemKeyspace.setIndexBuilt(baseCfs.name, indexName); + } + + private void markIndexRemoved(String indexName) + { + SystemKeyspace.setIndexRemoved(baseCfs.name, indexName); + } + + + public Index getIndexByName(String indexName) + { + return indexes.get(indexName); + } + + private Index createInstance(IndexMetadata indexDef) + { + Index newIndex; + if (indexDef.isCustom()) + { + assert indexDef.options != null; + String className = indexDef.options.get(IndexTarget.CUSTOM_INDEX_OPTION_NAME); + assert !
Strings.isNullOrEmpty(className); + try + { + Class<? extends Index> indexClass = FBUtilities.classForName(className, "Index"); + Constructor<? extends Index> ctor = indexClass.getConstructor(ColumnFamilyStore.class, IndexMetadata.class); + newIndex = (Index)ctor.newInstance(baseCfs, indexDef); + } + catch (Exception e) + { + throw new RuntimeException(e); + } + } + else + { + newIndex = CassandraIndex.newIndex(baseCfs, indexDef); + } + return newIndex; + } + + /** + * Truncate all indexes + */ + public void truncateAllIndexesBlocking(final long truncatedAt) + { + executeAllBlocking(indexes.values().stream(), (index) -> index.getTruncateTask(truncatedAt)); + } + + /** + * Remove all indexes + */ + public void invalidateAllIndexesBlocking() + { + executeAllBlocking(indexes.values().stream(), Index::getInvalidateTask); + } + + /** + * Perform a blocking flush all indexes + */ + public void flushAllIndexesBlocking() + { + flushIndexesBlocking(ImmutableSet.copyOf(indexes.values())); + } + + /** + * Perform a blocking flush of selected indexes + */ + public void flushIndexesBlocking(Set<Index> indexes) + { + if (indexes.isEmpty()) + return; + + List<Future<?>> wait = new ArrayList<>(); + List<Index> nonCfsIndexes = new ArrayList<>(); + + // for each CFS backed index, submit a flush task which we'll wait on for completion + // for the non-CFS backed indexes, we'll flush those while we wait. + synchronized (baseCfs.getTracker()) + { + indexes.forEach(index -> + index.getBackingTable() + .map(cfs -> wait.add(cfs.forceFlush())) + .orElse(nonCfsIndexes.add(index))); + } + executeAllBlocking(nonCfsIndexes.stream(), Index::getBlockingFlushTask); + FBUtilities.waitOnFutures(wait); + } + + /** + * Performs a blocking flush of all custom indexes + */ + public void flushAllCustomIndexesBlocking() + { + Set<Index> customIndexers = indexes.values().stream() + .filter(index -> !(index instanceof CassandraIndex)) + .collect(Collectors.toSet()); + flushIndexesBlocking(customIndexers); + } + + /** + * @return all indexes which are marked as built and ready to use + */ + public List<String> getBuiltIndexNames() + { + Set<String> allIndexNames = new HashSet<>(); + indexes.values().stream() + .map(Index::getIndexName) + .forEach(allIndexNames::add); + return SystemKeyspace.getBuiltIndexes(baseCfs.keyspace.getName(), allIndexNames); + } + + /** + * @return all backing Tables used by registered indexes + */ + public Set<ColumnFamilyStore> getAllIndexColumnFamilyStores() + { + Set<ColumnFamilyStore> backingTables = new HashSet<>(); + indexes.values().forEach(index -> index.getBackingTable().ifPresent(backingTables::add)); + return backingTables; + } + + /** + * @return if there are ANY indexes registered for this table + */ + public boolean hasIndexes() + { + return !indexes.isEmpty(); + } + + /** + * When building an index against existing data in sstables, add the given partition to the index + */ + public void indexPartition(UnfilteredRowIterator partition, OpOrder.Group opGroup, Set<Index> indexes, int nowInSec) + { + if (!indexes.isEmpty()) + { + DecoratedKey key = partition.partitionKey(); + Set<Index.Indexer> indexers = indexes.stream() + .map(index -> index.indexerFor(key, + nowInSec, + opGroup, + IndexTransaction.Type.UPDATE)) + .collect(Collectors.toSet()); + + indexers.forEach(Index.Indexer::begin); + + try (RowIterator filtered = UnfilteredRowIterators.filter(partition, nowInSec)) + { + if (!filtered.staticRow().isEmpty()) + indexers.forEach(indexer -> indexer.insertRow(filtered.staticRow())); + + while (filtered.hasNext()) + { + Row row = filtered.next(); + indexers.forEach(indexer -> indexer.insertRow(row)); + } + } + +
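+ // every row has now been handed to each indexer; signal the end of the partition update,
+ // mirroring the begin() notifications issued above (finish() must always come last)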
indexers.forEach(Index.Indexer::finish); + } + } + + /** + * Delete all data from all indexes for this partition. + * For when cleanup rips a partition out entirely. + * + * TODO : improve cleanup transaction to batch updates & perform them async + */ + public void deletePartition(UnfilteredRowIterator partition, int nowInSec) + { + // we need to acquire memtable lock because secondary index deletion may + // cause a race (see CASSANDRA-3712). This is done internally by the + // index transaction when it commits + CleanupTransaction indexTransaction = newCleanupTransaction(partition.partitionKey(), + partition.columns(), + nowInSec); + indexTransaction.start(); + indexTransaction.onPartitionDeletion(partition.partitionLevelDeletion()); + indexTransaction.commit(); + + while (partition.hasNext()) + { + Unfiltered unfiltered = partition.next(); + if (unfiltered.kind() != Unfiltered.Kind.ROW) + continue; + + indexTransaction = newCleanupTransaction(partition.partitionKey(), + partition.columns(), + nowInSec); + indexTransaction.start(); + indexTransaction.onRowDelete((Row)unfiltered); + indexTransaction.commit(); + } + } + + /** + * Called at query time to find the most selective of the registered index implementations + * (i.e. the one likely to return the fewest results). + * Implementation specific validation of the target expression by the most selective + * index should be performed in the searcherFor method to ensure that we pick the right + * index regardless of the validity of the expression. + * + * This method is called at various points during the lifecycle of a ReadCommand (to obtain a Searcher, + * get the index's underlying CFS for ReadOrderGroup, or an estimate of the result size from an average index + * query). + * + * Ideally, we would do this relatively expensive operation only once, and attach the index to the + * ReadCommand for future reference. This requires the index be passed onto additional commands generated + * to process subranges etc. + * + * @param command ReadCommand to be executed + * @return an Index instance, ready to use during execution of the command, or null if none + * of the registered indexes can support the command. + */ + public Index getBestIndexFor(ReadCommand command, boolean includeInTrace) + { + if (indexes.isEmpty() || command.rowFilter().isEmpty()) + return null; + + Set<Index> searchableIndexes = new HashSet<>(); + for (RowFilter.Expression expression : command.rowFilter()) + { + indexes.values().stream() + .filter(index -> index.supportsExpression(expression.column(), expression.operator())) + .forEach(searchableIndexes::add); + } + + if (searchableIndexes.isEmpty()) + { + logger.debug("No applicable indexes found"); + if (includeInTrace) + Tracing.trace("No applicable indexes found"); + return null; + } + + Index selected = searchableIndexes.stream() + .max((a, b) -> Longs.compare(a.getEstimatedResultRows(), + b.getEstimatedResultRows())) + .orElseThrow(() -> new AssertionError("Could not select most selective index")); + + // pay for an additional threadlocal get() rather than build the strings unnecessarily + if (includeInTrace && Tracing.isTracing()) + { + Tracing.trace("Index mean cardinalities are {}.
Scanning with {}.", + searchableIndexes.stream().map(i -> i.getIndexName() + ':' + i.getEstimatedResultRows()) + .collect(Collectors.joining(",")), + selected.getIndexName()); + } + return selected; + } + + // convenience method which doesn't emit tracing messages + public Index getBestIndexFor(ReadCommand command) + { + return getBestIndexFor(command, false); + } + + /** + * Called at write time to ensure that values present in the update + * are valid according to the rules of all registered indexes which + * will process it. The partition key as well as the clustering and + * cell values for each row in the update may be checked by index + * implementations + * @param update PartitionUpdate containing the values to be validated by registered Index implementations + * @throws InvalidRequestException + */ + public void validate(PartitionUpdate update) throws InvalidRequestException + { + indexes.values() + .stream() + .filter(i -> i.indexes(update.columns())) + .forEach(i -> i.validate(update)); + } + + /** + * IndexRegistry methods + */ + public void registerIndex(Index index) + { + indexes.put(index.getIndexMetadata().name, index); + logger.debug("Registered index {}", index.getIndexMetadata().name); + } + + public void unregisterIndex(Index index) + { + Index removed = indexes.remove(index.getIndexMetadata().name); + logger.debug(removed == null ? "Index {} was not registered" : "Removed index {} from registry", + index.getIndexMetadata().name); + } + + public Index getIndex(IndexMetadata metadata) + { + return indexes.get(metadata.name); + } + + public Collection listIndexes() + { + return ImmutableSet.copyOf(indexes.values()); + } + + /** + * Handling of index updates. + * Implementations of the various IndexTransaction interfaces, for keeping indexes in sync with base data + * during updates, compaction and cleanup. Plus factory methods for obtaining transaction instances. + */ + + /** + * Transaction for updates on the write path. + */ + public UpdateTransaction newUpdateTransaction(PartitionUpdate update, OpOrder.Group opGroup, int nowInSec) + { + if (!hasIndexes()) + return UpdateTransaction.NO_OP; + + // todo : optimize lookup, we can probably cache quite a bit of stuff, rather than doing + // a linear scan every time. Holding off that though until CASSANDRA-7771 to figure out + // exactly how indexes are to be identified & associated with a given partition update + Index.Indexer[] indexers = indexes.values().stream() + .filter(i -> i.indexes(update.columns())) + .map(i -> i.indexerFor(update.partitionKey(), + nowInSec, + opGroup, + IndexTransaction.Type.UPDATE)) + .toArray(Index.Indexer[]::new); + + return indexers.length == 0 ? UpdateTransaction.NO_OP : new WriteTimeTransaction(indexers); + } + + /** + * Transaction for use when merging rows during compaction + */ + public CompactionTransaction newCompactionTransaction(DecoratedKey key, + PartitionColumns partitionColumns, + int versions, + int nowInSec) + { + // the check for whether there are any registered indexes is already done in CompactionIterator + + Index[] interestedIndexes = indexes.values().stream() + .filter(i -> i.indexes(partitionColumns)) + .toArray(Index[]::new); + + return interestedIndexes.length == 0 + ? 
+
+    /**
+     * Transaction for use when removing partitions during cleanup.
+     */
+    public CleanupTransaction newCleanupTransaction(DecoratedKey key,
+                                                    PartitionColumns partitionColumns,
+                                                    int nowInSec)
+    {
+        if (!hasIndexes())
+            return CleanupTransaction.NO_OP;
+
+        Index[] interestedIndexes = indexes.values().stream()
+                                           .filter(i -> i.indexes(partitionColumns))
+                                           .toArray(Index[]::new);
+
+        return interestedIndexes.length == 0
+               ? CleanupTransaction.NO_OP
+               : new CleanupGCTransaction(key, nowInSec, interestedIndexes);
+    }
+
+    /**
+     * A single-use transaction for processing a partition update on the regular write path.
+     */
+    private static final class WriteTimeTransaction implements UpdateTransaction
+    {
+        private final Index.Indexer[] indexers;
+
+        private WriteTimeTransaction(Index.Indexer...indexers)
+        {
+            // don't allow null indexers; if we don't need any, use a NullUpdater object
+            for (Index.Indexer indexer : indexers) assert indexer != null;
+            this.indexers = indexers;
+        }
+
+        public void start()
+        {
+            Arrays.stream(indexers).forEach(Index.Indexer::begin);
+        }
+
+        public void onPartitionDeletion(DeletionTime deletionTime)
+        {
+            Arrays.stream(indexers).forEach(h -> h.partitionDelete(deletionTime));
+        }
+
+        public void onRangeTombstone(RangeTombstone tombstone)
+        {
+            Arrays.stream(indexers).forEach(h -> h.rangeTombstone(tombstone));
+        }
+
+        public void onInserted(Row row)
+        {
+            Arrays.stream(indexers).forEach(h -> h.insertRow(row));
+        }
+
+        public void onUpdated(Row existing, Row updated)
+        {
+            final Row.Builder toRemove = BTreeRow.sortedBuilder(existing.columns());
+            toRemove.newRow(existing.clustering());
+            final Row.Builder toInsert = BTreeRow.sortedBuilder(updated.columns());
+            toInsert.newRow(updated.clustering());
+            // diff listener collates the columns to be added & removed from the indexes
+            RowDiffListener diffListener = new RowDiffListener()
+            {
+                public void onPrimaryKeyLivenessInfo(int i, Clustering clustering, LivenessInfo merged, LivenessInfo original)
+                {
+                    if (merged != null && merged != original)
+                        toInsert.addPrimaryKeyLivenessInfo(merged);
+                }
+
+                public void onDeletion(int i, Clustering clustering, DeletionTime merged, DeletionTime original)
+                {
+                }
+
+                public void onComplexDeletion(int i, Clustering clustering, ColumnDefinition column, DeletionTime merged, DeletionTime original)
+                {
+                }
+
+                public void onCell(int i, Clustering clustering, Cell merged, Cell original)
+                {
+                    if (merged != null && merged != original)
+                        toInsert.addCell(merged);
+
+                    if (merged == null || (original != null && shouldCleanupOldValue(original, merged)))
+                        toRemove.addCell(original);
+                }
+            };
+            Rows.diff(diffListener, updated, updated.columns().mergeTo(existing.columns()), existing);
+            Row oldRow = toRemove.build();
+            Row newRow = toInsert.build();
+            Arrays.stream(indexers).forEach(i -> i.updateRow(oldRow, newRow));
+        }
+
+        public void commit()
+        {
+            Arrays.stream(indexers).forEach(Index.Indexer::finish);
+        }
+
+        private boolean shouldCleanupOldValue(Cell oldCell, Cell newCell)
+        {
+            // If either the value or timestamp is different, then we
+            // should delete from the index. If not, then we can infer that
+            // at least one of the cells is an ExpiringColumn and that the
+            // difference is in the expiry time. In this case, we don't want to
+            // delete the old value from the index as the tombstone we insert
+            // will just hide the inserted value.
+            // Completely identical cells (including expiring columns with
+            // identical ttl & localExpirationTime) will not get this far due
+            // to the oldCell.equals(newCell) in StandardUpdater.update
+            return !oldCell.value().equals(newCell.value()) || oldCell.timestamp() != newCell.timestamp();
+        }
+    }
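To make the Indexer side of this concrete, a no-op implementation might look as sketched below; NoOpIndexer is a hypothetical name, and the method set simply mirrors the calls the transactions in this class make, so a real implementation may need to satisfy further parts of the Index.Indexer contract.

// Sketch only: the callbacks WriteTimeTransaction, IndexGCTransaction and
// CleanupGCTransaction drive on an Index.Indexer.
public class NoOpIndexer implements Index.Indexer
{
    public void begin() { }                                    // from UpdateTransaction.start()
    public void partitionDelete(DeletionTime deletionTime) { } // from onPartitionDeletion
    public void rangeTombstone(RangeTombstone tombstone) { }   // from onRangeTombstone
    public void insertRow(Row row) { }                         // from onInserted
    public void updateRow(Row oldRow, Row newRow) { }          // oldRow: cells to remove; newRow: cells to add
    public void removeRow(Row row) { }                         // from compaction & cleanup commits
    public void finish() { }                                   // from commit()
}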
+
+    /**
+     * A single-use transaction for updating indexes for a single partition during compaction, where the only
+     * operation is to merge rows.
+     * TODO : make this smarter at batching updates so we can use a single transaction to process multiple rows in
+     * a single partition
+     */
+    private static final class IndexGCTransaction implements CompactionTransaction
+    {
+        private final DecoratedKey key;
+        private final int versions;
+        private final int nowInSec;
+        private final Index[] indexes;
+
+        private Row[] rows;
+
+        private IndexGCTransaction(DecoratedKey key,
+                                   int versions,
+                                   int nowInSec,
+                                   Index...indexes)
+        {
+            // don't allow null indexes; if we don't have any, use a no-op transaction
+            for (Index index : indexes) assert index != null;
+
+            this.key = key;
+            this.versions = versions;
+            this.indexes = indexes;
+            this.nowInSec = nowInSec;
+        }
+
+        public void start()
+        {
+            if (versions > 0)
+                rows = new Row[versions];
+        }
+
+        public void onRowMerge(Columns columns, Row merged, Row...versions)
+        {
+            // The diff listener constructs rows representing deltas between the merged and original versions.
+            // These delta rows are then passed to registered indexes for removal processing.
+            final Row.Builder[] builders = new Row.Builder[versions.length];
+            RowDiffListener diffListener = new RowDiffListener()
+            {
+                public void onPrimaryKeyLivenessInfo(int i, Clustering clustering, LivenessInfo merged, LivenessInfo original)
+                {
+                }
+
+                public void onDeletion(int i, Clustering clustering, DeletionTime merged, DeletionTime original)
+                {
+                }
+
+                public void onComplexDeletion(int i, Clustering clustering, ColumnDefinition column, DeletionTime merged, DeletionTime original)
+                {
+                }
+
+                public void onCell(int i, Clustering clustering, Cell merged, Cell original)
+                {
+                    if (original != null && merged == null)
+                    {
+                        if (builders[i] == null)
+                        {
+                            builders[i] = BTreeRow.sortedBuilder(columns);
+                            builders[i].newRow(clustering);
+                        }
+                        builders[i].addCell(original);
+                    }
+                }
+            };
+
+            Rows.diff(diffListener, merged, columns, versions);
+
+            for (int i = 0; i < builders.length; i++)
+                if (builders[i] != null)
+                    rows[i] = builders[i].build();
+        }
+
+        public void commit()
+        {
+            if (rows == null)
+                return;
+
+            try (OpOrder.Group opGroup = Keyspace.writeOrder.start())
+            {
+                Index.Indexer[] indexers = Arrays.stream(indexes)
+                                                 .map(i -> i.indexerFor(key, nowInSec, opGroup, Type.COMPACTION))
+                                                 .toArray(Index.Indexer[]::new);
+
+                Arrays.stream(indexers).forEach(Index.Indexer::begin);
+
+                for (Row row : rows)
+                    if (row != null)
+                        Arrays.stream(indexers).forEach(indexer -> indexer.removeRow(row));
+
+                Arrays.stream(indexers).forEach(Index.Indexer::finish);
+            }
+        }
+    }
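A rough sketch of the compaction-side call sequence follows; it is not part of this patch, and key, partitionColumns, columns, merged and versions are assumed to come from the merge in progress (cf. CompactionIterator).

// Sketch only: driving an IndexGCTransaction for one merged row.
void indexRowMerge(SecondaryIndexManager indexManager,
                   DecoratedKey key,
                   PartitionColumns partitionColumns,
                   Columns columns,
                   Row merged,
                   Row[] versions,
                   int nowInSec)
{
    CompactionTransaction indexTransaction =
        indexManager.newCompactionTransaction(key, partitionColumns, versions.length, nowInSec);
    indexTransaction.start();                               // allocates the per-version delta buffer
    indexTransaction.onRowMerge(columns, merged, versions); // records cells purged by the merge
    indexTransaction.commit();                              // removes the recorded cells from each index
}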
+
+    /**
+     * A single-use transaction for updating indexes for a single partition during cleanup, where
+     * partitions and rows are only removed.
+     * TODO : make this smarter at batching updates so we can use a single transaction to process multiple rows in
+     * a single partition
+     */
+    private static final class CleanupGCTransaction implements CleanupTransaction
+    {
+        private final DecoratedKey key;
+        private final int nowInSec;
+        private final Index[] indexes;
+
+        private Row row;
+        private DeletionTime partitionDelete;
+
+        private CleanupGCTransaction(DecoratedKey key,
+                                     int nowInSec,
+                                     Index...indexes)
+        {
+            // don't allow null indexes; if we don't have any, use a no-op transaction
+            for (Index index : indexes) assert index != null;
+
+            this.key = key;
+            this.indexes = indexes;
+            this.nowInSec = nowInSec;
+        }
+
+        public void start()
+        {
+        }
+
+        public void onPartitionDeletion(DeletionTime deletionTime)
+        {
+            partitionDelete = deletionTime;
+        }
+
+        public void onRowDelete(Row row)
+        {
+            this.row = row;
+        }
+
+        public void commit()
+        {
+            if (row == null && partitionDelete == null)
+                return;
+
+            try (OpOrder.Group opGroup = Keyspace.writeOrder.start())
+            {
+                Index.Indexer[] indexers = Arrays.stream(indexes)
+                                                 .map(i -> i.indexerFor(key, nowInSec, opGroup, Type.CLEANUP))
+                                                 .toArray(Index.Indexer[]::new);
+
+                Arrays.stream(indexers).forEach(Index.Indexer::begin);
+
+                if (partitionDelete != null)
+                    Arrays.stream(indexers).forEach(indexer -> indexer.partitionDelete(partitionDelete));
+
+                if (row != null)
+                    Arrays.stream(indexers).forEach(indexer -> indexer.removeRow(row));
+
+                Arrays.stream(indexers).forEach(Index.Indexer::finish);
+            }
+        }
+    }
+
+    private static void executeBlocking(Callable<?> task)
+    {
+        if (null != task)
+            FBUtilities.waitOnFuture(blockingExecutor.submit(task));
+    }
+
+    private static void executeAllBlocking(Stream<Index> indexers, Function<Index, Callable<?>> function)
+    {
+        List<Future<?>> waitFor = new ArrayList<>();
+        indexers.forEach(indexer -> {
+            Callable<?> task = function.apply(indexer);
+            if (null != task)
+                waitFor.add(blockingExecutor.submit(task));
+        });
+        FBUtilities.waitOnFutures(waitFor);
+    }
+}
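For context, a hedged sketch of how a caller inside this class might use executeAllBlocking; flushing is only an assumed example here, and Index::getBlockingFlushTask is taken to be one of the task-producing methods on the new Index interface.

// Sketch only: run one blocking task per registered index and wait for all of them.
private void flushAllIndexesBlocking()
{
    executeAllBlocking(indexes.values().stream(), Index::getBlockingFlushTask);
}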