Mailing-List: contact commits-help@cassandra.apache.org; run by ezmlm
Reply-To: dev@cassandra.apache.org
Content-Type: text/plain; charset="us-ascii"
MIME-Version: 1.0
Content-Transfer-Encoding: 7bit
From: blerer@apache.org
To: commits@cassandra.apache.org
Date: Fri, 07 Apr 2017 09:49:44 -0000
Message-Id: <43b0ec5d4b5a43ba844f5883b0343d8c@git.apache.org>
X-Mailer: ASF-Git Admin Mailer
Subject: [6/6] cassandra git commit: Merge branch cassandra-2.2 into cassandra-3.0
archived-at: Fri, 07 Apr 2017 09:49:42 -0000

Merge branch cassandra-2.2 into cassandra-3.0

Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/833c993b
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/833c993b
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/833c993b

Branch: refs/heads/cassandra-3.0
Commit: 833c993b8e604046179067e663f963dcf4c4a2ca
Parents: 8d34076 194329d
Author: Benjamin Lerer
Authored: Fri Apr 7 11:34:10 2017 +0200
Committer: Benjamin Lerer
Committed: Fri Apr 7 11:39:41 2017 +0200

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../restrictions/StatementRestrictions.java     | 17 +++++++++++++++
 .../cql3/statements/SelectStatement.java        |  9 ++++----
 .../index/internal/CassandraIndex.java          |  2 +-
 .../validation/entities/SecondaryIndexTest.java | 23 ++++++++++++++++++++
 5 files changed, 47 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/833c993b/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index 5bcdf16,6ea2d59..33d5028
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -22,22 -4,8 +22,23 @@@ Merged from 2.2
  * Discard in-flight shadow round responses (CASSANDRA-12653)
  * Don't anti-compact repaired data to avoid inconsistencies (CASSANDRA-13153)
  * Wrong logger name in AnticompactionTask (CASSANDRA-13343)
+ * Commitlog replay may fail if last mutation is within 4 bytes of end of segment (CASSANDRA-13282)
  * Fix queries updating multiple time the same list (CASSANDRA-13130)
  * Fix GRANT/REVOKE when keyspace isn't specified (CASSANDRA-13053)
+Merged from 2.1:
++ * Fix 2ndary index queries on partition keys for tables with static columns (CASSANDRA-13147)
+ * Fix ParseError unhashable type list in cqlsh copy from (CASSANDRA-13364)
+
+3.0.12
+ * Prevent data loss on upgrade 2.1 - 3.0 by adding component separator to LogRecord absolute path (CASSANDRA-13294)
+ * Improve testing on macOS by eliminating sigar logging (CASSANDRA-13233)
+ * Cqlsh copy-from should error out when csv contains invalid data for collections (CASSANDRA-13071)
+ * Update c.yaml doc for offheap memtables (CASSANDRA-13179)
+ * Faster StreamingHistogram (CASSANDRA-13038)
+ * Legacy deserializer can create unexpected boundary range tombstones (CASSANDRA-13237)
+ * Remove unnecessary assertion from AntiCompactionTest (CASSANDRA-13070)
+ * Fix cqlsh COPY for dates before 1900 (CASSANDRA-13185)
+Merged from 2.2:
 * Avoid race on receiver by starting streaming sender thread after sending init message (CASSANDRA-12886)
 * Fix "multiple versions of ant detected..." when running ant test (CASSANDRA-13232)
 * Coalescing strategy sleeps too much (CASSANDRA-13090)


http://git-wip-us.apache.org/repos/asf/cassandra/blob/833c993b/src/java/org/apache/cassandra/cql3/restrictions/StatementRestrictions.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/cql3/restrictions/StatementRestrictions.java
index 542dec9,2c396c4..d025d8a
--- a/src/java/org/apache/cassandra/cql3/restrictions/StatementRestrictions.java
+++ b/src/java/org/apache/cassandra/cql3/restrictions/StatementRestrictions.java
@@@ -78,12 -71,16 +78,18 @@@ public final class StatementRestriction
      */
     private RestrictionSet nonPrimaryKeyRestrictions;
+    /**
+     * true if nonPrimaryKeyRestrictions contains restriction on a regular column,
+     * false otherwise.
+     */
+    private boolean hasRegularColumnsRestriction = false;
+
+    private Set notNullColumns;
+
     /**
-     * The restrictions used to build the index expressions
+     * The restrictions used to build the row filter
      */
-    private final List indexRestrictions = new ArrayList<>();
+    private final IndexRestrictions indexRestrictions = new IndexRestrictions();
     /**
      * true if the secondary index need to be queried, false otherwise


http://git-wip-us.apache.org/repos/asf/cassandra/blob/833c993b/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
index aca6146,13276c7..bd377f4
--- a/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
@@@ -730,41 -703,35 +730,42 @@@ public class SelectStatement implement
         }
         else
         {
-            keyComponents = new ByteBuffer[]{ key };
+            return new ByteBuffer[]{ key };
         }
+    }
-        Iterator cells = cf.getSortedColumns().iterator();
-        if (restrictions.isNonCompositeSliceWithExclusiveBounds())
-            cells = applySliceRestriction(cells, options);
-
+    // Used by ModificationStatement for CAS operations
+    void processPartition(RowIterator partition, QueryOptions options, Selection.ResultSetBuilder result, int nowInSec)
+    throws InvalidRequestException
+    {
         int protocolVersion = options.getProtocolVersion();
-        CQL3Row.RowIterator iter = cfm.comparator.CQL3RowBuilder(cfm, now).group(cells);
-        // If there is static columns but there is no non-static row,
-        // and the select was a full partition selection (i.e. there was no condition on clustering or regular columns),
-        // we want to include the static columns in the result set (and we're done).
-        CQL3Row staticRow = iter.getStaticRow();
-        if (staticRow != null && !iter.hasNext() && !restrictions.hasClusteringColumnsRestriction() && !restrictions.hasRegularColumnsRestriction())
+        ByteBuffer[] keyComponents = getComponents(cfm, partition.partitionKey());
+
+        Row staticRow = partition.staticRow();
-        // If there is no rows, then provided the select was a full partition selection
-        // (i.e. not a 2ndary index search and there was no condition on clustering columns),
++        // If there is no rows, and there's no restriction on clustering/regular columns,
++        // then provided the select was a full partition selection (either by partition key and/or by static column),
+        // we want to include static columns and we're done.
+        if (!partition.hasNext())
         {
-            if (!staticRow.isEmpty() && (!restrictions.usesSecondaryIndexing() || cfm.isStaticCompactTable())
-                && !restrictions.hasClusteringColumnsRestriction())
-                result.newRow(protocolVersion);
-                for (ColumnDefinition def : selection.getColumns())
++            if (!staticRow.isEmpty()
++                && (!restrictions.hasClusteringColumnsRestriction() || cfm.isStaticCompactTable())
++                && !restrictions.hasRegularColumnsRestriction())
             {
-                switch (def.kind)
+                result.newRow(protocolVersion);
+                for (ColumnDefinition def : selection.getColumns())
                 {
-                    case PARTITION_KEY:
-                        result.add(keyComponents[def.position()]);
-                        break;
-                    case STATIC:
-                        addValue(result, def, staticRow, options);
-                        break;
-                    default:
-                        result.add((ByteBuffer)null);
+                    switch (def.kind)
+                    {
+                        case PARTITION_KEY:
+                            result.add(keyComponents[def.position()]);
+                            break;
+                        case STATIC:
+                            addValue(result, def, staticRow, nowInSec, protocolVersion);
+                            break;
+                        default:
+                            result.add((ByteBuffer)null);
+                    }
                 }
             }
             return;


http://git-wip-us.apache.org/repos/asf/cassandra/blob/833c993b/src/java/org/apache/cassandra/index/internal/CassandraIndex.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/index/internal/CassandraIndex.java
index 2a0dec0,0000000..5caeefa
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/index/internal/CassandraIndex.java
+++ b/src/java/org/apache/cassandra/index/internal/CassandraIndex.java
@@@ -1,882 -1,0 +1,882 @@@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * + */ +package org.apache.cassandra.index.internal; + +import java.nio.ByteBuffer; +import java.util.*; +import java.util.concurrent.Callable; +import java.util.concurrent.Future; +import java.util.function.BiFunction; +import java.util.regex.Matcher; +import java.util.regex.Pattern; +import java.util.stream.Collectors; +import java.util.stream.StreamSupport; + +import com.google.common.collect.ImmutableSet; +import org.apache.commons.lang3.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.cassandra.config.CFMetaData; +import org.apache.cassandra.config.ColumnDefinition; +import org.apache.cassandra.cql3.ColumnIdentifier; +import org.apache.cassandra.cql3.Operator; +import org.apache.cassandra.cql3.statements.IndexTarget; +import org.apache.cassandra.db.*; +import org.apache.cassandra.db.compaction.CompactionManager; +import org.apache.cassandra.db.filter.RowFilter; +import org.apache.cassandra.db.lifecycle.SSTableSet; +import org.apache.cassandra.db.lifecycle.View; +import org.apache.cassandra.db.marshal.AbstractType; +import org.apache.cassandra.db.marshal.CollectionType; +import org.apache.cassandra.db.marshal.EmptyType; +import org.apache.cassandra.db.partitions.PartitionIterator; +import org.apache.cassandra.db.partitions.PartitionUpdate; +import org.apache.cassandra.db.rows.*; +import org.apache.cassandra.dht.LocalPartitioner; +import org.apache.cassandra.exceptions.InvalidRequestException; +import org.apache.cassandra.index.Index; +import org.apache.cassandra.index.IndexRegistry; +import org.apache.cassandra.index.SecondaryIndexBuilder; +import org.apache.cassandra.index.internal.composites.CompositesSearcher; +import org.apache.cassandra.index.internal.keys.KeysSearcher; +import org.apache.cassandra.index.transactions.IndexTransaction; +import org.apache.cassandra.index.transactions.UpdateTransaction; +import org.apache.cassandra.io.sstable.ReducingKeyIterator; +import org.apache.cassandra.io.sstable.format.SSTableReader; +import org.apache.cassandra.schema.IndexMetadata; +import org.apache.cassandra.utils.FBUtilities; +import org.apache.cassandra.utils.Pair; +import org.apache.cassandra.utils.concurrent.OpOrder; +import org.apache.cassandra.utils.concurrent.Refs; + +import static org.apache.cassandra.cql3.statements.RequestValidations.checkFalse; + +/** + * Index implementation which indexes the values for a single column in the base + * table and which stores its index data in a local, hidden table. + */ +public abstract class CassandraIndex implements Index +{ + private static final Logger logger = LoggerFactory.getLogger(CassandraIndex.class); + + public static final Pattern TARGET_REGEX = Pattern.compile("^(keys|entries|values|full)\\((.+)\\)$"); + + public final ColumnFamilyStore baseCfs; + protected IndexMetadata metadata; + protected ColumnFamilyStore indexCfs; + protected ColumnDefinition indexedColumn; + protected CassandraIndexFunctions functions; + + protected CassandraIndex(ColumnFamilyStore baseCfs, IndexMetadata indexDef) + { + this.baseCfs = baseCfs; + setMetadata(indexDef); + } + + /** + * Returns true if an index of this type can support search predicates of the form [column] OPERATOR [value] + * @param indexedColumn + * @param operator + * @return + */ + protected boolean supportsOperator(ColumnDefinition indexedColumn, Operator operator) + { + return operator == Operator.EQ; + } + + /** + * Used to construct an the clustering for an entry in the index table based on values from the base data. 
+ * The clustering columns in the index table encode the values required to retrieve the correct data from the base + * table and varies depending on the kind of the indexed column. See indexCfsMetadata for more details + * Used whenever a row in the index table is written or deleted. + * @param partitionKey from the base data being indexed + * @param prefix from the base data being indexed + * @param path from the base data being indexed + * @return a clustering prefix to be used to insert into the index table + */ + protected abstract CBuilder buildIndexClusteringPrefix(ByteBuffer partitionKey, + ClusteringPrefix prefix, + CellPath path); + + /** + * Used at search time to convert a row in the index table into a simple struct containing the values required + * to retrieve the corresponding row from the base table. + * @param indexedValue the partition key of the indexed table (i.e. the value that was indexed) + * @param indexEntry a row from the index table + * @return + */ + public abstract IndexEntry decodeEntry(DecoratedKey indexedValue, + Row indexEntry); + + /** + * Check whether a value retrieved from an index is still valid by comparing it to current row from the base table. + * Used at read time to identify out of date index entries so that they can be excluded from search results and + * repaired + * @param row the current row from the primary data table + * @param indexValue the value we retrieved from the index + * @param nowInSec + * @return true if the index is out of date and the entry should be dropped + */ + public abstract boolean isStale(Row row, ByteBuffer indexValue, int nowInSec); + + /** + * Extract the value to be inserted into the index from the components of the base data + * @param partitionKey from the primary data + * @param clustering from the primary data + * @param path from the primary data + * @param cellValue from the primary data + * @return a ByteBuffer containing the value to be inserted in the index. This will be used to make the partition + * key in the index table + */ + protected abstract ByteBuffer getIndexedValue(ByteBuffer partitionKey, + Clustering clustering, + CellPath path, + ByteBuffer cellValue); + + public ColumnDefinition getIndexedColumn() + { + return indexedColumn; + } + + public ClusteringComparator getIndexComparator() + { + return indexCfs.metadata.comparator; + } + + public ColumnFamilyStore getIndexCfs() + { + return indexCfs; + } + + public void register(IndexRegistry registry) + { + registry.registerIndex(this); + } + + public Callable getInitializationTask() + { + // if we're just linking in the index on an already-built index post-restart or if the base + // table is empty we've nothing to do. Otherwise, submit for building via SecondaryIndexBuilder + return isBuilt() || baseCfs.isEmpty() ? null : getBuildIndexTask(); + } + + public IndexMetadata getIndexMetadata() + { + return metadata; + } + + public Optional getBackingTable() + { + return indexCfs == null ? 
Optional.empty() : Optional.of(indexCfs); + } + + public Callable getBlockingFlushTask() + { + return () -> { + indexCfs.forceBlockingFlush(); + return null; + }; + } + + public Callable getInvalidateTask() + { + return () -> { + invalidate(); + return null; + }; + } + + public Callable getMetadataReloadTask(IndexMetadata indexDef) + { + return () -> { + indexCfs.metadata.reloadIndexMetadataProperties(baseCfs.metadata); + indexCfs.reload(); + return null; + }; + } + + @Override + public void validate(ReadCommand command) throws InvalidRequestException + { + Optional target = getTargetExpression(command.rowFilter().getExpressions()); + + if (target.isPresent()) + { + ByteBuffer indexValue = target.get().getIndexValue(); + checkFalse(indexValue.remaining() > FBUtilities.MAX_UNSIGNED_SHORT, + "Index expression values may not be larger than 64K"); + } + } + + private void setMetadata(IndexMetadata indexDef) + { + metadata = indexDef; + Pair target = parseTarget(baseCfs.metadata, indexDef); + functions = getFunctions(indexDef, target); + CFMetaData cfm = indexCfsMetadata(baseCfs.metadata, indexDef); + indexCfs = ColumnFamilyStore.createColumnFamilyStore(baseCfs.keyspace, + cfm.cfName, + cfm, + baseCfs.getTracker().loadsstables); + indexedColumn = target.left; + } + + public Callable getTruncateTask(final long truncatedAt) + { + return () -> { + indexCfs.discardSSTables(truncatedAt); + return null; + }; + } + + public boolean shouldBuildBlocking() + { + // built-in indexes are always included in builds initiated from SecondaryIndexManager + return true; + } + + public boolean dependsOn(ColumnDefinition column) + { + return indexedColumn.name.equals(column.name); + } + + public boolean supportsExpression(ColumnDefinition column, Operator operator) + { + return indexedColumn.name.equals(column.name) + && supportsOperator(indexedColumn, operator); + } + + private boolean supportsExpression(RowFilter.Expression expression) + { + return supportsExpression(expression.column(), expression.operator()); + } + + public AbstractType customExpressionValueType() + { + return null; + } + + public long getEstimatedResultRows() + { + return indexCfs.getMeanColumns(); + } + + /** + * No post processing of query results, just return them unchanged + */ + public BiFunction postProcessorFor(ReadCommand command) + { + return (partitionIterator, readCommand) -> partitionIterator; + } + + public RowFilter getPostIndexQueryFilter(RowFilter filter) + { + return getTargetExpression(filter.getExpressions()).map(filter::without) + .orElse(filter); + } + + private Optional getTargetExpression(List expressions) + { + return expressions.stream().filter(this::supportsExpression).findFirst(); + } + + public Index.Searcher searcherFor(ReadCommand command) + { + Optional target = getTargetExpression(command.rowFilter().getExpressions()); + + if (target.isPresent()) + { + target.get().validateForIndexing(); + switch (getIndexMetadata().kind) + { + case COMPOSITES: + return new CompositesSearcher(command, target.get(), this); + case KEYS: + return new KeysSearcher(command, target.get(), this); + default: + throw new IllegalStateException(String.format("Unsupported index type %s for index %s on %s", + metadata.kind, + metadata.name, + indexedColumn.name.toString())); + } + } + + return null; + + } + + public void validate(PartitionUpdate update) throws InvalidRequestException + { + switch (indexedColumn.kind) + { + case PARTITION_KEY: + validatePartitionKey(update.partitionKey()); + break; + case CLUSTERING: + 
validateClusterings(update); + break; + case REGULAR: + if (update.columns().regulars.contains(indexedColumn)) + validateRows(update); + break; + case STATIC: + if (update.columns().statics.contains(indexedColumn)) + validateRows(Collections.singleton(update.staticRow())); + break; + } + } + + public Indexer indexerFor(final DecoratedKey key, + final PartitionColumns columns, + final int nowInSec, + final OpOrder.Group opGroup, + final IndexTransaction.Type transactionType) + { + /** + * Indexes on regular and static columns (the non primary-key ones) only care about updates with live + * data for the column they index. In particular, they don't care about having just row or range deletions + * as they don't know how to update the index table unless they know exactly the value that is deleted. + * + * Note that in practice this means that those indexes are only purged of stale entries on compaction, + * when we resolve both the deletion and the prior data it deletes. Of course, such stale entries are also + * filtered on read. + */ + if (!isPrimaryKeyIndex() && !columns.contains(indexedColumn)) + return null; + + return new Indexer() + { + public void begin() + { + } + + public void partitionDelete(DeletionTime deletionTime) + { + } + + public void rangeTombstone(RangeTombstone tombstone) + { + } + + public void insertRow(Row row) + { - if (row.isStatic() != indexedColumn.isStatic()) ++ if (row.isStatic() && !indexedColumn.isStatic() && !indexedColumn.isPartitionKey()) + return; + + if (isPrimaryKeyIndex()) + { + indexPrimaryKey(row.clustering(), + getPrimaryKeyIndexLiveness(row), + row.deletion()); + } + else + { + if (indexedColumn.isComplex()) + indexCells(row.clustering(), row.getComplexColumnData(indexedColumn)); + else + indexCell(row.clustering(), row.getCell(indexedColumn)); + } + } + + public void removeRow(Row row) + { + if (isPrimaryKeyIndex()) + return; + + if (indexedColumn.isComplex()) + removeCells(row.clustering(), row.getComplexColumnData(indexedColumn)); + else + removeCell(row.clustering(), row.getCell(indexedColumn)); + } + + public void updateRow(Row oldRow, Row newRow) + { + assert oldRow.isStatic() == newRow.isStatic(); + if (newRow.isStatic() != indexedColumn.isStatic()) + return; + + if (isPrimaryKeyIndex()) + indexPrimaryKey(newRow.clustering(), + newRow.primaryKeyLivenessInfo(), + newRow.deletion()); + + if (indexedColumn.isComplex()) + { + indexCells(newRow.clustering(), newRow.getComplexColumnData(indexedColumn)); + removeCells(oldRow.clustering(), oldRow.getComplexColumnData(indexedColumn)); + } + else + { + indexCell(newRow.clustering(), newRow.getCell(indexedColumn)); + removeCell(oldRow.clustering(), oldRow.getCell(indexedColumn)); + } + } + + public void finish() + { + } + + private void indexCells(Clustering clustering, Iterable cells) + { + if (cells == null) + return; + + for (Cell cell : cells) + indexCell(clustering, cell); + } + + private void indexCell(Clustering clustering, Cell cell) + { + if (cell == null || !cell.isLive(nowInSec)) + return; + + insert(key.getKey(), + clustering, + cell, + LivenessInfo.create(cell.timestamp(), cell.ttl(), cell.localDeletionTime()), + opGroup); + } + + private void removeCells(Clustering clustering, Iterable cells) + { + if (cells == null) + return; + + for (Cell cell : cells) + removeCell(clustering, cell); + } + + private void removeCell(Clustering clustering, Cell cell) + { + if (cell == null || !cell.isLive(nowInSec)) + return; + + delete(key.getKey(), clustering, cell, opGroup, nowInSec); + } + + private void 
indexPrimaryKey(final Clustering clustering, + final LivenessInfo liveness, + final Row.Deletion deletion) + { + if (liveness.timestamp() != LivenessInfo.NO_TIMESTAMP) + insert(key.getKey(), clustering, null, liveness, opGroup); + + if (!deletion.isLive()) + delete(key.getKey(), clustering, deletion.time(), opGroup); + } + + private LivenessInfo getPrimaryKeyIndexLiveness(Row row) + { + long timestamp = row.primaryKeyLivenessInfo().timestamp(); + int ttl = row.primaryKeyLivenessInfo().ttl(); + for (Cell cell : row.cells()) + { + long cellTimestamp = cell.timestamp(); + if (cell.isLive(nowInSec)) + { + if (cellTimestamp > timestamp) + { + timestamp = cellTimestamp; + ttl = cell.ttl(); + } + } + } + return LivenessInfo.create(baseCfs.metadata, timestamp, ttl, nowInSec); + } + }; + } + + /** + * Specific to internal indexes, this is called by a + * searcher when it encounters a stale entry in the index + * @param indexKey the partition key in the index table + * @param indexClustering the clustering in the index table + * @param deletion deletion timestamp etc + * @param opGroup the operation under which to perform the deletion + */ + public void deleteStaleEntry(DecoratedKey indexKey, + Clustering indexClustering, + DeletionTime deletion, + OpOrder.Group opGroup) + { + doDelete(indexKey, indexClustering, deletion, opGroup); + logger.trace("Removed index entry for stale value {}", indexKey); + } + + /** + * Called when adding a new entry to the index + */ + private void insert(ByteBuffer rowKey, + Clustering clustering, + Cell cell, + LivenessInfo info, + OpOrder.Group opGroup) + { + DecoratedKey valueKey = getIndexKeyFor(getIndexedValue(rowKey, + clustering, + cell)); + Row row = BTreeRow.noCellLiveRow(buildIndexClustering(rowKey, clustering, cell), info); + PartitionUpdate upd = partitionUpdate(valueKey, row); + indexCfs.apply(upd, UpdateTransaction.NO_OP, opGroup, null); + logger.trace("Inserted entry into index for value {}", valueKey); + } + + /** + * Called when deleting entries on non-primary key columns + */ + private void delete(ByteBuffer rowKey, + Clustering clustering, + Cell cell, + OpOrder.Group opGroup, + int nowInSec) + { + DecoratedKey valueKey = getIndexKeyFor(getIndexedValue(rowKey, + clustering, + cell)); + doDelete(valueKey, + buildIndexClustering(rowKey, clustering, cell), + new DeletionTime(cell.timestamp(), nowInSec), + opGroup); + } + + /** + * Called when deleting entries from indexes on primary key columns + */ + private void delete(ByteBuffer rowKey, + Clustering clustering, + DeletionTime deletion, + OpOrder.Group opGroup) + { + DecoratedKey valueKey = getIndexKeyFor(getIndexedValue(rowKey, + clustering, + null)); + doDelete(valueKey, + buildIndexClustering(rowKey, clustering, null), + deletion, + opGroup); + } + + private void doDelete(DecoratedKey indexKey, + Clustering indexClustering, + DeletionTime deletion, + OpOrder.Group opGroup) + { + Row row = BTreeRow.emptyDeletedRow(indexClustering, Row.Deletion.regular(deletion)); + PartitionUpdate upd = partitionUpdate(indexKey, row); + indexCfs.apply(upd, UpdateTransaction.NO_OP, opGroup, null); + logger.trace("Removed index entry for value {}", indexKey); + } + + private void validatePartitionKey(DecoratedKey partitionKey) throws InvalidRequestException + { + assert indexedColumn.isPartitionKey(); + validateIndexedValue(getIndexedValue(partitionKey.getKey(), null, null)); + } + + private void validateClusterings(PartitionUpdate update) throws InvalidRequestException + { + assert indexedColumn.isClusteringColumn(); 
+ for (Row row : update) + validateIndexedValue(getIndexedValue(null, row.clustering(), null)); + } + + private void validateRows(Iterable rows) + { + assert !indexedColumn.isPrimaryKeyColumn(); + for (Row row : rows) + { + if (indexedColumn.isComplex()) + { + ComplexColumnData data = row.getComplexColumnData(indexedColumn); + if (data != null) + { + for (Cell cell : data) + { + validateIndexedValue(getIndexedValue(null, null, cell.path(), cell.value())); + } + } + } + else + { + validateIndexedValue(getIndexedValue(null, null, row.getCell(indexedColumn))); + } + } + } + + private void validateIndexedValue(ByteBuffer value) + { + if (value != null && value.remaining() >= FBUtilities.MAX_UNSIGNED_SHORT) + throw new InvalidRequestException(String.format( + "Cannot index value of size %d for index %s on %s.%s(%s) (maximum allowed size=%d)", + value.remaining(), + metadata.name, + baseCfs.metadata.ksName, + baseCfs.metadata.cfName, + indexedColumn.name.toString(), + FBUtilities.MAX_UNSIGNED_SHORT)); + } + + private ByteBuffer getIndexedValue(ByteBuffer rowKey, + Clustering clustering, + Cell cell) + { + return getIndexedValue(rowKey, + clustering, + cell == null ? null : cell.path(), + cell == null ? null : cell.value() + ); + } + + private Clustering buildIndexClustering(ByteBuffer rowKey, + Clustering clustering, + Cell cell) + { + return buildIndexClusteringPrefix(rowKey, + clustering, + cell == null ? null : cell.path()).build(); + } + + private DecoratedKey getIndexKeyFor(ByteBuffer value) + { + return indexCfs.decorateKey(value); + } + + private PartitionUpdate partitionUpdate(DecoratedKey valueKey, Row row) + { + return PartitionUpdate.singleRowUpdate(indexCfs.metadata, valueKey, row); + } + + private void invalidate() + { + // interrupt in-progress compactions + Collection cfss = Collections.singleton(indexCfs); + CompactionManager.instance.interruptCompactionForCFs(cfss, true); + CompactionManager.instance.waitForCessation(cfss); + Keyspace.writeOrder.awaitNewBarrier(); + indexCfs.forceBlockingFlush(); + indexCfs.readOrdering.awaitNewBarrier(); + indexCfs.invalidate(); + } + + private boolean isBuilt() + { + return SystemKeyspace.isIndexBuilt(baseCfs.keyspace.getName(), metadata.name); + } + + private boolean isPrimaryKeyIndex() + { + return indexedColumn.isPrimaryKeyColumn(); + } + + private Callable getBuildIndexTask() + { + return () -> { + buildBlocking(); + return null; + }; + } + + private void buildBlocking() + { + baseCfs.forceBlockingFlush(); + + try (ColumnFamilyStore.RefViewFragment viewFragment = baseCfs.selectAndReference(View.selectFunction(SSTableSet.CANONICAL)); + Refs sstables = viewFragment.refs) + { + if (sstables.isEmpty()) + { + logger.info("No SSTable data for {}.{} to build index {} from, marking empty index as built", + baseCfs.metadata.ksName, + baseCfs.metadata.cfName, + metadata.name); + baseCfs.indexManager.markIndexBuilt(metadata.name); + return; + } + + logger.info("Submitting index build of {} for data in {}", + metadata.name, + getSSTableNames(sstables)); + + SecondaryIndexBuilder builder = new SecondaryIndexBuilder(baseCfs, + Collections.singleton(this), + new ReducingKeyIterator(sstables)); + Future future = CompactionManager.instance.submitIndexBuild(builder); + FBUtilities.waitOnFuture(future); + indexCfs.forceBlockingFlush(); + baseCfs.indexManager.markIndexBuilt(metadata.name); + } + logger.info("Index build of {} complete", metadata.name); + } + + private static String getSSTableNames(Collection sstables) + { + return 
StreamSupport.stream(sstables.spliterator(), false) + .map(SSTableReader::toString) + .collect(Collectors.joining(", ")); + } + + /** + * Construct the CFMetadata for an index table, the clustering columns in the index table + * vary dependent on the kind of the indexed value. + * @param baseCfsMetadata + * @param indexMetadata + * @return + */ + public static final CFMetaData indexCfsMetadata(CFMetaData baseCfsMetadata, IndexMetadata indexMetadata) + { + Pair target = parseTarget(baseCfsMetadata, indexMetadata); + CassandraIndexFunctions utils = getFunctions(indexMetadata, target); + ColumnDefinition indexedColumn = target.left; + AbstractType indexedValueType = utils.getIndexedValueType(indexedColumn); + + // Tables for legacy KEYS indexes are non-compound and dense + CFMetaData.Builder builder = indexMetadata.isKeys() + ? CFMetaData.Builder.create(baseCfsMetadata.ksName, + baseCfsMetadata.indexColumnFamilyName(indexMetadata), + true, false, false) + : CFMetaData.Builder.create(baseCfsMetadata.ksName, + baseCfsMetadata.indexColumnFamilyName(indexMetadata)); + + builder = builder.withId(baseCfsMetadata.cfId) + .withPartitioner(new LocalPartitioner(indexedValueType)) + .addPartitionKey(indexedColumn.name, indexedColumn.type) + .addClusteringColumn("partition_key", baseCfsMetadata.partitioner.partitionOrdering()); + + if (indexMetadata.isKeys()) + { + // A dense, compact table for KEYS indexes must have a compact + // value column defined, even though it is never used + CompactTables.DefaultNames names = + CompactTables.defaultNameGenerator(ImmutableSet.of(indexedColumn.name.toString(), "partition_key")); + builder = builder.addRegularColumn(names.defaultCompactValueName(), EmptyType.instance); + } + else + { + // The clustering columns for a table backing a COMPOSITES index are dependent + // on the specific type of index (there are specializations for indexes on collections) + builder = utils.addIndexClusteringColumns(builder, baseCfsMetadata, indexedColumn); + } + + return builder.build().reloadIndexMetadataProperties(baseCfsMetadata); + } + + /** + * Factory method for new CassandraIndex instances + * @param baseCfs + * @param indexMetadata + * @return + */ + public static CassandraIndex newIndex(ColumnFamilyStore baseCfs, IndexMetadata indexMetadata) + { + return getFunctions(indexMetadata, parseTarget(baseCfs.metadata, indexMetadata)).newIndexInstance(baseCfs, indexMetadata); + } + + // Public because it's also used to convert index metadata into a thrift-compatible format + public static Pair parseTarget(CFMetaData cfm, + IndexMetadata indexDef) + { + String target = indexDef.options.get("target"); + assert target != null : String.format("No target definition found for index %s", indexDef.name); + + // if the regex matches then the target is in the form "keys(foo)", "entries(bar)" etc + // if not, then it must be a simple column name and implictly its type is VALUES + Matcher matcher = TARGET_REGEX.matcher(target); + String columnName; + IndexTarget.Type targetType; + if (matcher.matches()) + { + targetType = IndexTarget.Type.fromString(matcher.group(1)); + columnName = matcher.group(2); + } + else + { + columnName = target; + targetType = IndexTarget.Type.VALUES; + } + + // in the case of a quoted column name the name in the target string + // will be enclosed in quotes, which we need to unwrap. It may also + // include quote characters internally, escaped like so: + // abc"def -> abc""def. 
+ // Because the target string is stored in a CQL compatible form, we + // need to un-escape any such quotes to get the actual column name + if (columnName.startsWith("\"")) + { + columnName = StringUtils.substring(StringUtils.substring(columnName, 1), 0, -1); + columnName = columnName.replaceAll("\"\"", "\""); + } + + // if it's not a CQL table, we can't assume that the column name is utf8, so + // in that case we have to do a linear scan of the cfm's columns to get the matching one + if (cfm.isCQLTable()) + return Pair.create(cfm.getColumnDefinition(new ColumnIdentifier(columnName, true)), targetType); + else + for (ColumnDefinition column : cfm.allColumns()) + if (column.name.toString().equals(columnName)) + return Pair.create(column, targetType); + + throw new RuntimeException(String.format("Unable to parse targets for index %s (%s)", indexDef.name, target)); + } + + static CassandraIndexFunctions getFunctions(IndexMetadata indexDef, + Pair target) + { + if (indexDef.isKeys()) + return CassandraIndexFunctions.KEYS_INDEX_FUNCTIONS; + + ColumnDefinition indexedColumn = target.left; + if (indexedColumn.type.isCollection() && indexedColumn.type.isMultiCell()) + { + switch (((CollectionType)indexedColumn.type).kind) + { + case LIST: + return CassandraIndexFunctions.COLLECTION_VALUE_INDEX_FUNCTIONS; + case SET: + return CassandraIndexFunctions.COLLECTION_KEY_INDEX_FUNCTIONS; + case MAP: + switch (target.right) + { + case KEYS: + return CassandraIndexFunctions.COLLECTION_KEY_INDEX_FUNCTIONS; + case KEYS_AND_VALUES: + return CassandraIndexFunctions.COLLECTION_ENTRY_INDEX_FUNCTIONS; + case VALUES: + return CassandraIndexFunctions.COLLECTION_VALUE_INDEX_FUNCTIONS; + } + throw new AssertionError(); + } + } + + switch (indexedColumn.kind) + { + case CLUSTERING: + return CassandraIndexFunctions.CLUSTERING_COLUMN_INDEX_FUNCTIONS; + case REGULAR: + return CassandraIndexFunctions.REGULAR_COLUMN_INDEX_FUNCTIONS; + case PARTITION_KEY: + return CassandraIndexFunctions.PARTITION_KEY_INDEX_FUNCTIONS; + //case COMPACT_VALUE: + // return new CompositesIndexOnCompactValue(); + } + throw new AssertionError(); + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/833c993b/test/unit/org/apache/cassandra/cql3/validation/entities/SecondaryIndexTest.java ---------------------------------------------------------------------- diff --cc test/unit/org/apache/cassandra/cql3/validation/entities/SecondaryIndexTest.java index 5d43bd2,b653f4e..8376652 --- a/test/unit/org/apache/cassandra/cql3/validation/entities/SecondaryIndexTest.java +++ b/test/unit/org/apache/cassandra/cql3/validation/entities/SecondaryIndexTest.java @@@ -1159,37 -845,90 +1159,60 @@@ public class SecondaryIndexTest extend row(bytes("foo124"), EMPTY_BYTE_BUFFER)); } + @Test - public void testIndexOnRegularColumnWithPartitionWithoutRows() throws Throwable - { - createTable("CREATE TABLE %s (pk int, c int, s int static, v int, PRIMARY KEY(pk, c))"); - createIndex("CREATE INDEX ON %s (v)"); - execute("INSERT INTO %s (pk, c, s, v) VALUES (?, ?, ?, ?)", 1, 1, 9, 1); - execute("INSERT INTO %s (pk, c, s, v) VALUES (?, ?, ?, ?)", 1, 2, 9, 2); - execute("INSERT INTO %s (pk, s) VALUES (?, ?)", 2, 9); - execute("INSERT INTO %s (pk, c, s, v) VALUES (?, ?, ?, ?)", 3, 1, 9, 1); - flush(); - execute("DELETE FROM %s WHERE pk = ? and c = ?", 3, 1); - assertRows(execute("SELECT * FROM %s WHERE v = ?", 1), - row(1, 1, 9, 1)); - } - - /** - * Custom index used to test the behavior of the system when the index is not ready. 
- * As Custom indices cannot by PerColumnSecondaryIndex we use a PerRowSecondaryIndex - * to avoid the check but return a CompositesSearcher. - */ - public static class IndexBlockingOnInitialization extends PerRowSecondaryIndex ++ public void testIndexOnPartitionKeyWithStaticColumnAndNoRows() throws Throwable + { - private volatile CountDownLatch latch = new CountDownLatch(1); ++ createTable("CREATE TABLE %s (pk1 int, pk2 int, c int, s int static, v int, PRIMARY KEY((pk1, pk2), c))"); ++ createIndex("CREATE INDEX ON %s (pk2)"); ++ execute("INSERT INTO %s (pk1, pk2, c, s, v) VALUES (?, ?, ?, ?, ?)", 1, 1, 1, 9, 1); ++ execute("INSERT INTO %s (pk1, pk2, c, s, v) VALUES (?, ?, ?, ?, ?)", 1, 1, 2, 9, 2); ++ execute("INSERT INTO %s (pk1, pk2, s) VALUES (?, ?, ?)", 2, 1, 9); ++ execute("INSERT INTO %s (pk1, pk2, c, s, v) VALUES (?, ?, ?, ?, ?)", 3, 1, 1, 9, 1); + - @Override - public void index(ByteBuffer rowKey, ColumnFamily cf) - { - try - { - latch.await(); - } - catch (InterruptedException e) - { - Thread.interrupted(); - } - } ++ assertRows(execute("SELECT * FROM %s WHERE pk2 = ?", 1), ++ row(2, 1, null, 9, null), ++ row(1, 1, 1, 9, 1), ++ row(1, 1, 2, 9, 2), ++ row(3, 1, 1, 9, 1)); + - @Override - public void delete(DecoratedKey key, Group opGroup) - { - } ++ execute("UPDATE %s SET s=?, v=? WHERE pk1=? AND pk2=? AND c=?", 9, 1, 1, 10, 2); ++ assertRows(execute("SELECT * FROM %s WHERE pk2 = ?", 10), row(1, 10, 2, 9, 1)); + - @Override - public void init() - { - } ++ execute("UPDATE %s SET s=? WHERE pk1=? AND pk2=?", 9, 1, 20); ++ assertRows(execute("SELECT * FROM %s WHERE pk2 = ?", 20), row(1, 20, null, 9, null)); ++ } + - @Override - public void reload() - { - } + private ResultMessage.Prepared prepareStatement(String cql, boolean forThrift) + { + return QueryProcessor.prepare(String.format(cql, KEYSPACE, currentTable()), + ClientState.forInternalCalls(), + forThrift); + } - @Override - public void validateOptions() throws ConfigurationException - { - } + private void validateCell(Cell cell, ColumnDefinition def, ByteBuffer val, long timestamp) + { + assertNotNull(cell); + assertEquals(0, def.type.compare(cell.value(), val)); + assertEquals(timestamp, cell.timestamp()); + } - @Override - public String getIndexName() - { - return "testIndex"; - } + private static void assertColumnValue(int expected, String name, Row row, CFMetaData cfm) + { + ColumnDefinition col = cfm.getColumnDefinition(new ColumnIdentifier(name, true)); + AbstractType type = col.type; + assertEquals(expected, type.compose(row.getCell(col).value())); + } - @Override - protected SecondaryIndexSearcher createSecondaryIndexSearcher(Set columns) - { - return new CompositesSearcher(baseCfs.indexManager, columns) - { - @Override - public boolean canHandleIndexClause(List clause) - { - return true; - } - - @Override - public void validate(IndexExpression indexExpression) throws InvalidRequestException - { - } - }; - } + /** + * CassandraIndex that blocks during the initialization. + */ + public static class IndexBlockingOnInitialization extends CustomCassandraIndex + { + private final CountDownLatch latch = new CountDownLatch(1); - @Override - public void forceBlockingFlush() + public IndexBlockingOnInitialization(ColumnFamilyStore baseCfs, IndexMetadata indexDef) { + super(baseCfs, indexDef); } @Override
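
For reference, the user-visible behaviour exercised by the new
testIndexOnPartitionKeyWithStaticColumnAndNoRows above can be sketched in plain CQL.
This is only an illustrative sketch: the keyspace and table names (ks.t) are made up
here, and the row order in the result depends on partition token order.

    CREATE TABLE ks.t (
        pk1 int,
        pk2 int,
        c   int,
        s   int static,
        v   int,
        PRIMARY KEY ((pk1, pk2), c)
    );
    CREATE INDEX ON ks.t (pk2);                      -- 2ndary index on a partition-key component

    INSERT INTO ks.t (pk1, pk2, c, s, v) VALUES (1, 1, 1, 9, 1);
    INSERT INTO ks.t (pk1, pk2, s) VALUES (2, 1, 9); -- static-only partition, no clustering rows

    SELECT * FROM ks.t WHERE pk2 = 1;

    -- Expected to include both partitions, per the assertions in the test:
    --  pk1 | pk2 | c    | s | v
    -- -----+-----+------+---+------
    --    2 |   1 | null | 9 | null
    --    1 |   1 |    1 | 9 |    1

With the old SelectStatement.processPartition code, a static-only partition was not
emitted for secondary index queries (the removed !restrictions.usesSecondaryIndexing()
check), so a row like (2, 1, null, 9, null) would be missing. The relaxed condition in
CassandraIndex.insertRow above (indexing static rows when the indexed column is a
partition-key component) appears to be what makes such a partition reachable through
the pk2 index at write time, with the SelectStatement change surfacing it at read time.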