cassandra-commits mailing list archives

From s...@apache.org
Subject [1/3] cassandra git commit: Don't deadlock when flushing CFS backed, custom indexes
Date Wed, 26 Aug 2015 08:34:38 GMT
Repository: cassandra
Updated Branches:
  refs/heads/cassandra-3.0 b8172dcaa -> 715c1513c
  refs/heads/trunk e37efea67 -> 32817b21a


Don't deadlock when flushing CFS backed, custom indexes

Patch by Sam Tunnicliffe; reviewed by Tyler Hobbs for CASSANDRA-10181


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/715c1513
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/715c1513
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/715c1513

Branch: refs/heads/cassandra-3.0
Commit: 715c1513c322ea627163027c65785ea99f7d526d
Parents: b8172dc
Author: Sam Tunnicliffe <sam@beobal.com>
Authored: Tue Aug 25 22:09:20 2015 +0100
Committer: Sam Tunnicliffe <sam@beobal.com>
Committed: Wed Aug 26 08:47:24 2015 +0100

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../apache/cassandra/db/ColumnFamilyStore.java  |   2 +-
 .../cassandra/index/SecondaryIndexManager.java  |   8 +-
 .../index/internal/CustomCassandraIndex.java    | 738 +++++++++++++++++++
 .../index/internal/CustomIndexTest.java         |  20 +
 5 files changed, 764 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/715c1513/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 4a3dc02..2643bfc 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 3.0.0-beta2
+ * Don't deadlock when flushing CFS backed custom indexes (CASSANDRA-10181)
  * Fix double flushing of secondary index tables (CASSANDRA-10180)
  * Fix incorrect handling of range tombstones in thrift (CASSANDRA-10046)
  * Only use batchlog when paired materialized view replica is remote (CASSANDRA-10061)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/715c1513/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index b177c23..efac287 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@ -953,7 +953,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
              */
 
             if (flushSecondaryIndexes)
-                indexManager.flushAllCustomIndexesBlocking();
+                indexManager.flushAllNonCFSBackedIndexesBlocking();
 
             try
             {

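The one-line change above is the call-site half of the fix. Previously, flushing the base table invoked flushAllCustomIndexesBlocking(), which ran getBlockingFlushTask() even for custom indexes whose data lives in their own ColumnFamilyStore; that task calls indexCfs.forceBlockingFlush(), i.e. a blocking flush issued from inside a flush that already occupies the flush machinery. The standalone sketch below (toy code, not Cassandra internals; the class name, executor and timeout are invented purely for illustration) shows the general shape of that kind of deadlock:

import java.util.concurrent.*;

// Illustration of the deadlock shape this patch avoids (toy code, not
// Cassandra's internals): a task running on a single-threaded executor
// submits more work to that same executor and blocks waiting for it. In the
// pre-patch code the enclosing task is the base table flush and the nested,
// blocking work is indexCfs.forceBlockingFlush() issued by a CFS-backed
// custom index's getBlockingFlushTask().
public class SelfBlockingFlushSketch
{
    public static void main(String[] args) throws Exception
    {
        ExecutorService flushExecutor = Executors.newSingleThreadExecutor();

        flushExecutor.submit(() -> {
            // "base table flush": submits the "index table flush" to the same
            // executor and waits for it. With a single worker thread the
            // nested task cannot start until this one finishes.
            Future<?> indexFlush = flushExecutor.submit(() -> System.out.println("index flush"));
            try
            {
                indexFlush.get(2, TimeUnit.SECONDS); // bounded wait so the demo terminates
                System.out.println("index flush completed");
            }
            catch (TimeoutException e)
            {
                System.out.println("deadlock: nested flush never ran while the outer flush was waiting");
            }
            return null;
        }).get();

        flushExecutor.shutdownNow();
    }
}

The patch sidesteps this by never issuing a nested blocking flush for CFS-backed indexes; their tables are flushed through ordinary flush futures instead, as the SecondaryIndexManager hunk below shows.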
http://git-wip-us.apache.org/repos/asf/cassandra/blob/715c1513/src/java/org/apache/cassandra/index/SecondaryIndexManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/index/SecondaryIndexManager.java b/src/java/org/apache/cassandra/index/SecondaryIndexManager.java
index 6bff916..233a0f2 100644
--- a/src/java/org/apache/cassandra/index/SecondaryIndexManager.java
+++ b/src/java/org/apache/cassandra/index/SecondaryIndexManager.java
@@ -94,7 +94,6 @@ public class SecondaryIndexManager implements IndexRegistry
 {
     private static final Logger logger = LoggerFactory.getLogger(SecondaryIndexManager.class);
 
-
     private Map<String, Index> indexes = Maps.newConcurrentMap();
 
 // executes tasks returned by Indexer#addIndexColumn which may require index(es) to be (re)built
@@ -343,6 +342,7 @@ public class SecondaryIndexManager implements IndexRegistry
                      .map(cfs -> wait.add(cfs.forceFlush()))
                      .orElseGet(() -> nonCfsIndexes.add(index)));
         }
+
         executeAllBlocking(nonCfsIndexes.stream(), Index::getBlockingFlushTask);
         FBUtilities.waitOnFutures(wait);
     }
@@ -350,11 +350,11 @@ public class SecondaryIndexManager implements IndexRegistry
     /**
      * Performs a blocking flush of all custom indexes
      */
-    public void flushAllCustomIndexesBlocking()
+    public void flushAllNonCFSBackedIndexesBlocking()
     {
         Set<Index> customIndexers = indexes.values().stream()
-                                             .filter(index -> !(index instanceof CassandraIndex))
-                                             .collect(Collectors.toSet());
+                                                    .filter(index -> !(index.getBackingTable().isPresent()))
+                                                    .collect(Collectors.toSet());
         flushIndexesBlocking(customIndexers);
     }
 

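This is the manager-side half of the fix: getBackingTable() now decides how an index is flushed. An index backed by a ColumnFamilyStore is flushed by adding that table's forceFlush() future to the wait list, and only indexes without a backing table run their blocking flush task here. The old filter, !(index instanceof CassandraIndex), missed CFS-backed custom indexes such as the test index added below, which is exactly the case that deadlocked. A minimal, self-contained model of the new dispatch, using simplified stand-in types rather than the real Cassandra interfaces, might look like this:

import java.util.*;
import java.util.concurrent.*;

// Simplified model (toy types, not Cassandra's) of the patched flush dispatch:
// CFS-backed indexes are flushed via their backing table's flush future,
// everything else runs its blocking flush task directly.
public class FlushDispatchSketch
{
    // Stand-in for org.apache.cassandra.index.Index, reduced to the two
    // methods the dispatch needs.
    interface Index
    {
        Optional<String> getBackingTable();   // real signature returns Optional<ColumnFamilyStore>
        Callable<?> getBlockingFlushTask();
    }

    static void flushIndexesBlocking(Collection<Index> indexes) throws Exception
    {
        ExecutorService flushExecutor = Executors.newCachedThreadPool();
        List<Future<?>> wait = new ArrayList<>();
        Set<Index> nonCfsIndexes = new HashSet<>();

        // Mirrors the map/orElseGet split in the real flushIndexesBlocking:
        // backing table present -> collect a flush future; absent -> remember
        // the index for a direct blocking flush.
        for (Index index : indexes)
            index.getBackingTable()
                 .map(table -> wait.add(flushExecutor.submit(() -> System.out.println("flushed table " + table))))
                 .orElseGet(() -> nonCfsIndexes.add(index));

        for (Index index : nonCfsIndexes)
            index.getBlockingFlushTask().call();

        for (Future<?> f : wait)
            f.get();
        flushExecutor.shutdown();
    }

    public static void main(String[] args) throws Exception
    {
        Index cfsBacked = new Index()
        {
            public Optional<String> getBackingTable() { return Optional.of("myindex_table"); }
            public Callable<?> getBlockingFlushTask() { return () -> { throw new AssertionError("not used for CFS-backed indexes"); }; }
        };
        Index pureCustom = new Index()
        {
            public Optional<String> getBackingTable() { return Optional.empty(); }
            public Callable<?> getBlockingFlushTask() { return () -> { System.out.println("custom index blocking flush"); return null; }; }
        };
        flushIndexesBlocking(Arrays.asList(cfsBacked, pureCustom));
    }
}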
http://git-wip-us.apache.org/repos/asf/cassandra/blob/715c1513/test/unit/org/apache/cassandra/index/internal/CustomCassandraIndex.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/index/internal/CustomCassandraIndex.java b/test/unit/org/apache/cassandra/index/internal/CustomCassandraIndex.java
new file mode 100644
index 0000000..3326b3f
--- /dev/null
+++ b/test/unit/org/apache/cassandra/index/internal/CustomCassandraIndex.java
@@ -0,0 +1,738 @@
+package org.apache.cassandra.index.internal;
+
+import java.nio.ByteBuffer;
+import java.util.*;
+import java.util.concurrent.Callable;
+import java.util.concurrent.Future;
+import java.util.function.BiFunction;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.ColumnDefinition;
+import org.apache.cassandra.cql3.Operator;
+import org.apache.cassandra.cql3.statements.IndexTarget;
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.compaction.CompactionManager;
+import org.apache.cassandra.db.filter.RowFilter;
+import org.apache.cassandra.db.lifecycle.SSTableSet;
+import org.apache.cassandra.db.lifecycle.View;
+import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.db.marshal.CollectionType;
+import org.apache.cassandra.db.partitions.PartitionIterator;
+import org.apache.cassandra.db.partitions.PartitionUpdate;
+import org.apache.cassandra.db.rows.*;
+import org.apache.cassandra.dht.LocalPartitioner;
+import org.apache.cassandra.exceptions.InvalidRequestException;
+import org.apache.cassandra.index.Index;
+import org.apache.cassandra.index.IndexRegistry;
+import org.apache.cassandra.index.SecondaryIndexBuilder;
+import org.apache.cassandra.index.internal.composites.CompositesSearcher;
+import org.apache.cassandra.index.internal.keys.KeysSearcher;
+import org.apache.cassandra.index.transactions.IndexTransaction;
+import org.apache.cassandra.index.transactions.UpdateTransaction;
+import org.apache.cassandra.io.sstable.ReducingKeyIterator;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.schema.IndexMetadata;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.concurrent.OpOrder;
+import org.apache.cassandra.utils.concurrent.Refs;
+
+/**
+ * Clone of KeysIndex used in CassandraIndexTest#testCustomIndexWithCFS to verify
+ * behaviour of flushing CFS backed CUSTOM indexes
+ */
+public class CustomCassandraIndex implements Index
+{
+    private static final Logger logger = LoggerFactory.getLogger(CassandraIndex.class);
+
+    public final ColumnFamilyStore baseCfs;
+    protected IndexMetadata metadata;
+    protected ColumnFamilyStore indexCfs;
+    protected ColumnDefinition indexedColumn;
+    protected CassandraIndexFunctions functions;
+
+    public CustomCassandraIndex(ColumnFamilyStore baseCfs, IndexMetadata indexDef)
+    {
+        this.baseCfs = baseCfs;
+        setMetadata(indexDef);
+    }
+
+    /**
+     * Returns true if an index of this type can support search predicates of the form [column] OPERATOR [value]
+     * @param indexedColumn
+     * @param operator
+     * @return
+     */
+    protected boolean supportsOperator(ColumnDefinition indexedColumn, Operator operator)
+    {
+        return operator.equals(Operator.EQ);
+    }
+
+    public ColumnDefinition getIndexedColumn()
+    {
+        return indexedColumn;
+    }
+
+    public ClusteringComparator getIndexComparator()
+    {
+        return indexCfs.metadata.comparator;
+    }
+
+    public ColumnFamilyStore getIndexCfs()
+    {
+        return indexCfs;
+    }
+
+    public void register(IndexRegistry registry)
+    {
+        registry.registerIndex(this);
+    }
+
+    public Callable<?> getInitializationTask()
+    {
+        // if we're just linking in the index on an already-built index post-restart
+        // we've nothing to do. Otherwise, submit for building via SecondaryIndexBuilder
+        return isBuilt() ? null : getBuildIndexTask();
+    }
+
+    public IndexMetadata getIndexMetadata()
+    {
+        return metadata;
+    }
+
+    public String getIndexName()
+    {
+        // should return metadata.name, see CASSANDRA-10127
+        return indexCfs.name;
+    }
+
+    public Optional<ColumnFamilyStore> getBackingTable()
+    {
+        return indexCfs == null ? Optional.empty() : Optional.of(indexCfs);
+    }
+
+    public Callable<Void> getBlockingFlushTask()
+    {
+        return () -> {
+            indexCfs.forceBlockingFlush();
+            return null;
+        };
+    }
+
+    public Callable<?> getInvalidateTask()
+    {
+        return () -> {
+            markRemoved();
+            invalidate();
+            return null;
+        };
+    }
+
+    public Callable<?> getMetadataReloadTask(IndexMetadata indexDef)
+    {
+        setMetadata(indexDef);
+        return () -> {
+            indexCfs.metadata.reloadIndexMetadataProperties(baseCfs.metadata);
+            indexCfs.reload();
+            return null;
+        };
+    }
+
+    private void setMetadata(IndexMetadata indexDef)
+    {
+        metadata = indexDef;
+        functions = getFunctions(baseCfs.metadata, indexDef);
+        CFMetaData cfm = indexCfsMetadata(baseCfs.metadata, indexDef);
+        indexCfs = ColumnFamilyStore.createColumnFamilyStore(baseCfs.keyspace,
+                                                             cfm.cfName,
+                                                             cfm,
+                                                             baseCfs.getTracker().loadsstables);
+        assert indexDef.columns.size() == 1 : "Build in indexes on multiple target columns are not supported";
+        indexedColumn = indexDef.indexedColumn(baseCfs.metadata);
+    }
+
+    public Callable<?> getTruncateTask(final long truncatedAt)
+    {
+        return () -> {
+            indexCfs.discardSSTables(truncatedAt);
+            return null;
+        };
+    }
+
+    public boolean indexes(PartitionColumns columns)
+    {
+        // if we have indexes on the partition key or clustering columns, return true
+        return isPrimaryKeyIndex() || columns.contains(indexedColumn);
+    }
+
+    public boolean supportsExpression(ColumnDefinition column, Operator operator)
+    {
+        return indexedColumn.name.equals(column.name)
+               && supportsOperator(indexedColumn, operator);
+    }
+
+    private boolean supportsExpression(RowFilter.Expression expression)
+    {
+        return supportsExpression(expression.column(), expression.operator());
+    }
+
+    public long getEstimatedResultRows()
+    {
+        return indexCfs.getMeanColumns();
+    }
+
+    /**
+     * No post processing of query results, just return them unchanged
+     */
+    public BiFunction<PartitionIterator, ReadCommand, PartitionIterator> postProcessorFor(ReadCommand command)
+    {
+        return (partitionIterator, readCommand) -> partitionIterator;
+    }
+
+    public RowFilter getPostIndexQueryFilter(RowFilter filter)
+    {
+        return getTargetExpression(filter.getExpressions()).map(filter::without)
+                                                           .orElse(filter);
+    }
+
+    private Optional<RowFilter.Expression> getTargetExpression(List<RowFilter.Expression> expressions)
+    {
+        return expressions.stream().filter(this::supportsExpression).findFirst();
+    }
+
+    public Index.Searcher searcherFor(ReadCommand command)
+    {
+        return null;
+        /*
+        Optional<RowFilter.Expression> target = getTargetExpression(command.rowFilter().getExpressions());
+
+        if (target.isPresent())
+        {
+            target.get().validateForIndexing();
+            switch (getIndexMetadata().indexType)
+            {
+                case COMPOSITES:
+                    return new CompositesSearcher(command, target.get(), this);
+                case KEYS:
+                    return new KeysSearcher(command, target.get(), this);
+                default:
+                    throw new IllegalStateException(String.format("Unsupported index type %s for index %s on %s",
+                                                                  metadata.indexType,
+                                                                  metadata.name,
+                                                                  indexedColumn.name.toString()));
+            }
+        }
+
+        return null;
+
+        */
+    }
+
+    public void validate(PartitionUpdate update) throws InvalidRequestException
+    {
+        switch (indexedColumn.kind)
+        {
+            case PARTITION_KEY:
+                validatePartitionKey(update.partitionKey());
+                break;
+            case CLUSTERING:
+                validateClusterings(update);
+                break;
+            case REGULAR:
+                validateRows(update);
+                break;
+            case STATIC:
+                validateRows(Collections.singleton(update.staticRow()));
+                break;
+        }
+    }
+
+    protected CBuilder buildIndexClusteringPrefix(ByteBuffer partitionKey,
+                                               ClusteringPrefix prefix,
+                                               CellPath path)
+    {
+        CBuilder builder = CBuilder.create(getIndexComparator());
+        builder.add(partitionKey);
+        return builder;
+    }
+
+    protected ByteBuffer getIndexedValue(ByteBuffer partitionKey,
+                                      Clustering clustering,
+                                      CellPath path, ByteBuffer cellValue)
+    {
+        return cellValue;
+    }
+
+    public IndexEntry decodeEntry(DecoratedKey indexedValue, Row indexEntry)
+    {
+        throw new UnsupportedOperationException("KEYS indexes do not use a specialized index entry format");
+    }
+
+    public boolean isStale(Row row, ByteBuffer indexValue, int nowInSec)
+    {
+        if (row == null)
+            return true;
+
+        Cell cell = row.getCell(indexedColumn);
+
+        return (cell == null
+             || !cell.isLive(nowInSec)
+             || indexedColumn.type.compare(indexValue, cell.value()) != 0);
+    }
+
+    public Indexer indexerFor(final DecoratedKey key,
+                              final int nowInSec,
+                              final OpOrder.Group opGroup,
+                              final IndexTransaction.Type transactionType)
+    {
+        return new Indexer()
+        {
+            public void begin()
+            {
+            }
+
+            public void partitionDelete(DeletionTime deletionTime)
+            {
+            }
+
+            public void rangeTombstone(RangeTombstone tombstone)
+            {
+            }
+
+            public void insertRow(Row row)
+            {
+                if (isPrimaryKeyIndex())
+                {
+                    indexPrimaryKey(row.clustering(),
+                                    getPrimaryKeyIndexLiveness(row),
+                                    row.deletion());
+                }
+                else
+                {
+                    if (indexedColumn.isComplex())
+                        indexCells(row.clustering(), row.getComplexColumnData(indexedColumn));
+                    else
+                        indexCell(row.clustering(), row.getCell(indexedColumn));
+                }
+            }
+
+            public void removeRow(Row row)
+            {
+                if (isPrimaryKeyIndex())
+                    indexPrimaryKey(row.clustering(), row.primaryKeyLivenessInfo(), row.deletion());
+
+                if (indexedColumn.isComplex())
+                    removeCells(row.clustering(), row.getComplexColumnData(indexedColumn));
+                else
+                    removeCell(row.clustering(), row.getCell(indexedColumn));
+            }
+
+
+            public void updateRow(Row oldRow, Row newRow)
+            {
+                if (isPrimaryKeyIndex())
+                    indexPrimaryKey(newRow.clustering(),
+                                    newRow.primaryKeyLivenessInfo(),
+                                    newRow.deletion());
+
+                if (indexedColumn.isComplex())
+                {
+                    indexCells(newRow.clustering(), newRow.getComplexColumnData(indexedColumn));
+                    removeCells(oldRow.clustering(), oldRow.getComplexColumnData(indexedColumn));
+                }
+                else
+                {
+                    indexCell(newRow.clustering(), newRow.getCell(indexedColumn));
+                    removeCell(oldRow.clustering(), oldRow.getCell(indexedColumn));
+                }
+            }
+
+            public void finish()
+            {
+            }
+
+            private void indexCells(Clustering clustering, Iterable<Cell> cells)
+            {
+                if (cells == null)
+                    return;
+
+                for (Cell cell : cells)
+                    indexCell(clustering, cell);
+            }
+
+            private void indexCell(Clustering clustering, Cell cell)
+            {
+                if (cell == null || !cell.isLive(nowInSec))
+                    return;
+
+                insert(key.getKey(),
+                       clustering,
+                       cell,
+                       LivenessInfo.create(cell.timestamp(), cell.ttl(), cell.localDeletionTime()),
+                       opGroup);
+            }
+
+            private void removeCells(Clustering clustering, Iterable<Cell> cells)
+            {
+                if (cells == null)
+                    return;
+
+                for (Cell cell : cells)
+                    removeCell(clustering, cell);
+            }
+
+            private void removeCell(Clustering clustering, Cell cell)
+            {
+                if (cell == null || !cell.isLive(nowInSec))
+                    return;
+
+                delete(key.getKey(), clustering, cell, opGroup, nowInSec);
+            }
+
+            private void indexPrimaryKey(final Clustering clustering,
+                                         final LivenessInfo liveness,
+                                         final DeletionTime deletion)
+            {
+                if (liveness.timestamp() != LivenessInfo.NO_TIMESTAMP)
+                    insert(key.getKey(), clustering, null, liveness, opGroup);
+
+                if (!deletion.isLive())
+                    delete(key.getKey(), clustering, deletion, opGroup);
+            }
+
+            private LivenessInfo getPrimaryKeyIndexLiveness(Row row)
+            {
+                long timestamp = row.primaryKeyLivenessInfo().timestamp();
+                int ttl = row.primaryKeyLivenessInfo().ttl();
+                for (Cell cell : row.cells())
+                {
+                    long cellTimestamp = cell.timestamp();
+                    if (cell.isLive(nowInSec))
+                    {
+                        if (cellTimestamp > timestamp)
+                        {
+                            timestamp = cellTimestamp;
+                            ttl = cell.ttl();
+                        }
+                    }
+                }
+                return LivenessInfo.create(baseCfs.metadata, timestamp, ttl, nowInSec);
+            }
+        };
+    }
+
+    /**
+     * Specific to internal indexes, this is called by a
+     * searcher when it encounters a stale entry in the index
+     * @param indexKey the partition key in the index table
+     * @param indexClustering the clustering in the index table
+     * @param deletion deletion timestamp etc
+     * @param opGroup the operation under which to perform the deletion
+     */
+    public void deleteStaleEntry(DecoratedKey indexKey,
+                                 Clustering indexClustering,
+                                 DeletionTime deletion,
+                                 OpOrder.Group opGroup)
+    {
+        doDelete(indexKey, indexClustering, deletion, opGroup);
+        logger.debug("Removed index entry for stale value {}", indexKey);
+    }
+
+    /**
+     * Called when adding a new entry to the index
+     */
+    private void insert(ByteBuffer rowKey,
+                        Clustering clustering,
+                        Cell cell,
+                        LivenessInfo info,
+                        OpOrder.Group opGroup)
+    {
+        DecoratedKey valueKey = getIndexKeyFor(getIndexedValue(rowKey,
+                                                               clustering,
+                                                               cell));
+        Row row = BTreeRow.noCellLiveRow(buildIndexClustering(rowKey, clustering, cell), info);
+        PartitionUpdate upd = partitionUpdate(valueKey, row);
+        indexCfs.apply(upd, UpdateTransaction.NO_OP, opGroup, null);
+        logger.debug("Inserted entry into index for value {}", valueKey);
+    }
+
+    /**
+     * Called when deleting entries on non-primary key columns
+     */
+    private void delete(ByteBuffer rowKey,
+                        Clustering clustering,
+                        Cell cell,
+                        OpOrder.Group opGroup,
+                        int nowInSec)
+    {
+        DecoratedKey valueKey = getIndexKeyFor(getIndexedValue(rowKey,
+                                                               clustering,
+                                                               cell));
+        doDelete(valueKey,
+                 buildIndexClustering(rowKey, clustering, cell),
+                 new DeletionTime(cell.timestamp(), nowInSec),
+                 opGroup);
+    }
+
+    /**
+     * Called when deleting entries from indexes on primary key columns
+     */
+    private void delete(ByteBuffer rowKey,
+                        Clustering clustering,
+                        DeletionTime deletion,
+                        OpOrder.Group opGroup)
+    {
+        DecoratedKey valueKey = getIndexKeyFor(getIndexedValue(rowKey,
+                                                               clustering,
+                                                               null));
+        doDelete(valueKey,
+                 buildIndexClustering(rowKey, clustering, null),
+                 deletion,
+                 opGroup);
+    }
+
+    private void doDelete(DecoratedKey indexKey,
+                          Clustering indexClustering,
+                          DeletionTime deletion,
+                          OpOrder.Group opGroup)
+    {
+        Row row = BTreeRow.emptyDeletedRow(indexClustering, deletion);
+        PartitionUpdate upd = partitionUpdate(indexKey, row);
+        indexCfs.apply(upd, UpdateTransaction.NO_OP, opGroup, null);
+        logger.debug("Removed index entry for value {}", indexKey);
+    }
+
+    private void validatePartitionKey(DecoratedKey partitionKey) throws InvalidRequestException
+    {
+        assert indexedColumn.isPartitionKey();
+        validateIndexedValue(getIndexedValue(partitionKey.getKey(), null, null ));
+    }
+
+    private void validateClusterings(PartitionUpdate update) throws InvalidRequestException
+    {
+        assert indexedColumn.isClusteringColumn();
+        for (Row row : update)
+            validateIndexedValue(getIndexedValue(null, row.clustering(), null));
+    }
+
+    private void validateRows(Iterable<Row> rows)
+    {
+        assert !indexedColumn.isPrimaryKeyColumn();
+        for (Row row : rows)
+        {
+            if (indexedColumn.isComplex())
+            {
+                ComplexColumnData data = row.getComplexColumnData(indexedColumn);
+                if (data != null)
+                {
+                    for (Cell cell : data)
+                    {
+                        validateIndexedValue(getIndexedValue(null, null, cell.path(), cell.value()));
+                    }
+                }
+            }
+            else
+            {
+                validateIndexedValue(getIndexedValue(null, null, row.getCell(indexedColumn)));
+            }
+        }
+    }
+
+    private void validateIndexedValue(ByteBuffer value)
+    {
+        if (value != null && value.remaining() >= FBUtilities.MAX_UNSIGNED_SHORT)
+            throw new InvalidRequestException(String.format(
+                                                           "Cannot index value of size %d for index %s on %s.%s(%s) (maximum allowed size=%d)",
+                                                           value.remaining(),
+                                                           getIndexName(),
+                                                           baseCfs.metadata.ksName,
+                                                           baseCfs.metadata.cfName,
+                                                           indexedColumn.name.toString(),
+                                                           FBUtilities.MAX_UNSIGNED_SHORT));
+    }
+
+    private ByteBuffer getIndexedValue(ByteBuffer rowKey,
+                                       Clustering clustering,
+                                       Cell cell)
+    {
+        return getIndexedValue(rowKey,
+                               clustering,
+                               cell == null ? null : cell.path(),
+                               cell == null ? null : cell.value()
+        );
+    }
+
+    private Clustering buildIndexClustering(ByteBuffer rowKey,
+                                            Clustering clustering,
+                                            Cell cell)
+    {
+        return buildIndexClusteringPrefix(rowKey,
+                                          clustering,
+                                          cell == null ? null : cell.path()).build();
+    }
+
+    private DecoratedKey getIndexKeyFor(ByteBuffer value)
+    {
+        return indexCfs.decorateKey(value);
+    }
+
+    private PartitionUpdate partitionUpdate(DecoratedKey valueKey, Row row)
+    {
+        return PartitionUpdate.singleRowUpdate(indexCfs.metadata, valueKey, row);
+    }
+
+    private void invalidate()
+    {
+        // interrupt in-progress compactions
+        Collection<ColumnFamilyStore> cfss = Collections.singleton(indexCfs);
+        CompactionManager.instance.interruptCompactionForCFs(cfss, true);
+        CompactionManager.instance.waitForCessation(cfss);
+        indexCfs.keyspace.writeOrder.awaitNewBarrier();
+        indexCfs.forceBlockingFlush();
+        indexCfs.readOrdering.awaitNewBarrier();
+        indexCfs.invalidate();
+    }
+
+    private boolean isBuilt()
+    {
+        return SystemKeyspace.isIndexBuilt(baseCfs.keyspace.getName(), getIndexName());
+    }
+
+    private void markBuilt()
+    {
+        SystemKeyspace.setIndexBuilt(baseCfs.keyspace.getName(), getIndexName());
+    }
+
+    private void markRemoved()
+    {
+        SystemKeyspace.setIndexRemoved(baseCfs.keyspace.getName(), getIndexName());
+    }
+
+    private boolean isPrimaryKeyIndex()
+    {
+        return indexedColumn.isPrimaryKeyColumn();
+    }
+
+    private Callable<?> getBuildIndexTask()
+    {
+        return () -> {
+            buildBlocking();
+            return null;
+        };
+    }
+
+    private void buildBlocking()
+    {
+        baseCfs.forceBlockingFlush();
+
+        try (ColumnFamilyStore.RefViewFragment viewFragment = baseCfs.selectAndReference(View.select(SSTableSet.CANONICAL));
+             Refs<SSTableReader> sstables = viewFragment.refs)
+        {
+            if (sstables.isEmpty())
+            {
+                logger.info("No SSTable data for {}.{} to build index {} from, marking empty index as built",
+                            baseCfs.metadata.ksName,
+                            baseCfs.metadata.cfName,
+                            getIndexName());
+                markBuilt();
+                return;
+            }
+
+            logger.info("Submitting index build of {} for data in {}",
+                        getIndexName(),
+                        getSSTableNames(sstables));
+
+            SecondaryIndexBuilder builder = new SecondaryIndexBuilder(baseCfs,
+                                                                      Collections.singleton(this),
+                                                                      new ReducingKeyIterator(sstables));
+            Future<?> future = CompactionManager.instance.submitIndexBuild(builder);
+            FBUtilities.waitOnFuture(future);
+            indexCfs.forceBlockingFlush();
+            markBuilt();
+        }
+        logger.info("Index build of {} complete", getIndexName());
+    }
+
+    private static String getSSTableNames(Collection<SSTableReader> sstables)
+    {
+        return StreamSupport.stream(sstables.spliterator(), false)
+                            .map(SSTableReader::toString)
+                            .collect(Collectors.joining(", "));
+    }
+
+    /**
+     * Construct the CFMetadata for an index table, the clustering columns in the index table
+     * vary dependent on the kind of the indexed value.
+     * @param baseCfsMetadata
+     * @param indexMetadata
+     * @return
+     */
+    public static final CFMetaData indexCfsMetadata(CFMetaData baseCfsMetadata, IndexMetadata indexMetadata)
+    {
+        CassandraIndexFunctions utils = getFunctions(baseCfsMetadata, indexMetadata);
+        ColumnDefinition indexedColumn = indexMetadata.indexedColumn(baseCfsMetadata);
+        AbstractType<?> indexedValueType = utils.getIndexedValueType(indexedColumn);
+        CFMetaData.Builder builder = CFMetaData.Builder.create(baseCfsMetadata.ksName,
+                                                               baseCfsMetadata.indexColumnFamilyName(indexMetadata))
+                                                       .withId(baseCfsMetadata.cfId)
+                                                       .withPartitioner(new LocalPartitioner(indexedValueType))
+                                                       .addPartitionKey(indexedColumn.name, indexedColumn.type);
+
+        builder.addClusteringColumn("partition_key", baseCfsMetadata.partitioner.partitionOrdering());
+        builder = utils.addIndexClusteringColumns(builder, baseCfsMetadata, indexedColumn);
+        return builder.build().reloadIndexMetadataProperties(baseCfsMetadata);
+    }
+
+    /**
+     * Factory method for new CassandraIndex instances
+     * @param baseCfs
+     * @param indexMetadata
+     * @return
+     */
+    public static final CassandraIndex newIndex(ColumnFamilyStore baseCfs, IndexMetadata indexMetadata)
+    {
+        return getFunctions(baseCfs.metadata, indexMetadata).newIndexInstance(baseCfs, indexMetadata);
+    }
+
+    private static CassandraIndexFunctions getFunctions(CFMetaData baseCfMetadata, IndexMetadata indexDef)
+    {
+        if (indexDef.isKeys())
+            return CassandraIndexFunctions.KEYS_INDEX_FUNCTIONS;
+
+        ColumnDefinition indexedColumn = indexDef.indexedColumn(baseCfMetadata);
+        if (indexedColumn.type.isCollection() && indexedColumn.type.isMultiCell())
+        {
+            switch (((CollectionType)indexedColumn.type).kind)
+            {
+                case LIST:
+                    return CassandraIndexFunctions.COLLECTION_VALUE_INDEX_FUNCTIONS;
+                case SET:
+                    return CassandraIndexFunctions.COLLECTION_KEY_INDEX_FUNCTIONS;
+                case MAP:
+                    if (indexDef.options.containsKey(IndexTarget.INDEX_KEYS_OPTION_NAME))
+                        return CassandraIndexFunctions.COLLECTION_KEY_INDEX_FUNCTIONS;
+                    else if (indexDef.options.containsKey(IndexTarget.INDEX_ENTRIES_OPTION_NAME))
+                        return CassandraIndexFunctions.COLLECTION_ENTRY_INDEX_FUNCTIONS;
+                    else
+                        return CassandraIndexFunctions.COLLECTION_VALUE_INDEX_FUNCTIONS;
+            }
+        }
+
+        switch (indexedColumn.kind)
+        {
+            case CLUSTERING:
+                return CassandraIndexFunctions.CLUSTERING_COLUMN_INDEX_FUNCTIONS;
+            case REGULAR:
+                return CassandraIndexFunctions.REGULAR_COLUMN_INDEX_FUNCTIONS;
+            case PARTITION_KEY:
+                return CassandraIndexFunctions.PARTITION_KEY_INDEX_FUNCTIONS;
+            //case COMPACT_VALUE:
+            //    return new CompositesIndexOnCompactValue();
+        }
+        throw new AssertionError();
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/715c1513/test/unit/org/apache/cassandra/index/internal/CustomIndexTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/index/internal/CustomIndexTest.java b/test/unit/org/apache/cassandra/index/internal/CustomIndexTest.java
new file mode 100644
index 0000000..3cc7987
--- /dev/null
+++ b/test/unit/org/apache/cassandra/index/internal/CustomIndexTest.java
@@ -0,0 +1,20 @@
+package org.apache.cassandra.index.internal;
+
+import org.apache.cassandra.cql3.CQLTester;
+import org.junit.Test;
+
+public class CustomIndexTest extends CQLTester
+{
+    @Test
+    public void testInserts() throws Throwable
+    {
+        // test to ensure that we don't deadlock when flushing CFS backed custom indexers
+        // see CASSANDRA-10181
+        createTable("CREATE TABLE %s (a int, b int, c int, d int, PRIMARY KEY (a, b))");
+        createIndex("CREATE CUSTOM INDEX myindex ON %s(c) USING 'org.apache.cassandra.index.internal.CustomCassandraIndex'");
+
+        execute("INSERT INTO %s (a, b, c, d) VALUES (?, ?, ?, ?)", 0, 0, 0, 2);
+        execute("INSERT INTO %s (a, b, c, d) VALUES (?, ?, ?, ?)", 0, 1, 0, 1);
+        execute("INSERT INTO %s (a, b, c, d) VALUES (?, ?, ?, ?)", 0, 2, 0, 0);
+    }
+}
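The test above only issues writes and relies on the flush that happens when CQLTester tears the schema down. As a hypothetical extension (not part of this patch), one could flush explicitly after the inserts; this assumes CQLTester exposes a flush() helper that force-flushes the current table, and the index name myindex2 is likewise illustrative:

// Hypothetical companion test for the class above (not part of this patch).
// Assumes CQLTester provides a flush() helper for the current table; before
// this fix, a flush like this could deadlock when the CFS-backed custom
// index's blocking flush re-entered the flush machinery.
@Test
public void testExplicitFlush() throws Throwable
{
    createTable("CREATE TABLE %s (a int, b int, c int, d int, PRIMARY KEY (a, b))");
    createIndex("CREATE CUSTOM INDEX myindex2 ON %s(c) USING 'org.apache.cassandra.index.internal.CustomCassandraIndex'");

    execute("INSERT INTO %s (a, b, c, d) VALUES (?, ?, ?, ?)", 0, 0, 0, 2);

    flush(); // exercise the previously deadlocking path directly
}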

