cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ble...@apache.org
Subject [2/4] cassandra git commit: Merge branch cassandra-2.2 into cassandra-3.0
Date Fri, 27 Nov 2015 10:16:14 GMT
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7b430eee/test/unit/org/apache/cassandra/cql3/validation/entities/SecondaryIndexTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/cql3/validation/entities/SecondaryIndexTest.java
index d30937f,b8f6b9f..e8bf1fd
--- a/test/unit/org/apache/cassandra/cql3/validation/entities/SecondaryIndexTest.java
+++ b/test/unit/org/apache/cassandra/cql3/validation/entities/SecondaryIndexTest.java
@@@ -17,38 -17,35 +17,44 @@@
   */
  package org.apache.cassandra.cql3.validation.entities;
  
- import java.nio.ByteBuffer;
- import java.util.*;
- 
- import org.apache.cassandra.db.DeletionTime;
- import org.apache.cassandra.utils.Pair;
- import org.apache.commons.lang3.StringUtils;
 +import org.junit.Before;
 +import org.junit.Test;
++import static org.apache.cassandra.Util.throwAssert;
++import static org.junit.Assert.assertEquals;
++import static org.junit.Assert.assertFalse;
++import static org.junit.Assert.assertNotNull;
+ import static org.junit.Assert.assertTrue;
+ import static org.junit.Assert.fail;
+ 
+ import java.nio.ByteBuffer;
+ import java.util.HashMap;
 -import java.util.List;
+ import java.util.Locale;
+ import java.util.Map;
 -import java.util.Set;
+ import java.util.UUID;
++import java.util.concurrent.Callable;
+ import java.util.concurrent.CountDownLatch;
  
 +import org.apache.cassandra.config.CFMetaData;
 +import org.apache.cassandra.config.ColumnDefinition;
 +import org.apache.cassandra.config.DatabaseDescriptor;
  import org.apache.cassandra.cql3.CQLTester;
 -import org.apache.cassandra.db.ColumnFamily;
 +import org.apache.cassandra.cql3.ColumnIdentifier;
 +import org.apache.cassandra.cql3.statements.IndexTarget;
  import org.apache.cassandra.db.ColumnFamilyStore;
 -import org.apache.cassandra.db.DecoratedKey;
 -import org.apache.cassandra.db.IndexExpression;
 -import org.apache.cassandra.db.composites.CellName;
 -import org.apache.cassandra.db.index.IndexNotAvailableException;
 -import org.apache.cassandra.db.index.PerRowSecondaryIndex;
 -import org.apache.cassandra.db.index.SecondaryIndexSearcher;
 -import org.apache.cassandra.db.index.composites.CompositesSearcher;
++import org.apache.cassandra.db.DeletionTime;
 +import org.apache.cassandra.db.marshal.AbstractType;
 +import org.apache.cassandra.db.rows.Cell;
 +import org.apache.cassandra.db.rows.Row;
  import org.apache.cassandra.exceptions.ConfigurationException;
 -import org.apache.cassandra.exceptions.InvalidRequestException;
  import org.apache.cassandra.exceptions.SyntaxException;
 -import org.apache.cassandra.utils.FBUtilities;
 -import org.apache.cassandra.utils.concurrent.OpOrder.Group;
++import org.apache.cassandra.index.IndexNotAvailableException;
 +import org.apache.cassandra.index.SecondaryIndexManager;
 +import org.apache.cassandra.index.StubIndex;
++import org.apache.cassandra.index.internal.CustomCassandraIndex;
 +import org.apache.cassandra.schema.IndexMetadata;
 +import org.apache.cassandra.utils.ByteBufferUtil;
- 
- import static org.apache.cassandra.Util.throwAssert;
- import static org.junit.Assert.assertEquals;
- import static org.junit.Assert.assertFalse;
- import static org.junit.Assert.assertNotNull;
- import static org.junit.Assert.assertTrue;
- import static org.junit.Assert.fail;
++import org.apache.cassandra.utils.Pair;
+ import org.apache.commons.lang3.StringUtils;
 -import org.junit.Test;
  
  public class SecondaryIndexTest extends CQLTester
  {
@@@ -668,16 -575,9 +674,16 @@@
      {
          createTable("CREATE TABLE %s(a int, b frozen<map<int, blob>>, PRIMARY KEY (a))");
          createIndex("CREATE INDEX ON %s(full(b))");
-         Map<Integer, ByteBuffer> map = new HashMap();
+         Map<Integer, ByteBuffer> map = new HashMap<>();
          map.put(0, ByteBuffer.allocate(1024 * 65));
          failInsert("INSERT INTO %s (a, b) VALUES (0, ?)", map);
 +        failInsert("INSERT INTO %s (a, b) VALUES (0, ?) IF NOT EXISTS", map);
 +        failInsert("BEGIN BATCH\n" +
 +                   "INSERT INTO %s (a, b) VALUES (0, ?);\n" +
 +                   "APPLY BATCH", map);
 +        failInsert("BEGIN BATCH\n" +
 +                   "INSERT INTO %s (a, b) VALUES (0, ?) IF NOT EXISTS;\n" +
 +                   "APPLY BATCH", map);
      }
  
      public void failInsert(String insertCQL, Object...args) throws Throwable
@@@ -755,180 -655,134 +761,234 @@@
      }
  
      @Test
 +    public void testMultipleIndexesOnOneColumn() throws Throwable
 +    {
 +        String indexClassName = StubIndex.class.getName();
 +        createTable("CREATE TABLE %s (a int, b int, c int, PRIMARY KEY ((a), b))");
 +        // uses different options otherwise the two indexes are considered duplicates
 +        createIndex(String.format("CREATE CUSTOM INDEX c_idx_1 ON %%s(c) USING '%s' WITH OPTIONS = {'foo':'a'}", indexClassName));
 +        createIndex(String.format("CREATE CUSTOM INDEX c_idx_2 ON %%s(c) USING '%s' WITH OPTIONS = {'foo':'b'}", indexClassName));
 +
 +        ColumnFamilyStore cfs = getCurrentColumnFamilyStore();
 +        CFMetaData cfm = cfs.metadata;
 +        StubIndex index1 = (StubIndex)cfs.indexManager.getIndex(cfm.getIndexes()
 +                                                                   .get("c_idx_1")
 +                                                                   .orElseThrow(throwAssert("index not found")));
 +        StubIndex index2 = (StubIndex)cfs.indexManager.getIndex(cfm.getIndexes()
 +                                                                   .get("c_idx_2")
 +                                                                   .orElseThrow(throwAssert("index not found")));
 +        Object[] row1a = row(0, 0, 0);
 +        Object[] row1b = row(0, 0, 1);
 +        Object[] row2 = row(2, 2, 2);
 +        execute("INSERT INTO %s (a, b, c) VALUES (?, ?, ?)", row1a);
 +        execute("INSERT INTO %s (a, b, c) VALUES (?, ?, ?)", row1b);
 +        execute("INSERT INTO %s (a, b, c) VALUES (?, ?, ?)", row2);
 +
 +        assertEquals(2, index1.rowsInserted.size());
 +        assertColumnValue(0, "c", index1.rowsInserted.get(0), cfm);
 +        assertColumnValue(2, "c", index1.rowsInserted.get(1), cfm);
 +
 +        assertEquals(2, index2.rowsInserted.size());
 +        assertColumnValue(0, "c", index2.rowsInserted.get(0), cfm);
 +        assertColumnValue(2, "c", index2.rowsInserted.get(1), cfm);
 +
 +        assertEquals(1, index1.rowsUpdated.size());
 +        assertColumnValue(0, "c", index1.rowsUpdated.get(0).left, cfm);
 +        assertColumnValue(1, "c", index1.rowsUpdated.get(0).right, cfm);
 +
 +        assertEquals(1, index2.rowsUpdated.size());
 +        assertColumnValue(0, "c", index2.rowsUpdated.get(0).left, cfm);
 +        assertColumnValue(1, "c", index2.rowsUpdated.get(0).right, cfm);
 +    }
 +
 +    @Test
 +    public void testDeletions() throws Throwable
 +    {
 +        // Test for bugs like CASSANDRA-10694.  These may not be readily visible with the built-in secondary index
 +        // implementation because of the stale entry handling.
 +
 +        String indexClassName = StubIndex.class.getName();
 +        createTable("CREATE TABLE %s (a int, b int, c int, PRIMARY KEY ((a), b))");
 +        createIndex(String.format("CREATE CUSTOM INDEX c_idx ON %%s(c) USING '%s'", indexClassName));
 +
 +        ColumnFamilyStore cfs = getCurrentColumnFamilyStore();
 +        CFMetaData cfm = cfs.metadata;
 +        StubIndex index1 = (StubIndex) cfs.indexManager.getIndex(cfm.getIndexes()
 +                .get("c_idx")
 +                .orElseThrow(throwAssert("index not found")));
 +
 +        execute("INSERT INTO %s (a, b, c) VALUES (?, ?, ?) USING TIMESTAMP 1", 0, 0, 0);
 +        assertEquals(1, index1.rowsInserted.size());
 +
 +        execute("DELETE FROM %s USING TIMESTAMP 2 WHERE a = ? AND b = ?", 0, 0);
 +        assertEquals(1, index1.rowsUpdated.size());
 +        Pair<Row, Row> update = index1.rowsUpdated.get(0);
 +        Row existingRow = update.left;
 +        Row newRow = update.right;
 +
 +        // check the existing row from the update call
 +        assertTrue(existingRow.deletion().isLive());
 +        assertEquals(DeletionTime.LIVE, existingRow.deletion().time());
 +        assertEquals(1L, existingRow.primaryKeyLivenessInfo().timestamp());
 +
 +        // check the new row from the update call
 +        assertFalse(newRow.deletion().isLive());
 +        assertEquals(2L, newRow.deletion().time().markedForDeleteAt());
 +        assertFalse(newRow.cells().iterator().hasNext());
 +
 +        // delete the same row again
 +        execute("DELETE FROM %s USING TIMESTAMP 3 WHERE a = ? AND b = ?", 0, 0);
 +        assertEquals(2, index1.rowsUpdated.size());
 +        update = index1.rowsUpdated.get(1);
 +        existingRow = update.left;
 +        newRow = update.right;
 +
 +        // check the existing row from the update call
 +        assertFalse(existingRow.deletion().isLive());
 +        assertEquals(2L, existingRow.deletion().time().markedForDeleteAt());
 +        assertFalse(existingRow.cells().iterator().hasNext());
 +
 +        // check the new row from the update call
 +        assertFalse(newRow.deletion().isLive());
 +        assertEquals(3L, newRow.deletion().time().markedForDeleteAt());
 +        assertFalse(newRow.cells().iterator().hasNext());
 +    }
 +
 +    @Test
 +    public void testUpdatesToMemtableData() throws Throwable
 +    {
 +        // verify the contract specified by Index.Indexer::updateRow(oldRowData, newRowData),
 +        // when a row in the memtable is updated, the indexer should be informed of:
 +        // * new columns
 +        // * removed columns
 +        // * columns whose value, timestamp or ttl have been modified.
 +        // Any columns which are unchanged by the update are not passed to the Indexer
 +        // Note that for simplicity this test resets the index between each scenario
 +        createTable("CREATE TABLE %s (k int, c int, v1 int, v2 int, PRIMARY KEY (k,c))");
 +        createIndex(String.format("CREATE CUSTOM INDEX test_index ON %%s() USING '%s'", StubIndex.class.getName()));
 +        execute("INSERT INTO %s (k, c, v1, v2) VALUES (0, 0, 0, 0) USING TIMESTAMP 0");
 +
 +        ColumnDefinition v1 = getCurrentColumnFamilyStore().metadata.getColumnDefinition(new ColumnIdentifier("v1", true));
 +        ColumnDefinition v2 = getCurrentColumnFamilyStore().metadata.getColumnDefinition(new ColumnIdentifier("v2", true));
 +
 +        StubIndex index = (StubIndex)getCurrentColumnFamilyStore().indexManager.getIndexByName("test_index");
 +        assertEquals(1, index.rowsInserted.size());
 +
 +        // Overwrite a single value, leaving the other untouched
 +        execute("UPDATE %s USING TIMESTAMP 1 SET v1=1 WHERE k=0 AND c=0");
 +        assertEquals(1, index.rowsUpdated.size());
 +        Row oldRow = index.rowsUpdated.get(0).left;
 +        assertEquals(1, oldRow.size());
 +        validateCell(oldRow.getCell(v1), v1, ByteBufferUtil.bytes(0), 0);
 +        Row newRow = index.rowsUpdated.get(0).right;
 +        assertEquals(1, newRow.size());
 +        validateCell(newRow.getCell(v1), v1, ByteBufferUtil.bytes(1), 1);
 +        index.reset();
 +
 +        // Overwrite both values
 +        execute("UPDATE %s USING TIMESTAMP 2 SET v1=2, v2=2 WHERE k=0 AND c=0");
 +        assertEquals(1, index.rowsUpdated.size());
 +        oldRow = index.rowsUpdated.get(0).left;
 +        assertEquals(2, oldRow.size());
 +        validateCell(oldRow.getCell(v1), v1, ByteBufferUtil.bytes(1), 1);
 +        validateCell(oldRow.getCell(v2), v2, ByteBufferUtil.bytes(0), 0);
 +        newRow = index.rowsUpdated.get(0).right;
 +        assertEquals(2, newRow.size());
 +        validateCell(newRow.getCell(v1), v1, ByteBufferUtil.bytes(2), 2);
 +        validateCell(newRow.getCell(v2), v2, ByteBufferUtil.bytes(2), 2);
 +        index.reset();
 +
 +        // Delete one value
 +        execute("DELETE v1 FROM %s USING TIMESTAMP 3 WHERE k=0 AND c=0");
 +        assertEquals(1, index.rowsUpdated.size());
 +        oldRow = index.rowsUpdated.get(0).left;
 +        assertEquals(1, oldRow.size());
 +        validateCell(oldRow.getCell(v1), v1, ByteBufferUtil.bytes(2), 2);
 +        newRow = index.rowsUpdated.get(0).right;
 +        assertEquals(1, newRow.size());
 +        Cell newCell = newRow.getCell(v1);
 +        assertTrue(newCell.isTombstone());
 +        assertEquals(3, newCell.timestamp());
 +        index.reset();
 +
 +        // Modify the liveness of the primary key, the delta rows should contain
 +        // no cell data as only the pk was altered, but it should illustrate the
 +        // change to the liveness info
 +        execute("INSERT INTO %s(k, c) VALUES (0, 0) USING TIMESTAMP 4");
 +        assertEquals(1, index.rowsUpdated.size());
 +        oldRow = index.rowsUpdated.get(0).left;
 +        assertEquals(0, oldRow.size());
 +        assertEquals(0, oldRow.primaryKeyLivenessInfo().timestamp());
 +        newRow = index.rowsUpdated.get(0).right;
 +        assertEquals(0, newRow.size());
 +        assertEquals(4, newRow.primaryKeyLivenessInfo().timestamp());
 +    }
 +
++    @Test
+     public void testIndexQueriesWithIndexNotReady() throws Throwable
+     {
+         createTable("CREATE TABLE %s (pk int, ck int, value int, PRIMARY KEY (pk, ck))");
+ 
+         for (int i = 0; i < 10; i++)
+             for (int j = 0; j < 10; j++)
+                 execute("INSERT INTO %s (pk, ck, value) VALUES (?, ?, ?)", i, j, i + j);
+ 
 -        createIndex("CREATE CUSTOM INDEX testIndex ON %s (value) USING '" + IndexBlockingOnInitialization.class.getName()
 -                + "'");
++        createIndex("CREATE CUSTOM INDEX testIndex ON %s (value) USING '" + IndexBlockingOnInitialization.class.getName() + "'");
+         try
+         {
+             execute("SELECT value FROM %s WHERE value = 2");
+             fail();
+         }
+         catch (IndexNotAvailableException e)
+         {
+             assertTrue(true);
+         }
+         finally
+         {
+             execute("DROP index " + KEYSPACE + ".testIndex");
+         }
+     }
+ 
 -    /**
 -     * Custom index used to test the behavior of the system when the index is not ready.
 -     * As Custom indices cannot be <code>PerColumnSecondaryIndex</code> we use a <code>PerRowSecondaryIndex</code>
 -     * to avoid the check but return a <code>CompositesSearcher</code>.
 -     */
 -    public static class IndexBlockingOnInitialization extends PerRowSecondaryIndex
 +    private void validateCell(Cell cell, ColumnDefinition def, ByteBuffer val, long timestamp)
      {
 -        private volatile CountDownLatch latch = new CountDownLatch(1);
 -
 -        @Override
 -        public void index(ByteBuffer rowKey, ColumnFamily cf)
 -        {
 -            try
 -            {
 -                latch.await();
 -            }
 -            catch (InterruptedException e)
 -            {
 -                Thread.interrupted();
 -            }
 -        }
 -
 -        @Override
 -        public void delete(DecoratedKey key, Group opGroup)
 -        {
 -        }
 -
 -        @Override
 -        public void init()
 -        {
 -        }
 +        assertNotNull(cell);
 +        assertEquals(0, def.type.compare(cell.value(), val));
 +        assertEquals(timestamp, cell.timestamp());
 +    }
  
 -        @Override
 -        public void reload()
 -        {
 -        }
 +    private static void assertColumnValue(int expected, String name, Row row, CFMetaData cfm)
 +    {
 +        ColumnDefinition col = cfm.getColumnDefinition(new ColumnIdentifier(name, true));
 +        AbstractType<?> type = col.type;
 +        assertEquals(expected, type.compose(row.getCell(col).value()));
 +    }
+ 
 -        @Override
 -        public void validateOptions() throws ConfigurationException
 -        {
 -        }
++    /**
++     * <code>CassandraIndex</code> that blocks during the initialization.
++     */
++    public static class IndexBlockingOnInitialization extends CustomCassandraIndex
++    {
++        private final CountDownLatch latch = new CountDownLatch(1);
+ 
 -        @Override
 -        public String getIndexName()
++        public IndexBlockingOnInitialization(ColumnFamilyStore baseCfs, IndexMetadata indexDef)
+         {
 -            return "testIndex";
++            super(baseCfs, indexDef);
+         }
+ 
+         @Override
 -        protected SecondaryIndexSearcher createSecondaryIndexSearcher(Set<ByteBuffer> columns)
++        public Callable<?> getInitializationTask()
+         {
 -            return new CompositesSearcher(baseCfs.indexManager, columns)
 -            {
 -                @Override
 -                public boolean canHandleIndexClause(List<IndexExpression> clause)
 -                {
 -                    return true;
 -                }
 -
 -                @Override
 -                public void validate(IndexExpression indexExpression) throws InvalidRequestException
 -                {
 -                }
++            return () -> {
++                latch.await();
++                return null;
+             };
+         }
+ 
+         @Override
 -        public void forceBlockingFlush()
 -        {
 -        }
 -
 -        @Override
 -        public ColumnFamilyStore getIndexCfs()
 -        {
 -            return baseCfs;
 -        }
 -
 -        @Override
 -        public void removeIndex(ByteBuffer columnName)
++        public Callable<?> getInvalidateTask()
+         {
+             latch.countDown();
 -        }
 -
 -        @Override
 -        public void invalidate()
 -        {
 -        }
 -
 -        @Override
 -        public void truncateBlocking(long truncatedAt)
 -        {
 -        }
 -
 -        @Override
 -        public boolean indexes(CellName name)
 -        {
 -            return false;
 -        }
 -
 -        @Override
 -        public long estimateResultRows()
 -        {
 -            return 0;
++            return super.getInvalidateTask();
+         }
+     }
  }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/7b430eee/test/unit/org/apache/cassandra/index/internal/CustomCassandraIndex.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/index/internal/CustomCassandraIndex.java
index 0957f74,0000000..3bce683
mode 100644,000000..100644
--- a/test/unit/org/apache/cassandra/index/internal/CustomCassandraIndex.java
+++ b/test/unit/org/apache/cassandra/index/internal/CustomCassandraIndex.java
@@@ -1,642 -1,0 +1,642 @@@
 +package org.apache.cassandra.index.internal;
 +
 +import java.nio.ByteBuffer;
 +import java.util.*;
 +import java.util.concurrent.Callable;
 +import java.util.concurrent.Future;
 +import java.util.function.BiFunction;
 +import java.util.stream.Collectors;
 +import java.util.stream.StreamSupport;
 +
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
 +
 +import org.apache.cassandra.config.CFMetaData;
 +import org.apache.cassandra.config.ColumnDefinition;
 +import org.apache.cassandra.cql3.Operator;
 +import org.apache.cassandra.cql3.statements.IndexTarget;
 +import org.apache.cassandra.db.*;
 +import org.apache.cassandra.db.compaction.CompactionManager;
 +import org.apache.cassandra.db.filter.RowFilter;
 +import org.apache.cassandra.db.lifecycle.SSTableSet;
 +import org.apache.cassandra.db.lifecycle.View;
 +import org.apache.cassandra.db.marshal.AbstractType;
 +import org.apache.cassandra.db.partitions.PartitionIterator;
 +import org.apache.cassandra.db.partitions.PartitionUpdate;
 +import org.apache.cassandra.db.rows.*;
 +import org.apache.cassandra.exceptions.InvalidRequestException;
 +import org.apache.cassandra.index.Index;
 +import org.apache.cassandra.index.IndexRegistry;
 +import org.apache.cassandra.index.SecondaryIndexBuilder;
 +import org.apache.cassandra.index.transactions.IndexTransaction;
 +import org.apache.cassandra.index.transactions.UpdateTransaction;
 +import org.apache.cassandra.io.sstable.ReducingKeyIterator;
 +import org.apache.cassandra.io.sstable.format.SSTableReader;
 +import org.apache.cassandra.schema.IndexMetadata;
 +import org.apache.cassandra.utils.FBUtilities;
 +import org.apache.cassandra.utils.Pair;
 +import org.apache.cassandra.utils.concurrent.OpOrder;
 +import org.apache.cassandra.utils.concurrent.Refs;
 +
 +import static org.apache.cassandra.index.internal.CassandraIndex.getFunctions;
 +import static org.apache.cassandra.index.internal.CassandraIndex.indexCfsMetadata;
 +import static org.apache.cassandra.index.internal.CassandraIndex.parseTarget;
 +
 +/**
 + * Clone of KeysIndex used in CassandraIndexTest#testCustomIndexWithCFS to verify
 + * behaviour of flushing CFS backed CUSTOM indexes
 + */
 +public class CustomCassandraIndex implements Index
 +{
 +    private static final Logger logger = LoggerFactory.getLogger(CassandraIndex.class);
 +
 +    public final ColumnFamilyStore baseCfs;
 +    protected IndexMetadata metadata;
 +    protected ColumnFamilyStore indexCfs;
 +    protected ColumnDefinition indexedColumn;
 +    protected CassandraIndexFunctions functions;
 +
 +    public CustomCassandraIndex(ColumnFamilyStore baseCfs, IndexMetadata indexDef)
 +    {
 +        this.baseCfs = baseCfs;
 +        setMetadata(indexDef);
 +    }
 +
 +    /**
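 +     * Returns true if an index of this type can support search predicates of the form [column] OPERATOR [value]
 +     * @param indexedColumn the column the predicate applies to
 +     * @param operator the operator used in the predicate
 +     * @return true if predicates using the operator are supported, false otherwise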
 +     */
 +    protected boolean supportsOperator(ColumnDefinition indexedColumn, Operator operator)
 +    {
 +        return operator.equals(Operator.EQ);
 +    }
 +
 +    public ColumnDefinition getIndexedColumn()
 +    {
 +        return indexedColumn;
 +    }
 +
 +    public ClusteringComparator getIndexComparator()
 +    {
 +        return indexCfs.metadata.comparator;
 +    }
 +
 +    public ColumnFamilyStore getIndexCfs()
 +    {
 +        return indexCfs;
 +    }
 +
 +    public void register(IndexRegistry registry)
 +    {
 +        registry.registerIndex(this);
 +    }
 +
 +    public Callable<?> getInitializationTask()
 +    {
 +        // if we're just linking in the index on an already-built index post-restart
-         // we've nothing to do. Otherwise, submit for building via SecondaryIndexBuilder
-         return isBuilt() ? null : getBuildIndexTask();
++        // or if the table is empty we've nothing to do. Otherwise, submit for building via SecondaryIndexBuilder
++        return isBuilt() || baseCfs.isEmpty() ? null : getBuildIndexTask();
 +    }
 +
 +    public IndexMetadata getIndexMetadata()
 +    {
 +        return metadata;
 +    }
 +
 +    public Optional<ColumnFamilyStore> getBackingTable()
 +    {
 +        return indexCfs == null ? Optional.empty() : Optional.of(indexCfs);
 +    }
 +
 +    public Callable<Void> getBlockingFlushTask()
 +    {
 +        return () -> {
 +            indexCfs.forceBlockingFlush();
 +            return null;
 +        };
 +    }
 +
 +    public Callable<?> getInvalidateTask()
 +    {
 +        return () -> {
 +            invalidate();
 +            return null;
 +        };
 +    }
 +
 +    public Callable<?> getMetadataReloadTask(IndexMetadata indexDef)
 +    {
 +        setMetadata(indexDef);
 +        return () -> {
 +            indexCfs.metadata.reloadIndexMetadataProperties(baseCfs.metadata);
 +            indexCfs.reload();
 +            return null;
 +        };
 +    }
 +
 +    private void setMetadata(IndexMetadata indexDef)
 +    {
 +        metadata = indexDef;
 +        Pair<ColumnDefinition, IndexTarget.Type> target = parseTarget(baseCfs.metadata, indexDef);
 +        functions = getFunctions(indexDef, target);
 +        CFMetaData cfm = indexCfsMetadata(baseCfs.metadata, indexDef);
 +        indexCfs = ColumnFamilyStore.createColumnFamilyStore(baseCfs.keyspace,
 +                                                             cfm.cfName,
 +                                                             cfm,
 +                                                             baseCfs.getTracker().loadsstables);
 +        indexedColumn = target.left;
 +    }
 +
 +    public Callable<?> getTruncateTask(final long truncatedAt)
 +    {
 +        return () -> {
 +            indexCfs.discardSSTables(truncatedAt);
 +            return null;
 +        };
 +    }
 +
 +    public boolean shouldBuildBlocking()
 +    {
 +        return true;
 +    }
 +
 +    public boolean indexes(PartitionColumns columns)
 +    {
 +        // if we have indexes on the partition key or clustering columns, return true
 +        return isPrimaryKeyIndex() || columns.contains(indexedColumn);
 +    }
 +
 +    public boolean dependsOn(ColumnDefinition column)
 +    {
 +        return column.equals(indexedColumn);
 +    }
 +
 +    public boolean supportsExpression(ColumnDefinition column, Operator operator)
 +    {
 +        return indexedColumn.name.equals(column.name)
 +               && supportsOperator(indexedColumn, operator);
 +    }
 +
 +    public AbstractType<?> customExpressionValueType()
 +    {
 +        return null;
 +    }
 +
 +    private boolean supportsExpression(RowFilter.Expression expression)
 +    {
 +        return supportsExpression(expression.column(), expression.operator());
 +    }
 +
 +    public long getEstimatedResultRows()
 +    {
 +        return indexCfs.getMeanColumns();
 +    }
 +
 +    /**
 +     * No post processing of query results, just return them unchanged
 +     */
 +    public BiFunction<PartitionIterator, ReadCommand, PartitionIterator> postProcessorFor(ReadCommand command)
 +    {
 +        return (partitionIterator, readCommand) -> partitionIterator;
 +    }
 +
 +    public RowFilter getPostIndexQueryFilter(RowFilter filter)
 +    {
 +        return getTargetExpression(filter.getExpressions()).map(filter::without)
 +                                                           .orElse(filter);
 +    }
 +
 +    private Optional<RowFilter.Expression> getTargetExpression(List<RowFilter.Expression> expressions)
 +    {
 +        return expressions.stream().filter(this::supportsExpression).findFirst();
 +    }
 +
 +    public Index.Searcher searcherFor(ReadCommand command)
 +    {
 +        return null;
 +    }
 +
 +    public void validate(PartitionUpdate update) throws InvalidRequestException
 +    {
 +        switch (indexedColumn.kind)
 +        {
 +            case PARTITION_KEY:
 +                validatePartitionKey(update.partitionKey());
 +                break;
 +            case CLUSTERING:
 +                validateClusterings(update);
 +                break;
 +            case REGULAR:
 +                validateRows(update);
 +                break;
 +            case STATIC:
 +                validateRows(Collections.singleton(update.staticRow()));
 +                break;
 +        }
 +    }
 +
 +    protected CBuilder buildIndexClusteringPrefix(ByteBuffer partitionKey,
 +                                               ClusteringPrefix prefix,
 +                                               CellPath path)
 +    {
 +        CBuilder builder = CBuilder.create(getIndexComparator());
 +        builder.add(partitionKey);
 +        return builder;
 +    }
 +
 +    protected ByteBuffer getIndexedValue(ByteBuffer partitionKey,
 +                                      Clustering clustering,
 +                                      CellPath path, ByteBuffer cellValue)
 +    {
 +        return cellValue;
 +    }
 +
 +    public IndexEntry decodeEntry(DecoratedKey indexedValue, Row indexEntry)
 +    {
 +        throw new UnsupportedOperationException("KEYS indexes do not use a specialized index entry format");
 +    }
 +
 +    public boolean isStale(Row row, ByteBuffer indexValue, int nowInSec)
 +    {
 +        if (row == null)
 +            return true;
 +
 +        Cell cell = row.getCell(indexedColumn);
 +
 +        return (cell == null
 +             || !cell.isLive(nowInSec)
 +             || indexedColumn.type.compare(indexValue, cell.value()) != 0);
 +    }
 +
 +    public Indexer indexerFor(final DecoratedKey key,
 +                              final int nowInSec,
 +                              final OpOrder.Group opGroup,
 +                              final IndexTransaction.Type transactionType)
 +    {
 +        return new Indexer()
 +        {
 +            public void begin()
 +            {
 +            }
 +
 +            public void partitionDelete(DeletionTime deletionTime)
 +            {
 +            }
 +
 +            public void rangeTombstone(RangeTombstone tombstone)
 +            {
 +            }
 +
 +            public void insertRow(Row row)
 +            {
 +                if (isPrimaryKeyIndex())
 +                {
 +                    indexPrimaryKey(row.clustering(),
 +                                    getPrimaryKeyIndexLiveness(row),
 +                                    row.deletion());
 +                }
 +                else
 +                {
 +                    if (indexedColumn.isComplex())
 +                        indexCells(row.clustering(), row.getComplexColumnData(indexedColumn));
 +                    else
 +                        indexCell(row.clustering(), row.getCell(indexedColumn));
 +                }
 +            }
 +
 +            public void removeRow(Row row)
 +            {
 +                if (isPrimaryKeyIndex())
 +                    indexPrimaryKey(row.clustering(), row.primaryKeyLivenessInfo(), row.deletion());
 +
 +                if (indexedColumn.isComplex())
 +                    removeCells(row.clustering(), row.getComplexColumnData(indexedColumn));
 +                else
 +                    removeCell(row.clustering(), row.getCell(indexedColumn));
 +            }
 +
 +
 +            public void updateRow(Row oldRow, Row newRow)
 +            {
 +                if (isPrimaryKeyIndex())
 +                    indexPrimaryKey(newRow.clustering(),
 +                                    newRow.primaryKeyLivenessInfo(),
 +                                    newRow.deletion());
 +
 +                if (indexedColumn.isComplex())
 +                {
 +                    indexCells(newRow.clustering(), newRow.getComplexColumnData(indexedColumn));
 +                    removeCells(oldRow.clustering(), oldRow.getComplexColumnData(indexedColumn));
 +                }
 +                else
 +                {
 +                    indexCell(newRow.clustering(), newRow.getCell(indexedColumn));
 +                    removeCell(oldRow.clustering(), oldRow.getCell(indexedColumn));
 +                }
 +            }
 +
 +            public void finish()
 +            {
 +            }
 +
 +            private void indexCells(Clustering clustering, Iterable<Cell> cells)
 +            {
 +                if (cells == null)
 +                    return;
 +
 +                for (Cell cell : cells)
 +                    indexCell(clustering, cell);
 +            }
 +
 +            private void indexCell(Clustering clustering, Cell cell)
 +            {
 +                if (cell == null || !cell.isLive(nowInSec))
 +                    return;
 +
 +                insert(key.getKey(),
 +                       clustering,
 +                       cell,
 +                       LivenessInfo.create(cell.timestamp(), cell.ttl(), cell.localDeletionTime()),
 +                       opGroup);
 +            }
 +
 +            /**
 +             * Removes the index entries for each of the supplied cells via {@code removeCell}.
 +             *
 +             * @param clustering the clustering of the row the cells belong to
 +             * @param cells the cells to remove; {@code null} means there is nothing to do
 +             */
 +            private void removeCells(Clustering clustering, Iterable<Cell> cells)
 +            {
 +                if (cells != null)
 +                {
 +                    for (Cell current : cells)
 +                    {
 +                        removeCell(clustering, current);
 +                    }
 +                }
 +            }
 +
 +            private void removeCell(Clustering clustering, Cell cell)
 +            {
 +                if (cell == null || !cell.isLive(nowInSec))
 +                    return;
 +
 +                delete(key.getKey(), clustering, cell, opGroup, nowInSec);
 +            }
 +
 +            private void indexPrimaryKey(final Clustering clustering,
 +                                         final LivenessInfo liveness,
 +                                         final Row.Deletion deletion)
 +            {
 +                if (liveness.timestamp() != LivenessInfo.NO_TIMESTAMP)
 +                    insert(key.getKey(), clustering, null, liveness, opGroup);
 +
 +                if (!deletion.isLive())
 +                    delete(key.getKey(), clustering, deletion.time(), opGroup);
 +            }
 +
 +            /**
 +             * Computes the liveness to use for a primary key index entry. Starts from the
 +             * row's primary key liveness info and, for every cell that is live at
 +             * {@code nowInSec} with a strictly greater timestamp, adopts that cell's
 +             * timestamp and ttl.
 +             *
 +             * @param row the base table row
 +             * @return liveness info created from the winning timestamp and ttl
 +             */
 +            private LivenessInfo getPrimaryKeyIndexLiveness(Row row)
 +            {
 +                long latestTimestamp = row.primaryKeyLivenessInfo().timestamp();
 +                int latestTtl = row.primaryKeyLivenessInfo().ttl();
 +                for (Cell candidate : row.cells())
 +                {
 +                    long candidateTimestamp = candidate.timestamp();
 +                    // Only live cells that are strictly newer can replace the current values.
 +                    if (!candidate.isLive(nowInSec) || candidateTimestamp <= latestTimestamp)
 +                        continue;
 +
 +                    latestTimestamp = candidateTimestamp;
 +                    latestTtl = candidate.ttl();
 +                }
 +                return LivenessInfo.create(baseCfs.metadata, latestTimestamp, latestTtl, nowInSec);
 +            }
 +        };
 +    }
 +
 +    /**
 +     * Specific to internal indexes, this is called by a
 +     * searcher when it encounters a stale entry in the index.
 +     * Delegates the actual deletion to {@code doDelete} and then logs
 +     * at debug level that a stale entry was removed.
 +     *
 +     * @param indexKey the partition key in the index table
 +     * @param indexClustering the clustering in the index table
 +     * @param deletion deletion timestamp etc
 +     * @param opGroup the operation under which to perform the deletion
 +     */
 +    public void deleteStaleEntry(DecoratedKey indexKey,
 +                                 Clustering indexClustering,
 +                                 DeletionTime deletion,
 +                                 OpOrder.Group opGroup)
 +    {
 +        doDelete(indexKey, indexClustering, deletion, opGroup);
 +        logger.debug("Removed index entry for stale value {}", indexKey);
 +    }
 +
 +    /**
 +     * Called when adding a new entry to the index.
 +     * Builds a cell-less live row at the index clustering derived from the base row,
 +     * wraps it in a single-row partition update keyed by the indexed value, and
 +     * applies it to the index table.
 +     *
 +     * @param rowKey the partition key of the base row
 +     * @param clustering the clustering of the base row
 +     * @param cell the cell being indexed; callers pass {@code null} for primary key indexes
 +     * @param info the liveness info for the new index row
 +     * @param opGroup the operation group passed through to the index table write
 +     */
 +    private void insert(ByteBuffer rowKey,
 +                        Clustering clustering,
 +                        Cell cell,
 +                        LivenessInfo info,
 +                        OpOrder.Group opGroup)
 +    {
 +        DecoratedKey valueKey = getIndexKeyFor(getIndexedValue(rowKey,
 +                                                               clustering,
 +                                                               cell));
 +        Row row = BTreeRow.noCellLiveRow(buildIndexClustering(rowKey, clustering, cell), info);
 +        PartitionUpdate upd = partitionUpdate(valueKey, row);
 +        indexCfs.apply(upd, UpdateTransaction.NO_OP, opGroup, null);
 +        logger.debug("Inserted entry into index for value {}", valueKey);
 +    }
 +
 +    /**
 +     * Called when deleting entries on non-primary key columns.
 +     * The deletion is stamped with the cell's timestamp and the supplied {@code nowInSec}.
 +     *
 +     * @param rowKey the partition key of the base row
 +     * @param clustering the clustering of the base row
 +     * @param cell the cell whose index entry is being deleted
 +     * @param opGroup the operation group passed through to the index table write
 +     * @param nowInSec the local deletion time to record
 +     */
 +    private void delete(ByteBuffer rowKey,
 +                        Clustering clustering,
 +                        Cell cell,
 +                        OpOrder.Group opGroup,
 +                        int nowInSec)
 +    {
 +        ByteBuffer indexedValue = getIndexedValue(rowKey, clustering, cell);
 +        DecoratedKey valueKey = getIndexKeyFor(indexedValue);
 +        Clustering indexClustering = buildIndexClustering(rowKey, clustering, cell);
 +        DeletionTime cellDeletion = new DeletionTime(cell.timestamp(), nowInSec);
 +        doDelete(valueKey, indexClustering, cellDeletion, opGroup);
 +    }
 +
 +    /**
 +     * Called when deleting entries from indexes on primary key columns.
 +     * No cell is involved, so the indexed value and index clustering are derived
 +     * from the row key and clustering alone.
 +     *
 +     * @param rowKey the partition key of the base row
 +     * @param clustering the clustering of the base row
 +     * @param deletion the deletion time to apply to the index entry
 +     * @param opGroup the operation group passed through to the index table write
 +     */
 +    private void delete(ByteBuffer rowKey,
 +                        Clustering clustering,
 +                        DeletionTime deletion,
 +                        OpOrder.Group opGroup)
 +    {
 +        ByteBuffer indexedValue = getIndexedValue(rowKey, clustering, null);
 +        DecoratedKey valueKey = getIndexKeyFor(indexedValue);
 +        Clustering indexClustering = buildIndexClustering(rowKey, clustering, null);
 +        doDelete(valueKey, indexClustering, deletion, opGroup);
 +    }
 +
 +    /**
 +     * Writes a row deletion for one index entry: builds an empty deleted row at the
 +     * given index clustering, applies it to the index table as a single-row update,
 +     * and logs the removal at debug level.
 +     *
 +     * @param indexKey the partition key in the index table
 +     * @param indexClustering the clustering in the index table
 +     * @param deletion the deletion time to apply
 +     * @param opGroup the operation group passed through to the index table write
 +     */
 +    private void doDelete(DecoratedKey indexKey,
 +                          Clustering indexClustering,
 +                          DeletionTime deletion,
 +                          OpOrder.Group opGroup)
 +    {
 +        Row deletedRow = BTreeRow.emptyDeletedRow(indexClustering, Row.Deletion.regular(deletion));
 +        indexCfs.apply(partitionUpdate(indexKey, deletedRow), UpdateTransaction.NO_OP, opGroup, null);
 +        logger.debug("Removed index entry for value {}", indexKey);
 +    }
 +
 +    /**
 +     * Validates the indexed value derived from a partition key.
 +     * Asserts that the indexed column is a partition key column.
 +     *
 +     * @param partitionKey the partition key to validate
 +     * @throws InvalidRequestException if the derived value fails {@code validateIndexedValue}
 +     */
 +    private void validatePartitionKey(DecoratedKey partitionKey) throws InvalidRequestException
 +    {
 +        assert indexedColumn.isPartitionKey();
 +        ByteBuffer keyValue = getIndexedValue(partitionKey.getKey(), null, null);
 +        validateIndexedValue(keyValue);
 +    }
 +
 +    /**
 +     * Validates the indexed value derived from the clustering of every row in the update.
 +     * Asserts that the indexed column is a clustering column.
 +     *
 +     * @param update the partition update whose rows are checked
 +     * @throws InvalidRequestException if any derived value fails {@code validateIndexedValue}
 +     */
 +    private void validateClusterings(PartitionUpdate update) throws InvalidRequestException
 +    {
 +        assert indexedColumn.isClusteringColumn();
 +        for (Row row : update)
 +        {
 +            ByteBuffer clusteringValue = getIndexedValue(null, row.clustering(), null);
 +            validateIndexedValue(clusteringValue);
 +        }
 +    }
 +
 +    /**
 +     * Validates the indexed values derived from the indexed column's cell(s) in each row.
 +     * Asserts that the indexed column is not a primary key column. For a complex
 +     * column every cell is checked (rows without data for the column are skipped);
 +     * for a simple column the single cell, possibly {@code null}, is checked.
 +     *
 +     * @param rows the rows to validate
 +     */
 +    private void validateRows(Iterable<Row> rows)
 +    {
 +        assert !indexedColumn.isPrimaryKeyColumn();
 +        for (Row row : rows)
 +        {
 +            if (!indexedColumn.isComplex())
 +            {
 +                validateIndexedValue(getIndexedValue(null, null, row.getCell(indexedColumn)));
 +                continue;
 +            }
 +
 +            ComplexColumnData data = row.getComplexColumnData(indexedColumn);
 +            if (data == null)
 +                continue;
 +
 +            for (Cell cell : data)
 +                validateIndexedValue(getIndexedValue(null, null, cell.path(), cell.value()));
 +        }
 +    }
 +
 +    /**
 +     * Rejects values that are too large to be indexed.
 +     *
 +     * @param value the value to be indexed; {@code null} passes without any check
 +     * @throws InvalidRequestException if the value has {@code FBUtilities.MAX_UNSIGNED_SHORT}
 +     *         or more bytes remaining
 +     */
 +    private void validateIndexedValue(ByteBuffer value)
 +    {
 +        if (value != null && value.remaining() >= FBUtilities.MAX_UNSIGNED_SHORT)
 +            throw new InvalidRequestException(String.format(
 +                                                           "Cannot index value of size %d for index %s on %s.%s(%s) (maximum allowed size=%d)",
 +                                                           value.remaining(),
 +                                                           metadata.name,
 +                                                           baseCfs.metadata.ksName,
 +                                                           baseCfs.metadata.cfName,
 +                                                           indexedColumn.name.toString(),
 +                                                           FBUtilities.MAX_UNSIGNED_SHORT));
 +    }
 +
 +    private ByteBuffer getIndexedValue(ByteBuffer rowKey,
 +                                       Clustering clustering,
 +                                       Cell cell)
 +    {
 +        return getIndexedValue(rowKey,
 +                               clustering,
 +                               cell == null ? null : cell.path(),
 +                               cell == null ? null : cell.value()
 +        );
 +    }
 +
 +    private Clustering buildIndexClustering(ByteBuffer rowKey,
 +                                            Clustering clustering,
 +                                            Cell cell)
 +    {
 +        return buildIndexClusteringPrefix(rowKey,
 +                                          clustering,
 +                                          cell == null ? null : cell.path()).build();
 +    }
 +
 +    /**
 +     * Turns an indexed value into a key for the index table by asking
 +     * {@code indexCfs} to decorate it.
 +     *
 +     * @param value the indexed value
 +     * @return the decorated key produced by {@code indexCfs.decorateKey}
 +     */
 +    private DecoratedKey getIndexKeyFor(ByteBuffer value)
 +    {
 +        return indexCfs.decorateKey(value);
 +    }
 +
 +    /**
 +     * Creates a single-row partition update against the index table's metadata.
 +     *
 +     * @param valueKey the partition key in the index table
 +     * @param row the row to include in the update
 +     * @return the update produced by {@code PartitionUpdate.singleRowUpdate}
 +     */
 +    private PartitionUpdate partitionUpdate(DecoratedKey valueKey, Row row)
 +    {
 +        return PartitionUpdate.singleRowUpdate(indexCfs.metadata, valueKey, row);
 +    }
 +
 +    /**
 +     * Invalidates the index table. In order: interrupts and waits out compactions
 +     * on {@code indexCfs}, waits on a new write-order barrier, flushes the index
 +     * table, waits on a new read-ordering barrier, and finally calls
 +     * {@code indexCfs.invalidate()}.
 +     */
 +    private void invalidate()
 +    {
 +        // interrupt in-progress compactions
 +        Collection<ColumnFamilyStore> cfss = Collections.singleton(indexCfs);
 +        CompactionManager.instance.interruptCompactionForCFs(cfss, true);
 +        CompactionManager.instance.waitForCessation(cfss);
 +        // NOTE(review): presumably waits for in-flight writes to complete before flushing - confirm
 +        indexCfs.keyspace.writeOrder.awaitNewBarrier();
 +        indexCfs.forceBlockingFlush();
 +        // NOTE(review): presumably waits for in-flight reads to complete before invalidating - confirm
 +        indexCfs.readOrdering.awaitNewBarrier();
 +        indexCfs.invalidate();
 +    }
 +
 +    /**
 +     * Reports whether this index is recorded as built, by querying
 +     * {@code SystemKeyspace.isIndexBuilt} with the base table's keyspace name
 +     * and this index's name.
 +     *
 +     * @return the result of {@code SystemKeyspace.isIndexBuilt}
 +     */
 +    private boolean isBuilt()
 +    {
 +        return SystemKeyspace.isIndexBuilt(baseCfs.keyspace.getName(), metadata.name);
 +    }
 +
 +    /**
 +     * Reports whether the indexed column is a primary key column.
 +     *
 +     * @return the result of {@code indexedColumn.isPrimaryKeyColumn()}
 +     */
 +    private boolean isPrimaryKeyIndex()
 +    {
 +        return indexedColumn.isPrimaryKeyColumn();
 +    }
 +
 +    private Callable<?> getBuildIndexTask()
 +    {
 +        return () -> {
 +            buildBlocking();
 +            return null;
 +        };
 +    }
 +
 +    /**
 +     * Builds the index from the base table's SSTables and blocks until done.
 +     * The base table is flushed first. If no SSTables are selected, the index is
 +     * marked as built immediately. Otherwise an index build is submitted to the
 +     * compaction manager and awaited, the index table is flushed, and the index
 +     * is marked as built.
 +     */
 +    private void buildBlocking()
 +    {
 +        baseCfs.forceBlockingFlush();
 +
 +        // References to the selected SSTables are released when the try block exits.
 +        try (ColumnFamilyStore.RefViewFragment viewFragment = baseCfs.selectAndReference(View.select(SSTableSet.CANONICAL));
 +             Refs<SSTableReader> sstables = viewFragment.refs)
 +        {
 +            if (sstables.isEmpty())
 +            {
 +                logger.info("No SSTable data for {}.{} to build index {} from, marking empty index as built",
 +                            baseCfs.metadata.ksName,
 +                            baseCfs.metadata.cfName,
 +                            metadata.name);
 +                baseCfs.indexManager.markIndexBuilt(metadata.name);
 +                return;
 +            }
 +
 +            logger.info("Submitting index build of {} for data in {}",
 +                        metadata.name,
 +                        getSSTableNames(sstables));
 +
 +            SecondaryIndexBuilder builder = new SecondaryIndexBuilder(baseCfs,
 +                                                                      Collections.singleton(this),
 +                                                                      new ReducingKeyIterator(sstables));
 +            Future<?> future = CompactionManager.instance.submitIndexBuild(builder);
 +            // Block until the submitted build has finished.
 +            FBUtilities.waitOnFuture(future);
 +            indexCfs.forceBlockingFlush();
 +            baseCfs.indexManager.markIndexBuilt(metadata.name);
 +        }
 +        logger.info("Index build of {} complete", metadata.name);
 +    }
 +
 +    /**
 +     * Renders a collection of SSTables as a single string for logging.
 +     *
 +     * @param sstables the SSTables to describe
 +     * @return the {@code toString()} of each SSTable, joined with {@code ", "}
 +     */
 +    private static String getSSTableNames(Collection<SSTableReader> sstables)
 +    {
 +        return sstables.stream()
 +                       .map(sstable -> sstable.toString())
 +                       .collect(Collectors.joining(", "));
 +    }
 +}


Mime
View raw message