cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From c...@apache.org
Subject [11/20] cassandra git commit: Merge branch 'cassandra-2.2' into cassandra-3.0
Date Wed, 15 Jun 2016 14:59:44 GMT
http://git-wip-us.apache.org/repos/asf/cassandra/blob/05bacc75/test/unit/org/apache/cassandra/index/CustomIndexTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/index/CustomIndexTest.java
index fa6700b,0000000..f02823c
mode 100644,000000..100644
--- a/test/unit/org/apache/cassandra/index/CustomIndexTest.java
+++ b/test/unit/org/apache/cassandra/index/CustomIndexTest.java
@@@ -1,806 -1,0 +1,826 @@@
++/*
++ *
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements.  See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership.  The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License.  You may obtain a copy of the License at
++ *
++ *   http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing,
++ * software distributed under the License is distributed on an
++ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
++ * KIND, either express or implied.  See the License for the
++ * specific language governing permissions and limitations
++ * under the License.
++ *
++ */
 +package org.apache.cassandra.index;
 +
 +import java.util.*;
 +import java.util.concurrent.Callable;
 +import java.util.concurrent.TimeUnit;
 +import java.util.concurrent.atomic.AtomicInteger;
 +import java.util.stream.Collectors;
 +
 +import com.google.common.collect.ImmutableList;
 +import com.google.common.collect.ImmutableMap;
 +import org.junit.Test;
 +
 +import com.datastax.driver.core.exceptions.QueryValidationException;
 +import org.apache.cassandra.Util;
 +import org.apache.cassandra.config.CFMetaData;
 +import org.apache.cassandra.cql3.CQLTester;
 +import org.apache.cassandra.cql3.ColumnIdentifier;
 +import org.apache.cassandra.cql3.restrictions.IndexRestrictions;
 +import org.apache.cassandra.cql3.restrictions.StatementRestrictions;
 +import org.apache.cassandra.cql3.statements.IndexTarget;
 +import org.apache.cassandra.cql3.statements.ModificationStatement;
 +import org.apache.cassandra.db.ColumnFamilyStore;
 +import org.apache.cassandra.db.ReadCommand;
 +import org.apache.cassandra.db.ReadOrderGroup;
 +import org.apache.cassandra.db.marshal.AbstractType;
 +import org.apache.cassandra.db.marshal.Int32Type;
 +import org.apache.cassandra.db.marshal.UTF8Type;
 +import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator;
 +import org.apache.cassandra.exceptions.InvalidRequestException;
 +import org.apache.cassandra.schema.IndexMetadata;
 +import org.apache.cassandra.schema.Indexes;
 +import org.apache.cassandra.transport.Server;
 +import org.apache.cassandra.utils.FBUtilities;
 +
 +import static org.apache.cassandra.Util.throwAssert;
 +import static org.apache.cassandra.cql3.statements.IndexTarget.CUSTOM_INDEX_OPTION_NAME;
 +import static org.junit.Assert.assertEquals;
 +import static org.junit.Assert.assertNotNull;
 +import static org.junit.Assert.assertTrue;
 +import static org.junit.Assert.fail;
 +
 +public class CustomIndexTest extends CQLTester
 +{
 +    /**
 +     * Guards against a deadlock when flushing CFS backed custom indexers (see CASSANDRA-10181)
 +     * by writing a few rows to a table that carries such an index.
 +     */
 +    @Test
 +    public void testInsertsOnCfsBackedIndex() throws Throwable
 +    {
 +        createTable("CREATE TABLE %s (a int, b int, c int, d int, PRIMARY KEY (a, b))");
 +        createIndex("CREATE CUSTOM INDEX myindex ON %s(c) USING 'org.apache.cassandra.index.internal.CustomCassandraIndex'");
 +
 +        // three rows in partition 0: b counts up from 0 while d counts down from 2
 +        final String insert = "INSERT INTO %s (a, b, c, d) VALUES (?, ?, ?, ?)";
 +        for (int b = 0; b < 3; b++)
 +            execute(insert, 0, b, 0, 2 - b);
 +    }
 +
 +    /**
 +     * Truncates a table that has a non-CFS custom index attached. With the synchronisation
 +     * issues described in the comments on CASSANDRA-9669 this deadlocks and the test times out.
 +     */
 +    @Test
 +    public void testTruncateWithNonCfsCustomIndex() throws Throwable
 +    {
 +        createTable("CREATE TABLE %s (a int, b int, c int, PRIMARY KEY (a))");
 +        createIndex("CREATE CUSTOM INDEX b_index ON %s(b) USING 'org.apache.cassandra.index.StubIndex'");
 +
 +        final String insert = "INSERT INTO %s (a, b, c) VALUES (?, ?, ?)";
 +        execute(insert, 0, 1, 2);
 +
 +        ColumnFamilyStore store = getCurrentColumnFamilyStore();
 +        store.truncateBlocking();
 +    }
 +
 +    /**
 +     * Runs a blocking index build over the live sstables and checks what each index recorded:
 +     * the {@code IndexIncludedInBuild} instance must report all three rows, the
 +     * {@code IndexExcludedFromBuild} instance must report none.
 +     */
 +    @Test
 +    public void indexControlsIfIncludedInBuildOnNewSSTables() throws Throwable
 +    {
 +        final String includedName = "include";
 +        final String excludedName = "exclude";
 +        final String createTemplate = "CREATE CUSTOM INDEX %s ON %%s(b) USING '%s'";
 +
 +        createTable("CREATE TABLE %s (a int, b int, PRIMARY KEY (a))");
 +        createIndex(String.format(createTemplate, includedName, IndexIncludedInBuild.class.getName()));
 +        createIndex(String.format(createTemplate, excludedName, IndexExcludedFromBuild.class.getName()));
 +
 +        // rows (0, 0), (1, 1) and (2, 2)
 +        for (int i = 0; i < 3; i++)
 +            execute("INSERT INTO %s (a, b) VALUES (?, ?)", i, i);
 +        flush();
 +
 +        SecondaryIndexManager manager = getCurrentColumnFamilyStore().indexManager;
 +
 +        // reset both indexes and confirm they start the build with nothing recorded
 +        IndexIncludedInBuild includedIndex = (IndexIncludedInBuild) manager.getIndexByName(includedName);
 +        includedIndex.reset();
 +        assertTrue(includedIndex.rowsInserted.isEmpty());
 +
 +        IndexExcludedFromBuild excludedIndex = (IndexExcludedFromBuild) manager.getIndexByName(excludedName);
 +        excludedIndex.reset();
 +        assertTrue(excludedIndex.rowsInserted.isEmpty());
 +
 +        manager.buildAllIndexesBlocking(getCurrentColumnFamilyStore().getLiveSSTables());
 +
 +        assertEquals(3, includedIndex.rowsInserted.size());
 +        assertTrue(excludedIndex.rowsInserted.isEmpty());
 +    }
 +
 +    /**
 +     * Checks what a {@code StubIndex} records for deletions issued through CQL: a delete
 +     * restricted to a clustering prefix must show up as one range tombstone, a delete of the
 +     * whole partition as one partition deletion.
 +     */
 +    @Test
 +    public void indexReceivesWriteTimeDeletionsCorrectly() throws Throwable
 +    {
 +        final String name = "test_index";
 +        createTable("CREATE TABLE %s (a int, b int, c int, d int, PRIMARY KEY (a, b, c))");
 +        createIndex(String.format("CREATE CUSTOM INDEX %s ON %%s(d) USING '%s'", name, StubIndex.class.getName()));
 +
 +        // four rows in partition 0; c and d run 0..3 and only the last row has b=1
 +        for (int i = 0; i < 4; i++)
 +            execute("INSERT INTO %s (a, b, c, d) VALUES (?, ?, ?, ?)", 0, i < 3 ? 0 : 1, i, i);
 +
 +        StubIndex stub = (StubIndex) getCurrentColumnFamilyStore().indexManager.getIndexByName(name);
 +        assertEquals(4, stub.rowsInserted.size());
 +        assertTrue(stub.partitionDeletions.isEmpty());
 +        assertTrue(stub.rangeTombstones.isEmpty());
 +
 +        // delete by clustering prefix
 +        execute("DELETE FROM %s WHERE a=0 AND b=0");
 +        assertTrue(stub.partitionDeletions.isEmpty());
 +        assertEquals(1, stub.rangeTombstones.size());
 +
 +        // delete the whole partition
 +        execute("DELETE FROM %s WHERE a=0");
 +        assertEquals(1, stub.partitionDeletions.size());
 +        assertEquals(1, stub.rangeTombstones.size());
 +    }
 +    /**
 +     * Regular (non CUSTOM) indexes must be rejected when declared on several columns or on no
 +     * column at all, whereas a CUSTOM index on two columns must be created.
 +     */
 +    @Test
 +    public void nonCustomIndexesRequireExactlyOneTargetColumn() throws Throwable
 +    {
 +        createTable("CREATE TABLE %s(k int, c int, v1 int, v2 int, PRIMARY KEY (k,c))");
 +
 +        final String multiColumnError = "Only CUSTOM indexes support multiple columns";
 +        assertInvalidMessage(multiColumnError, "CREATE INDEX multi_idx on %s(v1,v2)");
 +
 +        final String noTargetError = "Only CUSTOM indexes can be created without specifying a target column";
 +        assertInvalidMessage(noTargetError, "CREATE INDEX no_targets on %s()");
 +
 +        final String indexClass = StubIndex.class.getName();
 +        createIndex(String.format("CREATE CUSTOM INDEX multi_idx ON %%s(v1, v2) USING '%s'", indexClass));
 +        assertIndexCreated("multi_idx", "v1", "v2");
 +    }
 +
 +    /**
 +     * A target list that names the same column more than once must be rejected; the error is
 +     * expected to name v1 in both statements.
 +     */
 +    @Test
 +    public void rejectDuplicateColumnsInTargetList() throws Throwable
 +    {
 +        createTable("CREATE TABLE %s(k int, c int, v1 int, v2 int, PRIMARY KEY (k,c))");
 +
 +        final String indexClass = StubIndex.class.getName();
 +        final String expectedError = "Duplicate column v1 in index target list";
 +
 +        String singleDuplicate = String.format("CREATE CUSTOM INDEX ON %%s(v1, v1) USING '%s'", indexClass);
 +        assertInvalidMessage(expectedError, singleDuplicate);
 +
 +        String twoDuplicates = String.format("CREATE CUSTOM INDEX ON %%s(v1, v1, c, c) USING '%s'", indexClass);
 +        assertInvalidMessage(expectedError, twoDuplicates);
 +    }
 +
 +    /**
 +     * Proves that frozen collection targets need the full() modifier whether or not the index is
 +     * multicolumn: keys(), entries() and bare (values) targets on each frozen column must be
 +     * rejected, while full() targets must be accepted.
 +     */
 +    @Test
 +    public void requireFullQualifierForFrozenCollectionTargets() throws Throwable
 +    {
 +        createTable("CREATE TABLE %s(" +
 +                    " k int," +
 +                    " c int," +
 +                    " fmap frozen<map<int, text>>," +
 +                    " flist frozen<list<int>>," +
 +                    " fset frozen<set<int>>," +
 +                    " PRIMARY KEY(k,c))");
 +
 +        final String indexClass = StubIndex.class.getName();
 +        final String[] frozenColumns = { "fmap", "flist", "fset" };
 +        final String[] rejectedKinds = { "keys", "entries", "values" };
 +
 +        for (String column : frozenColumns)
 +        {
 +            for (String kind : rejectedKinds)
 +            {
 +                // the values() case is written as the bare column name in the statement
 +                String target = kind.equals("values") ? column : kind + '(' + column + ')';
 +                assertInvalidMessage("Cannot create " + kind + "() index on frozen column " + column + ". " +
 +                                     "Frozen collections only support full() indexes",
 +                                     String.format("CREATE CUSTOM INDEX ON %%s(c, %s) USING'%s'", target, indexClass));
 +            }
 +        }
 +
 +        for (String column : frozenColumns)
 +            createIndex(String.format("CREATE CUSTOM INDEX ON %%s(c, full(%s)) USING'%s'", column, indexClass));
 +    }
 +
 +    /**
 +     * Creates several unnamed CUSTOM indexes and checks, after each one, both the number of
 +     * indexes on the table and the name the new index was given
 +     * ({@code <table>_idx}, {@code <table>_idx_1}, ...).
 +     */
 +    @Test
 +    public void defaultIndexNameContainsTargetColumns() throws Throwable
 +    {
 +        createTable("CREATE TABLE %s(k int, c int, v1 int, v2 int, PRIMARY KEY(k,c))");
 +
 +        final String indexClass = StubIndex.class.getName();
 +        final String[][] targetSets = { { "v1", "v2" }, { "c", "v1", "v2" }, { "c", "v2" } };
 +        final String[] nameSuffixes = { "_idx", "_idx_1", "_idx_2" };
 +
 +        for (int i = 0; i < targetSets.length; i++)
 +        {
 +            String targetList = String.join(", ", targetSets[i]);
 +            createIndex(String.format("CREATE CUSTOM INDEX ON %%s(%s) USING '%s'", targetList, indexClass));
 +            assertEquals(i + 1, getCurrentColumnFamilyStore().metadata.getIndexes().size());
 +            assertIndexCreated(currentTable() + nameSuffixes[i], targetSets[i]);
 +        }
 +
 +        // duplicate the previous index with some additional options and check the name is generated as expected
 +        createIndex(String.format("CREATE CUSTOM INDEX ON %%s(c, v2) USING '%s' WITH OPTIONS = {'foo':'bar'}",
 +                                  indexClass));
 +        assertEquals(4, getCurrentColumnFamilyStore().metadata.getIndexes().size());
 +        Map<String, String> extraOptions = new HashMap<>();
 +        extraOptions.put("foo", "bar");
 +        assertIndexCreated(currentTable() + "_idx_3", extraOptions, "c", "v2");
 +    }
 +
 +    /**
 +     * Smoke test for various permutations of multicolumn indexes over partition key,
 +     * clustering, regular, collection and frozen collection columns.
 +     */
 +    @Test
 +    public void createMultiColumnIndexes() throws Throwable
 +    {
 +        createTable("CREATE TABLE %s (" +
 +                    " pk1 int," +
 +                    " pk2 int," +
 +                    " c1 int," +
 +                    " c2 int," +
 +                    " v1 int," +
 +                    " v2 int," +
 +                    " mval map<text, int>," +
 +                    " lval list<int>," +
 +                    " sval set<int>," +
 +                    " fmap frozen<map<text,int>>," +
 +                    " flist frozen<list<int>>," +
 +                    " fset frozen<set<int>>," +
 +                    " PRIMARY KEY ((pk1, pk2), c1, c2))");
 +
 +        // idx_1 .. idx_8, in this order
 +        final String[][] plainTargets = {
 +            { "pk1", "pk2" },
 +            { "pk1", "c1" },
 +            { "pk1", "c2" },
 +            { "c1", "c2" },
 +            { "c2", "v1" },
 +            { "v1", "v2" },
 +            { "pk2", "c2", "v2" },
 +            { "pk1", "c1", "v1", "mval", "sval", "lval" }
 +        };
 +        for (int i = 0; i < plainTargets.length; i++)
 +            testCreateIndex("idx_" + (i + 1), plainTargets[i]);
 +
 +        final String indexClass = StubIndex.class.getName();
 +
 +        createIndex(String.format("CREATE CUSTOM INDEX inc_frozen ON %%s(" +
 +                                  "  pk2, c2, v2, full(fmap), full(fset), full(flist)" +
 +                                  ") USING '%s'",
 +                                  indexClass));
 +        List<IndexTarget> frozenTargets = ImmutableList.of(indexTarget("pk2", IndexTarget.Type.VALUES),
 +                                                           indexTarget("c2", IndexTarget.Type.VALUES),
 +                                                           indexTarget("v2", IndexTarget.Type.VALUES),
 +                                                           indexTarget("fmap", IndexTarget.Type.FULL),
 +                                                           indexTarget("fset", IndexTarget.Type.FULL),
 +                                                           indexTarget("flist", IndexTarget.Type.FULL));
 +        // a fresh options map per assertion, exactly as before
 +        assertIndexCreated("inc_frozen", new HashMap<>(), frozenTargets);
 +
 +        createIndex(String.format("CREATE CUSTOM INDEX all_teh_things ON %%s(" +
 +                                  "  pk1, pk2, c1, c2, v1, v2, keys(mval), lval, sval, full(fmap), full(fset), full(flist)" +
 +                                  ") USING '%s'",
 +                                  indexClass));
 +        List<IndexTarget> allTargets = ImmutableList.of(indexTarget("pk1", IndexTarget.Type.VALUES),
 +                                                        indexTarget("pk2", IndexTarget.Type.VALUES),
 +                                                        indexTarget("c1", IndexTarget.Type.VALUES),
 +                                                        indexTarget("c2", IndexTarget.Type.VALUES),
 +                                                        indexTarget("v1", IndexTarget.Type.VALUES),
 +                                                        indexTarget("v2", IndexTarget.Type.VALUES),
 +                                                        indexTarget("mval", IndexTarget.Type.KEYS),
 +                                                        indexTarget("lval", IndexTarget.Type.VALUES),
 +                                                        indexTarget("sval", IndexTarget.Type.VALUES),
 +                                                        indexTarget("fmap", IndexTarget.Type.FULL),
 +                                                        indexTarget("fset", IndexTarget.Type.FULL),
 +                                                        indexTarget("flist", IndexTarget.Type.FULL));
 +        assertIndexCreated("all_teh_things", new HashMap<>(), allTargets);
 +    }
 +
 +    /**
 +     * Creates a two column CUSTOM index whose second target is a frozen user type column and
 +     * compares the stored {@code IndexMetadata} with one built directly from the same targets.
 +     */
 +    @Test
 +    public void createMultiColumnIndexIncludingUserTypeColumn() throws Throwable
 +    {
 +        final String indexName = "udt_idx";
 +
 +        String udt = KEYSPACE + '.' + createType("CREATE TYPE %s (a int, b int)");
 +        createTable("CREATE TABLE %s (k int PRIMARY KEY, v1 int, v2 frozen<" + udt + ">)");
 +        testCreateIndex(indexName, "v1", "v2");
 +
 +        Indexes tableIndexes = getCurrentColumnFamilyStore().metadata.getIndexes();
 +        IndexMetadata fromTargets =
 +            IndexMetadata.fromIndexTargets(getCurrentColumnFamilyStore().metadata,
 +                                           ImmutableList.of(indexTarget("v1", IndexTarget.Type.VALUES),
 +                                                            indexTarget("v2", IndexTarget.Type.VALUES)),
 +                                           indexName,
 +                                           IndexMetadata.Kind.CUSTOM,
 +                                           ImmutableMap.of(CUSTOM_INDEX_OPTION_NAME, StubIndex.class.getName()));
 +        IndexMetadata stored = tableIndexes.get(indexName).orElseThrow(throwAssert("Index udt_idx not found"));
 +        assertEquals(fromTargets, stored);
 +    }
 +
 +    /**
 +     * A CUSTOM index may be declared with an empty target list, a regular index may not, and
 +     * the parentheses around the (possibly empty) target list cannot be left out.
 +     */
 +    @Test
 +    public void createIndexWithoutTargets() throws Throwable
 +    {
 +        createTable("CREATE TABLE %s(k int, c int, v1 int, v2 int, PRIMARY KEY(k,c))");
 +        // only allowed for CUSTOM indexes
 +        assertInvalidMessage("Only CUSTOM indexes can be created without specifying a target column",
 +                             "CREATE INDEX ON %s()");
 +
 +        // parentheses are mandatory. The statement is run through String.format first, like every
 +        // other statement in this class, so that the index class name is substituted and %%s
 +        // collapses to the %s table placeholder. Passing the raw template with the class name as a
 +        // trailing argument would presumably treat the name as a bind value (TODO confirm against
 +        // CQLTester) and leave a stray format specifier in the CQL, so the statement could be
 +        // rejected for a reason unrelated to the missing parentheses.
 +        assertInvalidSyntax(String.format("CREATE CUSTOM INDEX ON %%s USING '%s'", StubIndex.class.getName()));
 +        createIndex(String.format("CREATE CUSTOM INDEX no_targets ON %%s() USING '%s'", StubIndex.class.getName()));
 +        assertIndexCreated("no_targets", new HashMap<>());
 +    }
 +
 +    /**
 +     * Exercises the {@code expr(index, value)} syntax in SELECT statements: unknown index names,
 +     * the accepted spellings of a single expression, the rejection of more than one expression,
 +     * and the combination of an expression with a regular restriction.
 +     */
 +    @Test
 +    public void testCustomIndexExpressionSyntax() throws Throwable
 +    {
 +        Object[] expected = row(0, 0, 0, 0);
 +        createTable("CREATE TABLE %s (a int, b int, c int, d int, PRIMARY KEY (a, b))");
 +        execute("INSERT INTO %s (a, b, c, d) VALUES (?, ?, ?, ?)", expected);
 +
 +        // the index does not exist yet
 +        String customIndexMissing =
 +            String.format(IndexRestrictions.INDEX_NOT_FOUND, "custom_index", keyspace(), currentTable());
 +        assertInvalidMessage(customIndexMissing, "SELECT * FROM %s WHERE expr(custom_index, 'foo bar baz')");
 +
 +        final String indexClass = StubIndex.class.getName();
 +        createIndex(String.format("CREATE CUSTOM INDEX custom_index ON %%s(c) USING '%s'", indexClass));
 +
 +        String otherIndexMissing =
 +            String.format(IndexRestrictions.INDEX_NOT_FOUND, "no_such_index", keyspace(), currentTable());
 +        assertInvalidThrowMessage(Server.CURRENT_VERSION,
 +                                  otherIndexMissing,
 +                                  QueryValidationException.class,
 +                                  "SELECT * FROM %s WHERE expr(no_such_index, 'foo bar baz ')");
 +
 +        // simple case
 +        final String[] acceptedQueries = {
 +            "SELECT * FROM %s WHERE expr(custom_index, 'foo bar baz')",
 +            "SELECT * FROM %s WHERE expr(\"custom_index\", 'foo bar baz')",
 +            "SELECT * FROM %s WHERE expr(custom_index, $$foo \" ~~~ bar Baz$$)"
 +        };
 +        for (String query : acceptedQueries)
 +            assertRows(execute(query), expected);
 +
 +        // multiple expressions on the same index
 +        assertInvalidThrowMessage(Server.CURRENT_VERSION,
 +                                  IndexRestrictions.MULTIPLE_EXPRESSIONS,
 +                                  QueryValidationException.class,
 +                                  "SELECT * FROM %s WHERE expr(custom_index, 'foo') AND expr(custom_index, 'bar')");
 +
 +        // multiple expressions on different indexes
 +        createIndex(String.format("CREATE CUSTOM INDEX other_custom_index ON %%s(d) USING '%s'", indexClass));
 +        assertInvalidThrowMessage(Server.CURRENT_VERSION,
 +                                  IndexRestrictions.MULTIPLE_EXPRESSIONS,
 +                                  QueryValidationException.class,
 +                                  "SELECT * FROM %s WHERE expr(custom_index, 'foo') AND expr(other_custom_index, 'bar')");
 +
 +        // an expression next to a regular restriction is rejected unless ALLOW FILTERING is given
 +        final String mixedQuery = "SELECT * FROM %s WHERE expr(custom_index, 'foo') AND d=0";
 +        assertInvalidThrowMessage(Server.CURRENT_VERSION,
 +                                  StatementRestrictions.REQUIRES_ALLOW_FILTERING_MESSAGE,
 +                                  QueryValidationException.class,
 +                                  mixedQuery);
 +        assertRows(execute(mixedQuery + " ALLOW FILTERING"), expected);
 +    }
 +
 +    /**
 +     * A query using expr() against a {@code NoCustomExpressionsIndex} must fail with the
 +     * CUSTOM_EXPRESSION_NOT_SUPPORTED message naming that index.
 +     */
 +    @Test
 +    public void customIndexDoesntSupportCustomExpressions() throws Throwable
 +    {
 +        createTable("CREATE TABLE %s (a int, b int, c int, d int, PRIMARY KEY (a, b))");
 +
 +        final String indexClass = NoCustomExpressionsIndex.class.getName();
 +        createIndex(String.format("CREATE CUSTOM INDEX custom_index ON %%s(c) USING '%s'", indexClass));
 +
 +        String expectedError = String.format(IndexRestrictions.CUSTOM_EXPRESSION_NOT_SUPPORTED, "custom_index");
 +        assertInvalidThrowMessage(Server.CURRENT_VERSION,
 +                                  expectedError,
 +                                  QueryValidationException.class,
 +                                  "SELECT * FROM %s WHERE expr(custom_index, 'foo bar baz')");
 +    }
 +
 +    /**
 +     * A query using expr() against an {@code AlwaysRejectIndex} must fail with that index's
 +     * "None shall pass" message.
 +     */
 +    @Test
 +    public void customIndexRejectsExpressionSyntax() throws Throwable
 +    {
 +        createTable("CREATE TABLE %s (a int, b int, c int, d int, PRIMARY KEY (a, b))");
 +
 +        final String indexClass = AlwaysRejectIndex.class.getName();
 +        createIndex(String.format("CREATE CUSTOM INDEX custom_index ON %%s(c) USING '%s'", indexClass));
 +
 +        final String query = "SELECT * FROM %s WHERE expr(custom_index, 'foo bar baz')";
 +        assertInvalidThrowMessage(Server.CURRENT_VERSION,
 +                                  "None shall pass",
 +                                  QueryValidationException.class,
 +                                  query);
 +    }
 +
 +    /**
 +     * Naming a regular (non CUSTOM) index inside expr() must be rejected with the
 +     * NON_CUSTOM_INDEX_IN_EXPRESSION message.
 +     */
 +    @Test
 +    public void customExpressionsMustTargetCustomIndex() throws Throwable
 +    {
 +        createTable("CREATE TABLE %s (a int, b int, c int, d int, PRIMARY KEY (a, b))");
 +        createIndex("CREATE INDEX non_custom_index ON %s(c)");
 +
 +        String expectedError = String.format(IndexRestrictions.NON_CUSTOM_INDEX_IN_EXPRESSION, "non_custom_index");
 +        final String query = "SELECT * FROM %s WHERE expr(non_custom_index, 'c=0')";
 +        assertInvalidThrowMessage(Server.CURRENT_VERSION,
 +                                  expectedError,
 +                                  QueryValidationException.class,
 +                                  query);
 +    }
 +
 +    /**
 +     * expr() restrictions must be rejected in DELETE and UPDATE statements with the
 +     * CUSTOM_EXPRESSIONS_NOT_ALLOWED message.
 +     */
 +    @Test
 +    public void customExpressionsDisallowedInModifications() throws Throwable
 +    {
 +        createTable("CREATE TABLE %s (a int, b int, c int, d int, PRIMARY KEY (a, b))");
 +
 +        final String name = currentTable() + "_custom_index";
 +        createIndex(String.format("CREATE CUSTOM INDEX %s ON %%s(c) USING '%s'", name, StubIndex.class.getName()));
 +
 +        // the DELETE is checked first, then the UPDATE
 +        final String[] modificationTemplates = {
 +            "DELETE FROM %%s WHERE expr(%s, 'foo bar baz ')",
 +            "UPDATE %%s SET d=0 WHERE expr(%s, 'foo bar baz ')"
 +        };
 +        for (String template : modificationTemplates)
 +        {
 +            assertInvalidThrowMessage(Server.CURRENT_VERSION,
 +                                      ModificationStatement.CUSTOM_EXPRESSIONS_NOT_ALLOWED,
 +                                      QueryValidationException.class,
 +                                      String.format(template, name));
 +        }
 +    }
 +
 +    /**
 +     * With two {@code SettableSelectivityIndex} instances covering the restricted columns, the
 +     * one reporting fewer estimated result rows must be the one asked for a searcher, and
 +     * changing the estimates must change which index is asked.
 +     */
 +    @Test
 +    public void indexSelectionPrefersMostSelectiveIndex() throws Throwable
 +    {
 +        createTable("CREATE TABLE %s(a int, b int, c int, PRIMARY KEY (a))");
 +
 +        final String indexClass = SettableSelectivityIndex.class.getName();
 +        final String moreName = currentTable() + "_more_selective";
 +        final String lessName = currentTable() + "_less_selective";
 +        createIndex(String.format("CREATE CUSTOM INDEX %s ON %%s(b) USING '%s'", moreName, indexClass));
 +        createIndex(String.format("CREATE CUSTOM INDEX %s ON %%s(c) USING '%s'", lessName, indexClass));
 +
 +        SettableSelectivityIndex onB =
 +            (SettableSelectivityIndex) getCurrentColumnFamilyStore().indexManager.getIndexByName(moreName);
 +        SettableSelectivityIndex onC =
 +            (SettableSelectivityIndex) getCurrentColumnFamilyStore().indexManager.getIndexByName(lessName);
 +        assertEquals(0, onB.searchersProvided);
 +        assertEquals(0, onC.searchersProvided);
 +
 +        final String query = "SELECT * FROM %s WHERE b=0 AND c=0 ALLOW FILTERING";
 +
 +        // the more selective index should be chosen
 +        onB.setEstimatedResultRows(1);
 +        onC.setEstimatedResultRows(1000);
 +        execute(query);
 +        assertEquals(1, onB.searchersProvided);
 +        assertEquals(0, onC.searchersProvided);
 +
 +        // and adjusting the selectivity should have an observable effect
 +        onB.setEstimatedResultRows(10000);
 +        execute(query);
 +        assertEquals(1, onB.searchersProvided);
 +        assertEquals(1, onC.searchersProvided);
 +    }
 +
 +    /**
 +     * When a query carries an expr() restriction, the index it names must be the one asked for
 +     * a searcher, even though the other index reports fewer estimated result rows.
 +     */
 +    @Test
 +    public void customExpressionForcesIndexSelection() throws Throwable
 +    {
 +        createTable("CREATE TABLE %s(a int, b int, c int, PRIMARY KEY (a))");
 +
 +        final String indexClass = SettableSelectivityIndex.class.getName();
 +        final String moreName = currentTable() + "_more_selective";
 +        final String lessName = currentTable() + "_less_selective";
 +        createIndex(String.format("CREATE CUSTOM INDEX %s ON %%s(b) USING '%s'", moreName, indexClass));
 +        createIndex(String.format("CREATE CUSTOM INDEX %s ON %%s(c) USING '%s'", lessName, indexClass));
 +
 +        SettableSelectivityIndex onB =
 +            (SettableSelectivityIndex) getCurrentColumnFamilyStore().indexManager.getIndexByName(moreName);
 +        SettableSelectivityIndex onC =
 +            (SettableSelectivityIndex) getCurrentColumnFamilyStore().indexManager.getIndexByName(lessName);
 +        assertEquals(0, onB.searchersProvided);
 +        assertEquals(0, onC.searchersProvided);
 +
 +        // without a custom expression, the more selective index should be chosen
 +        onB.setEstimatedResultRows(1);
 +        onC.setEstimatedResultRows(1000);
 +        execute("SELECT * FROM %s WHERE b=0 AND c=0 ALLOW FILTERING");
 +        assertEquals(1, onB.searchersProvided);
 +        assertEquals(0, onC.searchersProvided);
 +
 +        // when a custom expression is present, its target index should be preferred
 +        String withExpression =
 +            String.format("SELECT * FROM %%s WHERE b=0 AND expr(%s, 'expression') ALLOW FILTERING", lessName);
 +        execute(withExpression);
 +        assertEquals(1, onB.searchersProvided);
 +        assertEquals(1, onC.searchersProvided);
 +    }
 +
 +    /**
 +     * Verifies that the type of the expression value is determined by
 +     * Index::customExpressionValueType: each index accepts a literal of its own type and
 +     * rejects a literal of the other type.
 +     */
 +    @Test
 +    public void testCustomExpressionValueType() throws Throwable
 +    {
 +        createTable("CREATE TABLE %s (k int, v1 uuid, v2 blob, PRIMARY KEY(k))");
 +
 +        final String createTemplate = "CREATE CUSTOM INDEX %s ON %%s() USING '%s'";
 +        createIndex(String.format(createTemplate, "int_index", Int32ExpressionIndex.class.getName()));
 +        createIndex(String.format(createTemplate, "text_index", UTF8ExpressionIndex.class.getName()));
 +
 +        // text index: a string is fine, an integer is not
 +        execute("SELECT * FROM %s WHERE expr(text_index, 'foo')");
 +        final String integerForText = "Invalid INTEGER constant (99) for \"custom index expression\" of type text";
 +        assertInvalidThrowMessage(Server.CURRENT_VERSION,
 +                                  integerForText,
 +                                  QueryValidationException.class,
 +                                  "SELECT * FROM %s WHERE expr(text_index, 99)");
 +
 +        // int index: an integer is fine, a string is not
 +        execute("SELECT * FROM %s WHERE expr(int_index, 99)");
 +        final String stringForInt = "Invalid STRING constant (foo) for \"custom index expression\" of type int";
 +        assertInvalidThrowMessage(Server.CURRENT_VERSION,
 +                                  stringForInt,
 +                                  QueryValidationException.class,
 +                                  "SELECT * FROM %s WHERE expr(int_index, 'foo')");
 +    }
 +
 +    /**
 +     * Verifies that reloading the base table's ColumnFamilyStore also triggers a reload of the
 +     * index metadata, as counted by a {@code CountMetadataReloadsIndex}.
 +     */
 +    @Test
 +    public void reloadIndexMetadataOnBaseCfsReload() throws Throwable
 +    {
 +        final String name = "reload_counter";
 +
 +        createTable("CREATE TABLE %s (k int, v1 int, PRIMARY KEY(k))");
 +        createIndex(String.format("CREATE CUSTOM INDEX reload_counter ON %%s() USING '%s'",
 +                                  CountMetadataReloadsIndex.class.getName()));
 +
 +        ColumnFamilyStore store = getCurrentColumnFamilyStore();
 +        CountMetadataReloadsIndex counter = (CountMetadataReloadsIndex) store.indexManager.getIndexByName(name);
 +        assertEquals(0, counter.reloads.get());
 +
 +        // reloading the CFS, even without any metadata changes, invokes the index's metadata reload task
 +        store.reload();
 +        assertEquals(1, counter.reloads.get());
 +    }
 +
 +    /**
 +     * Deletes partition 0 through the index manager and checks that the {@code StubIndex} saw
 +     * one partition deletion plus one row deletion for each of that partition's three rows,
 +     * with clusterings matching the first three inserted rows.
 +     */
 +    @Test
 +    public void notifyIndexersOfPartitionAndRowRemovalDuringCleanup() throws Throwable
 +    {
 +        createTable("CREATE TABLE %s (k int, c int, v int, PRIMARY KEY (k,c))");
 +        createIndex(String.format("CREATE CUSTOM INDEX cleanup_index ON %%s() USING '%s'", StubIndex.class.getName()));
 +        ColumnFamilyStore store = getCurrentColumnFamilyStore();
 +        StubIndex stub = (StubIndex) store.indexManager.getIndexByName("cleanup_index");
 +
 +        // three rows in partition 0 followed by a single row in partition 3; c and v run 0..3
 +        for (int i = 0; i < 4; i++)
 +            execute("INSERT INTO %s (k, c, v) VALUES (?, ?, ?)", i < 3 ? 0 : 3, i, i);
 +        assertEquals(4, stub.rowsInserted.size());
 +        assertEquals(0, stub.partitionDeletions.size());
 +
 +        ReadCommand readPartitionZero = Util.cmd(store, 0).build();
 +        try (ReadOrderGroup group = readPartitionZero.startOrderGroup();
 +             UnfilteredPartitionIterator partitions = readPartitionZero.executeLocally(group))
 +        {
 +            assertTrue(partitions.hasNext());
 +            store.indexManager.deletePartition(partitions.next(), FBUtilities.nowInSeconds());
 +        }
 +
 +        assertEquals(1, stub.partitionDeletions.size());
 +        assertEquals(3, stub.rowsDeleted.size());
 +        for (int row = 0; row < 3; row++)
 +            assertEquals(stub.rowsDeleted.get(row).clustering(), stub.rowsInserted.get(row).clustering());
 +    }
 +
 +    @Test
 +    public void notifyIndexersOfExpiredRowsDuringCompaction() throws Throwable
 +    {
 +        createTable("CREATE TABLE %s (k int, c int, PRIMARY KEY (k,c))");
 +        createIndex(String.format("CREATE CUSTOM INDEX row_ttl_test_index ON %%s() USING '%s'", StubIndex.class.getName()));
 +        ColumnFamilyStore cfs = getCurrentColumnFamilyStore();
 +        StubIndex index  = (StubIndex)cfs.indexManager.getIndexByName("row_ttl_test_index");
 +
 +        execute("INSERT INTO %s (k, c) VALUES (?, ?) USING TTL 1", 0, 0);
 +        execute("INSERT INTO %s (k, c) VALUES (?, ?)", 0, 1);
 +        execute("INSERT INTO %s (k, c) VALUES (?, ?)", 0, 2);
 +        execute("INSERT INTO %s (k, c) VALUES (?, ?)", 3, 3);
 +        assertEquals(4, index.rowsInserted.size());
 +        // flush so that we end up with an expiring row in the first sstable
 +        flush();
 +
 +        // let the row with the ttl expire, then force a compaction
 +        TimeUnit.SECONDS.sleep(2);
 +        compact();
 +
 +        // the index should have been notified of the expired row
 +        assertEquals(1, index.rowsDeleted.size());
 +        Integer deletedClustering = Int32Type.instance.compose(index.rowsDeleted.get(0).clustering().get(0));
 +        assertEquals(0, deletedClustering.intValue());
 +    }
 +
 +    /**
 +     * Verifies that the options given in CREATE CUSTOM INDEX reach the static
 +     * validateOptions(Map) method of the index class.
 +     */
 +    @Test
 +    public void validateOptions() throws Throwable
 +    {
 +        createTable("CREATE TABLE %s(k int, c int, v1 int, v2 int, PRIMARY KEY(k,c))");
 +        String indexClass = IndexWithValidateOptions.class.getName();
 +        createIndex(String.format("CREATE CUSTOM INDEX ON %%s(c, v2) USING '%s' WITH OPTIONS = {'foo':'bar'}",
 +                                  indexClass));
 +
 +        // the index class records whatever map it was asked to validate
 +        Map<String, String> received = IndexWithValidateOptions.options;
 +        assertNotNull(received);
 +        assertEquals("bar", received.get("foo"));
 +    }
 +
 +    /**
 +     * Verifies that an index class declaring validateOptions(Map, CFMetaData) receives both
 +     * the CREATE CUSTOM INDEX options and the metadata of the table being indexed.
 +     */
 +    @Test
 +    public void validateOptionsWithCFMetaData() throws Throwable
 +    {
 +        createTable("CREATE TABLE %s(k int, c int, v1 int, v2 int, PRIMARY KEY(k,c))");
 +        String indexClass = IndexWithOverloadedValidateOptions.class.getName();
 +        createIndex(String.format("CREATE CUSTOM INDEX ON %%s(c, v2) USING '%s' WITH OPTIONS = {'foo':'bar'}",
 +                                  indexClass));
 +
 +        // the metadata recorded by the index class must equal that of the current table
 +        CFMetaData currentTable = getCurrentColumnFamilyStore().metadata;
 +        assertEquals(currentTable, IndexWithOverloadedValidateOptions.cfm);
 +
 +        Map<String, String> received = IndexWithOverloadedValidateOptions.options;
 +        assertNotNull(received);
 +        assertEquals("bar", received.get("foo"));
 +    }
 +
 +    /**
 +     * Creates a StubIndex-backed custom index with the given name on the given columns,
 +     * then asserts that matching index metadata exists on the current table.
 +     *
 +     * @param indexName name to give the index
 +     * @param targetColumnNames columns listed, comma separated, as the index targets
 +     */
 +    private void testCreateIndex(String indexName, String... targetColumnNames) throws Throwable
 +    {
 +        String columnList = Arrays.stream(targetColumnNames).collect(Collectors.joining(","));
 +        String createIndexCql = String.format("CREATE CUSTOM INDEX %s ON %%s(%s) USING '%s'",
 +                                              indexName,
 +                                              columnList,
 +                                              StubIndex.class.getName());
 +        createIndex(createIndexCql);
 +        assertIndexCreated(indexName, targetColumnNames);
 +    }
 +
 +    /**
 +     * Asserts that an index with the given name and target columns exists,
 +     * starting from an empty set of user-supplied options.
 +     */
 +    private void assertIndexCreated(String name, String... targetColumnNames)
 +    {
 +        Map<String, String> noOptions = new HashMap<>();
 +        assertIndexCreated(name, noOptions, targetColumnNames);
 +    }
 +
 +    /**
 +     * Asserts that an index with the given name and options exists, turning each
 +     * column name into a VALUES index target first.
 +     */
 +    private void assertIndexCreated(String name, Map<String, String> options, String... targetColumnNames)
 +    {
 +        List<IndexTarget> targets = new ArrayList<>();
 +        for (String columnName : targetColumnNames)
 +        {
 +            ColumnIdentifier identifier = ColumnIdentifier.getInterned(columnName, true);
 +            targets.add(new IndexTarget(identifier, IndexTarget.Type.VALUES));
 +        }
 +        assertIndexCreated(name, options, targets);
 +    }
 +
 +    /**
 +     * Asserts that the current table's metadata contains a CUSTOM index equal to the one
 +     * described by the arguments, and fails the test otherwise.
 +     *
 +     * @param name expected index name
 +     * @param options expected user-supplied options; this map is not modified
 +     * @param targets expected index targets
 +     */
 +    private void assertIndexCreated(String name, Map<String, String> options, List<IndexTarget> targets)
 +    {
 +        // all tests here use StubIndex as the custom index class, so add that to the
 +        // expected options; work on a copy so the caller's map is left untouched
 +        // (and may be immutable)
 +        Map<String, String> expectedOptions = new HashMap<>(options);
 +        expectedOptions.put(CUSTOM_INDEX_OPTION_NAME, StubIndex.class.getName());
 +
 +        CFMetaData cfm = getCurrentColumnFamilyStore().metadata;
 +        IndexMetadata expected = IndexMetadata.fromIndexTargets(cfm, targets, name, IndexMetadata.Kind.CUSTOM, expectedOptions);
 +        for (IndexMetadata actual : cfm.getIndexes())
 +            if (actual.equals(expected))
 +                return;
 +
 +        fail(String.format("Index %s not found in CFMetaData", expected));
 +    }
 +
 +    /**
 +     * Builds an IndexTarget of the given type for the named column.
 +     */
 +    private static IndexTarget indexTarget(String name, IndexTarget.Type type)
 +    {
 +        ColumnIdentifier column = ColumnIdentifier.getInterned(name, true);
 +        return new IndexTarget(column, type);
 +    }
 +
 +    /**
 +     * StubIndex that counts how many times its metadata reload task has been executed.
 +     */
 +    public static final class CountMetadataReloadsIndex extends StubIndex
 +    {
 +        // executions of the reload task since construction or the last reset()
 +        private final AtomicInteger reloads = new AtomicInteger(0);
 +
 +        public CountMetadataReloadsIndex(ColumnFamilyStore baseCfs, IndexMetadata metadata)
 +        {
 +            super(baseCfs, metadata);
 +        }
 +
 +        /**
 +         * Resets the parent's state and then zeroes the reload counter.
 +         */
 +        public void reset()
 +        {
 +            super.reset();
 +            reloads.set(0);
 +        }
 +
 +        /**
 +         * Returns a task that only increments the counter; the supplied metadata is ignored.
 +         */
 +        public Callable<?> getMetadataReloadTask(IndexMetadata indexMetadata)
 +        {
 +            return () -> reloads.incrementAndGet();
 +        }
 +    }
 +
 +    /**
 +     * StubIndex whose shouldBuildBlocking() always answers true.
 +     */
 +    public static final class IndexIncludedInBuild extends StubIndex
 +    {
 +        public IndexIncludedInBuild(ColumnFamilyStore baseCfs, IndexMetadata metadata)
 +        {
 +            super(baseCfs, metadata);
 +        }
 +
 +        /**
 +         * @return always true
 +         */
 +        public boolean shouldBuildBlocking()
 +        {
 +            final boolean buildBlocking = true;
 +            return buildBlocking;
 +        }
 +    }
 +
 +    public static final class UTF8ExpressionIndex extends StubIndex
 +    {
 +        public UTF8ExpressionIndex(ColumnFamilyStore baseCfs, IndexMetadata metadata)
 +        {
 +            super(baseCfs, metadata);
 +        }
 +
 +        public AbstractType<?> customExpressionValueType()
 +        {
 +            return UTF8Type.instance;
 +        }
 +    }
 +
 +    public static final class Int32ExpressionIndex extends StubIndex
 +    {
 +        public Int32ExpressionIndex(ColumnFamilyStore baseCfs, IndexMetadata metadata)
 +        {
 +            super(baseCfs, metadata);
 +        }
 +
 +        public AbstractType<?> customExpressionValueType()
 +        {
 +            return Int32Type.instance;
 +        }
 +    }
 +
 +    /**
 +     * StubIndex whose estimated result row count can be set by a test, and which
 +     * counts how many times a searcher has been requested from it.
 +     */
 +    public static final class SettableSelectivityIndex extends StubIndex
 +    {
 +        // number of calls made to searcherFor
 +        private int searchersProvided = 0;
 +        // value reported by getEstimatedResultRows; zero until set
 +        private long estimatedResultRows = 0;
 +
 +        public SettableSelectivityIndex(ColumnFamilyStore baseCfs, IndexMetadata metadata)
 +        {
 +            super(baseCfs, metadata);
 +        }
 +
 +        /**
 +         * Sets the estimate subsequently returned by getEstimatedResultRows().
 +         */
 +        public void setEstimatedResultRows(long estimate)
 +        {
 +            this.estimatedResultRows = estimate;
 +        }
 +
 +        /**
 +         * @return the most recently set estimate, or 0 if none was set
 +         */
 +        public long getEstimatedResultRows()
 +        {
 +            return this.estimatedResultRows;
 +        }
 +
 +        /**
 +         * Records the request, then delegates to the parent implementation.
 +         */
 +        public Searcher searcherFor(ReadCommand command)
 +        {
 +            searchersProvided += 1;
 +            return super.searcherFor(command);
 +        }
 +    }
 +
 +    /**
 +     * StubIndex whose shouldBuildBlocking() always answers false.
 +     */
 +    public static final class IndexExcludedFromBuild extends StubIndex
 +    {
 +        public IndexExcludedFromBuild(ColumnFamilyStore baseCfs, IndexMetadata metadata)
 +        {
 +            super(baseCfs, metadata);
 +        }
 +
 +        /**
 +         * @return always false
 +         */
 +        public boolean shouldBuildBlocking()
 +        {
 +            final boolean buildBlocking = false;
 +            return buildBlocking;
 +        }
 +    }
 +
 +    /**
 +     * StubIndex that reports no value type for custom expressions.
 +     */
 +    public static final class NoCustomExpressionsIndex extends StubIndex
 +    {
 +        public NoCustomExpressionsIndex(ColumnFamilyStore baseCfs, IndexMetadata metadata)
 +        {
 +            super(baseCfs, metadata);
 +        }
 +
 +        /**
 +         * @return always null
 +         */
 +        public AbstractType<?> customExpressionValueType()
 +        {
 +            final AbstractType<?> noValueType = null;
 +            return noValueType;
 +        }
 +    }
 +
 +    /**
 +     * StubIndex that rejects every read command, both when validating it and
 +     * when asked for a searcher.
 +     */
 +    public static final class AlwaysRejectIndex extends StubIndex
 +    {
 +        public AlwaysRejectIndex(ColumnFamilyStore baseCfs, IndexMetadata metadata)
 +        {
 +            super(baseCfs, metadata);
 +        }
 +
 +        /**
 +         * @throws InvalidRequestException always
 +         */
 +        public void validate(ReadCommand command) throws InvalidRequestException
 +        {
 +            String reason = "None shall pass";
 +            throw new InvalidRequestException(reason);
 +        }
 +
 +        /**
 +         * @throws InvalidRequestException always; the message notes that validate()
 +         *         was expected to have failed first
 +         */
 +        public Searcher searcherFor(ReadCommand command)
 +        {
 +            String reason = "None shall pass (though I'd have expected to fail faster)";
 +            throw new InvalidRequestException(reason);
 +        }
 +    }
 +
 +    /**
 +     * StubIndex exposing a static validateOptions(Map) method that records the
 +     * options it was called with so that a test can inspect them.
 +     */
 +    public static final class IndexWithValidateOptions extends StubIndex
 +    {
 +        // the map passed to the most recent validateOptions call
 +        public static Map<String, String> options;
 +
 +        public IndexWithValidateOptions(ColumnFamilyStore baseCfs, IndexMetadata metadata)
 +        {
 +            super(baseCfs, metadata);
 +        }
 +
 +        /**
 +         * Stores the supplied options in the static field.
 +         *
 +         * @return a new, empty map
 +         */
 +        public static Map<String, String> validateOptions(Map<String, String> options)
 +        {
 +            IndexWithValidateOptions.options = options;
 +            Map<String, String> result = new HashMap<>();
 +            return result;
 +        }
 +    }
 +
 +    /**
 +     * StubIndex exposing a static validateOptions(Map, CFMetaData) method that records
 +     * both of its arguments so that a test can inspect them.
 +     */
 +    public static final class IndexWithOverloadedValidateOptions extends StubIndex
 +    {
 +        // the table metadata passed to the most recent validateOptions call
 +        public static CFMetaData cfm;
 +        // the options passed to the most recent validateOptions call
 +        public static Map<String, String> options;
 +
 +        public IndexWithOverloadedValidateOptions(ColumnFamilyStore baseCfs, IndexMetadata metadata)
 +        {
 +            super(baseCfs, metadata);
 +        }
 +
 +        /**
 +         * Stores both arguments in the static fields.
 +         *
 +         * @return a new, empty map
 +         */
 +        public static Map<String, String> validateOptions(Map<String, String> options, CFMetaData cfm)
 +        {
 +            IndexWithOverloadedValidateOptions.options = options;
 +            IndexWithOverloadedValidateOptions.cfm = cfm;
 +            Map<String, String> result = new HashMap<>();
 +            return result;
 +        }
 +    }
 +}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/05bacc75/test/unit/org/apache/cassandra/index/internal/CustomCassandraIndex.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/index/internal/CustomCassandraIndex.java
index a30cf4e,0000000..6aaefb7
mode 100644,000000..100644
--- a/test/unit/org/apache/cassandra/index/internal/CustomCassandraIndex.java
+++ b/test/unit/org/apache/cassandra/index/internal/CustomCassandraIndex.java
@@@ -1,640 -1,0 +1,660 @@@
++/*
++ *
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements.  See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership.  The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License.  You may obtain a copy of the License at
++ *
++ *   http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing,
++ * software distributed under the License is distributed on an
++ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
++ * KIND, either express or implied.  See the License for the
++ * specific language governing permissions and limitations
++ * under the License.
++ *
++ */
 +package org.apache.cassandra.index.internal;
 +
 +import java.nio.ByteBuffer;
 +import java.util.*;
 +import java.util.concurrent.Callable;
 +import java.util.concurrent.Future;
 +import java.util.function.BiFunction;
 +import java.util.stream.Collectors;
 +import java.util.stream.StreamSupport;
 +
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
 +
 +import org.apache.cassandra.config.CFMetaData;
 +import org.apache.cassandra.config.ColumnDefinition;
 +import org.apache.cassandra.cql3.Operator;
 +import org.apache.cassandra.cql3.statements.IndexTarget;
 +import org.apache.cassandra.db.*;
 +import org.apache.cassandra.db.compaction.CompactionManager;
 +import org.apache.cassandra.db.filter.RowFilter;
 +import org.apache.cassandra.db.lifecycle.SSTableSet;
 +import org.apache.cassandra.db.lifecycle.View;
 +import org.apache.cassandra.db.marshal.AbstractType;
 +import org.apache.cassandra.db.partitions.PartitionIterator;
 +import org.apache.cassandra.db.partitions.PartitionUpdate;
 +import org.apache.cassandra.db.rows.*;
 +import org.apache.cassandra.exceptions.InvalidRequestException;
 +import org.apache.cassandra.index.Index;
 +import org.apache.cassandra.index.IndexRegistry;
 +import org.apache.cassandra.index.SecondaryIndexBuilder;
 +import org.apache.cassandra.index.transactions.IndexTransaction;
 +import org.apache.cassandra.index.transactions.UpdateTransaction;
 +import org.apache.cassandra.io.sstable.ReducingKeyIterator;
 +import org.apache.cassandra.io.sstable.format.SSTableReader;
 +import org.apache.cassandra.schema.IndexMetadata;
 +import org.apache.cassandra.utils.FBUtilities;
 +import org.apache.cassandra.utils.Pair;
 +import org.apache.cassandra.utils.concurrent.OpOrder;
 +import org.apache.cassandra.utils.concurrent.Refs;
 +
 +import static org.apache.cassandra.index.internal.CassandraIndex.getFunctions;
 +import static org.apache.cassandra.index.internal.CassandraIndex.indexCfsMetadata;
 +import static org.apache.cassandra.index.internal.CassandraIndex.parseTarget;
 +
 +/**
 + * Clone of KeysIndex used in CassandraIndexTest#testCustomIndexWithCFS to verify
 + * behaviour of flushing CFS backed CUSTOM indexes
 + */
 +public class CustomCassandraIndex implements Index
 +{
 +    // log under this class's own name rather than that of the CassandraIndex it was cloned from
 +    private static final Logger logger = LoggerFactory.getLogger(CustomCassandraIndex.class);
 +
 +    // the table whose data is being indexed
 +    public final ColumnFamilyStore baseCfs;
 +    // definition of this index; assigned by setMetadata
 +    protected IndexMetadata metadata;
 +    // the table holding the index entries; created by setMetadata
 +    protected ColumnFamilyStore indexCfs;
 +    // the base table column targeted by this index; assigned by setMetadata
 +    protected ColumnDefinition indexedColumn;
 +    // functions obtained via getFunctions for the index definition and its target
 +    protected CassandraIndexFunctions functions;
 +
 +    /**
 +     * Creates the index for the given base table and immediately applies the index definition.
 +     *
 +     * @param baseCfs the table being indexed
 +     * @param indexDef the definition of this index
 +     */
 +    public CustomCassandraIndex(ColumnFamilyStore baseCfs, IndexMetadata indexDef)
 +    {
 +        this.baseCfs = baseCfs;
 +        this.setMetadata(indexDef);
 +    }
 +
 +    /**
 +     * Returns true if an index of this type can support search predicates of the form [column] OPERATOR [value].
 +     * This implementation accepts only the equality operator and does not look at the column.
 +     *
 +     * @param indexedColumn the column the predicate applies to (unused here)
 +     * @param operator the operator of the predicate
 +     * @return true if {@code operator} equals {@link Operator#EQ}
 +     */
 +    protected boolean supportsOperator(ColumnDefinition indexedColumn, Operator operator)
 +    {
 +        boolean isEquality = operator.equals(Operator.EQ);
 +        return isEquality;
 +    }
 +
 +    /**
 +     * @return the base table column this index targets
 +     */
 +    public ColumnDefinition getIndexedColumn()
 +    {
 +        return this.indexedColumn;
 +    }
 +
 +    /**
 +     * @return the clustering comparator taken from the index table's metadata
 +     */
 +    public ClusteringComparator getIndexComparator()
 +    {
 +        ClusteringComparator indexComparator = indexCfs.metadata.comparator;
 +        return indexComparator;
 +    }
 +
 +    /**
 +     * @return the table in which the index entries are stored
 +     */
 +    public ColumnFamilyStore getIndexCfs()
 +    {
 +        return this.indexCfs;
 +    }
 +
 +    /**
 +     * Registers this index with the supplied registry.
 +     */
 +    public void register(IndexRegistry registry)
 +    {
 +        final Index self = this;
 +        registry.registerIndex(self);
 +    }
 +
 +    /**
 +     * @return the task that builds the index, or null when there is nothing to build
 +     */
 +    public Callable<?> getInitializationTask()
 +    {
 +        // an index that is already built (e.g. being linked in again post-restart) or one
 +        // over an empty table needs no work; anything else is submitted for building
 +        if (isBuilt() || baseCfs.isEmpty())
 +            return null;
 +
 +        return getBuildIndexTask();
 +    }
 +
 +    /**
 +     * @return the definition this index currently operates under
 +     */
 +    public IndexMetadata getIndexMetadata()
 +    {
 +        return this.metadata;
 +    }
 +
 +    /**
 +     * @return the index table wrapped in an Optional, or an empty Optional if there is none
 +     */
 +    public Optional<ColumnFamilyStore> getBackingTable()
 +    {
 +        return Optional.ofNullable(indexCfs);
 +    }
 +
 +    /**
 +     * @return a task that performs a blocking flush of the index table when called
 +     */
 +    public Callable<Void> getBlockingFlushTask()
 +    {
 +        Callable<Void> flushTask = () -> {
 +            indexCfs.forceBlockingFlush();
 +            return null;
 +        };
 +        return flushTask;
 +    }
 +
 +    /**
 +     * @return a task that calls invalidate() on this index when run
 +     */
 +    public Callable<?> getInvalidateTask()
 +    {
 +        Callable<Void> invalidateTask = () -> {
 +            invalidate();
 +            return null;
 +        };
 +        return invalidateTask;
 +    }
 +
 +    /**
 +     * Applies the supplied index definition and returns a task that reloads the index table.
 +     * The definition is applied right away, during this call; only the refresh of the
 +     * index table's metadata properties and its reload are deferred to the returned task.
 +     *
 +     * @param indexDef the new definition of this index
 +     * @return the task reloading the index table
 +     */
 +    public Callable<?> getMetadataReloadTask(IndexMetadata indexDef)
 +    {
 +        setMetadata(indexDef);
 +        Callable<Void> reloadTask = () -> {
 +            indexCfs.metadata.reloadIndexMetadataProperties(baseCfs.metadata);
 +            indexCfs.reload();
 +            return null;
 +        };
 +        return reloadTask;
 +    }
 +
 +    /**
 +     * Installs the given index definition: records it, resolves the index target and the
 +     * matching functions, creates the ColumnFamilyStore for the index table and remembers
 +     * the indexed column.
 +     */
 +    private void setMetadata(IndexMetadata indexDef)
 +    {
 +        metadata = indexDef;
 +        CFMetaData baseMetadata = baseCfs.metadata;
 +        Pair<ColumnDefinition, IndexTarget.Type> parsedTarget = parseTarget(baseMetadata, indexDef);
 +        functions = getFunctions(indexDef, parsedTarget);
 +
 +        CFMetaData indexTableMetadata = indexCfsMetadata(baseMetadata, indexDef);
 +        indexCfs = ColumnFamilyStore.createColumnFamilyStore(baseCfs.keyspace,
 +                                                             indexTableMetadata.cfName,
 +                                                             indexTableMetadata,
 +                                                             baseCfs.getTracker().loadsstables);
 +        // the left element of the parsed target is the column definition
 +        indexedColumn = parsedTarget.left;
 +    }
 +
 +    /**
 +     * @param truncatedAt value handed to the index table's discardSSTables call
 +     * @return a task that discards sstables of the index table when run
 +     */
 +    public Callable<?> getTruncateTask(final long truncatedAt)
 +    {
 +        Callable<Void> truncateTask = () -> {
 +            indexCfs.discardSSTables(truncatedAt);
 +            return null;
 +        };
 +        return truncateTask;
 +    }
 +
 +    /**
 +     * @return always true
 +     */
 +    public boolean shouldBuildBlocking()
 +    {
 +        final boolean buildBlocking = true;
 +        return buildBlocking;
 +    }
 +
 +    public boolean dependsOn(ColumnDefinition column)
 +    {
 +        return column.equals(indexedColumn);
 +    }
 +
 +    /**
 +     * @return true if the column has the same name as the indexed column and
 +     *         supportsOperator accepts the operator
 +     */
 +    public boolean supportsExpression(ColumnDefinition column, Operator operator)
 +    {
 +        // an expression on any other column can never be served by this index
 +        if (!indexedColumn.name.equals(column.name))
 +            return false;
 +
 +        return supportsOperator(indexedColumn, operator);
 +    }
 +
 +    /**
 +     * @return always null
 +     */
 +    public AbstractType<?> customExpressionValueType()
 +    {
 +        final AbstractType<?> noValueType = null;
 +        return noValueType;
 +    }
 +
 +    /**
 +     * Convenience overload that unpacks the column and operator of a filter expression.
 +     */
 +    private boolean supportsExpression(RowFilter.Expression expression)
 +    {
 +        ColumnDefinition column = expression.column();
 +        Operator operator = expression.operator();
 +        return supportsExpression(column, operator);
 +    }
 +
 +    public long getEstimatedResultRows()
 +    {
 +        return indexCfs.getMeanColumns();
 +    }
 +
 +    /**
 +     * Query results need no post processing, so the returned function hands back
 +     * the partition iterator it is given and ignores the read command.
 +     */
 +    public BiFunction<PartitionIterator, ReadCommand, PartitionIterator> postProcessorFor(ReadCommand command)
 +    {
 +        BiFunction<PartitionIterator, ReadCommand, PartitionIterator> passThrough =
 +            (results, ignoredCommand) -> results;
 +        return passThrough;
 +    }
 +
 +    /**
 +     * @return the filter without the first expression this index supports, or the
 +     *         filter itself when it contains no such expression
 +     */
 +    public RowFilter getPostIndexQueryFilter(RowFilter filter)
 +    {
 +        Optional<RowFilter.Expression> handledByIndex = getTargetExpression(filter.getExpressions());
 +        Optional<RowFilter> remaining = handledByIndex.map(expression -> filter.without(expression));
 +        return remaining.orElse(filter);
 +    }
 +
 +    /**
 +     * @return the first of the given expressions that this index supports, if any
 +     */
 +    private Optional<RowFilter.Expression> getTargetExpression(List<RowFilter.Expression> expressions)
 +    {
 +        for (RowFilter.Expression candidate : expressions)
 +        {
 +            if (supportsExpression(candidate))
 +                return Optional.of(candidate);
 +        }
 +        return Optional.empty();
 +    }
 +
 +    /**
 +     * @return always null; this implementation provides no searcher
 +     */
 +    public Index.Searcher searcherFor(ReadCommand command)
 +    {
 +        final Index.Searcher noSearcher = null;
 +        return noSearcher;
 +    }
 +
 +    /**
 +     * Validates the values the update would add to the index, choosing what to
 +     * check from the kind of the indexed column.
 +     *
 +     * @throws InvalidRequestException if a value to be indexed is rejected
 +     */
 +    public void validate(PartitionUpdate update) throws InvalidRequestException
 +    {
 +        switch (indexedColumn.kind)
 +        {
 +            case PARTITION_KEY:
 +                validatePartitionKey(update.partitionKey());
 +                return;
 +            case CLUSTERING:
 +                validateClusterings(update);
 +                return;
 +            case REGULAR:
 +                validateRows(update);
 +                return;
 +            case STATIC:
 +                // only the static row can hold a value for a static column
 +                validateRows(Collections.singleton(update.staticRow()));
 +                return;
 +        }
 +    }
 +
 +    /**
 +     * Builds the clustering prefix of an index entry. This implementation adds only the
 +     * base partition key; the clustering prefix and cell path arguments are not used.
 +     */
 +    protected CBuilder buildIndexClusteringPrefix(ByteBuffer partitionKey,
 +                                               ClusteringPrefix prefix,
 +                                               CellPath path)
 +    {
 +        CBuilder indexClustering = CBuilder.create(getIndexComparator());
 +        indexClustering.add(partitionKey);
 +        return indexClustering;
 +    }
 +
 +    protected ByteBuffer getIndexedValue(ByteBuffer partitionKey,
 +                                      Clustering clustering,
 +                                      CellPath path, ByteBuffer cellValue)
 +    {
 +        return cellValue;
 +    }
 +
 +    /**
 +     * Not supported by this index.
 +     *
 +     * @throws UnsupportedOperationException always
 +     */
 +    public IndexEntry decodeEntry(DecoratedKey indexedValue, Row indexEntry)
 +    {
 +        String reason = "KEYS indexes do not use a specialized index entry format";
 +        throw new UnsupportedOperationException(reason);
 +    }
 +
 +    /**
 +     * Decides whether an index entry no longer matches the base table row.
 +     *
 +     * @param row the base table row, possibly null
 +     * @param indexValue the value recorded in the index
 +     * @param nowInSec current time in seconds, used for the liveness check
 +     * @return true if the row is null, has no live cell for the indexed column, or
 +     *         that cell's value compares unequal to {@code indexValue}
 +     */
 +    public boolean isStale(Row row, ByteBuffer indexValue, int nowInSec)
 +    {
 +        if (row == null)
 +            return true;
 +
 +        Cell indexedCell = row.getCell(indexedColumn);
 +        if (indexedCell == null)
 +            return true;
 +
 +        if (!indexedCell.isLive(nowInSec))
 +            return true;
 +
 +        return indexedColumn.type.compare(indexValue, indexedCell.value()) != 0;
 +    }
 +
 +    /**
 +     * Creates the Indexer that mirrors changes to one base table partition into the index table.
 +     *
 +     * @param key partition key of the base table partition; its raw bytes are passed to insert/delete
 +     * @param columns checked for the presence of the indexed column
 +     * @param nowInSec current time in seconds, used for liveness checks and deletion times
 +     * @param opGroup operation group passed through to the writes on the index table
 +     * @param transactionType not used by this implementation
 +     * @return the Indexer, or null when isPrimaryKeyIndex() is false and {@code columns}
 +     *         does not contain the indexed column
 +     */
 +    public Indexer indexerFor(final DecoratedKey key,
 +                              final PartitionColumns columns,
 +                              final int nowInSec,
 +                              final OpOrder.Group opGroup,
 +                              final IndexTransaction.Type transactionType)
 +    {
 +        if (!isPrimaryKeyIndex() && !columns.contains(indexedColumn))
 +            return null;
 +
 +        return new Indexer()
 +        {
 +            // no setup is needed before events arrive
 +            public void begin()
 +            {
 +            }
 +
 +            // partition level deletions are ignored by this indexer
 +            public void partitionDelete(DeletionTime deletionTime)
 +            {
 +            }
 +
 +            // range tombstones are ignored by this indexer
 +            public void rangeTombstone(RangeTombstone tombstone)
 +            {
 +            }
 +
 +            // A primary key index records the row's clustering with a liveness computed from
 +            // the whole row; any other index records the cell(s) of the indexed column.
 +            public void insertRow(Row row)
 +            {
 +                if (isPrimaryKeyIndex())
 +                {
 +                    indexPrimaryKey(row.clustering(),
 +                                    getPrimaryKeyIndexLiveness(row),
 +                                    row.deletion());
 +                }
 +                else
 +                {
 +                    if (indexedColumn.isComplex())
 +                        indexCells(row.clustering(), row.getComplexColumnData(indexedColumn));
 +                    else
 +                        indexCell(row.clustering(), row.getCell(indexedColumn));
 +                }
 +            }
 +
 +            // NOTE(review): unlike insertRow there is no else branch here, so the cell based
 +            // removal below also runs for primary key indexes - confirm this is intended.
 +            public void removeRow(Row row)
 +            {
 +                if (isPrimaryKeyIndex())
 +                    indexPrimaryKey(row.clustering(), row.primaryKeyLivenessInfo(), row.deletion());
 +
 +                if (indexedColumn.isComplex())
 +                    removeCells(row.clustering(), row.getComplexColumnData(indexedColumn));
 +                else
 +                    removeCell(row.clustering(), row.getCell(indexedColumn));
 +            }
 +
 +
 +            // Entries for the new row are written before entries for the old row are removed.
 +            // NOTE(review): as in removeRow, the cell based branch also runs for primary key indexes.
 +            public void updateRow(Row oldRow, Row newRow)
 +            {
 +                if (isPrimaryKeyIndex())
 +                    indexPrimaryKey(newRow.clustering(),
 +                                    newRow.primaryKeyLivenessInfo(),
 +                                    newRow.deletion());
 +
 +                if (indexedColumn.isComplex())
 +                {
 +                    indexCells(newRow.clustering(), newRow.getComplexColumnData(indexedColumn));
 +                    removeCells(oldRow.clustering(), oldRow.getComplexColumnData(indexedColumn));
 +                }
 +                else
 +                {
 +                    indexCell(newRow.clustering(), newRow.getCell(indexedColumn));
 +                    removeCell(oldRow.clustering(), oldRow.getCell(indexedColumn));
 +                }
 +            }
 +
 +            // nothing to flush or release once all events were delivered
 +            public void finish()
 +            {
 +            }
 +
 +            // indexes every cell of a complex column; a null collection means nothing to do
 +            private void indexCells(Clustering clustering, Iterable<Cell> cells)
 +            {
 +                if (cells == null)
 +                    return;
 +
 +                for (Cell cell : cells)
 +                    indexCell(clustering, cell);
 +            }
 +
 +            // writes an index entry for a live cell, carrying over the cell's timestamp,
 +            // ttl and local deletion time; null or non-live cells are skipped
 +            private void indexCell(Clustering clustering, Cell cell)
 +            {
 +                if (cell == null || !cell.isLive(nowInSec))
 +                    return;
 +
 +                insert(key.getKey(),
 +                       clustering,
 +                       cell,
 +                       LivenessInfo.create(cell.timestamp(), cell.ttl(), cell.localDeletionTime()),
 +                       opGroup);
 +            }
 +
 +            // removes the index entry of every cell of a complex column; null means nothing to do
 +            private void removeCells(Clustering clustering, Iterable<Cell> cells)
 +            {
 +                if (cells == null)
 +                    return;
 +
 +                for (Cell cell : cells)
 +                    removeCell(clustering, cell);
 +            }
 +
 +            // deletes the index entry for a live cell; null or non-live cells are skipped
 +            private void removeCell(Clustering clustering, Cell cell)
 +            {
 +                if (cell == null || !cell.isLive(nowInSec))
 +                    return;
 +
 +                delete(key.getKey(), clustering, cell, opGroup, nowInSec);
 +            }
 +
 +            // inserts an entry when the liveness info carries a timestamp and, independently,
 +            // deletes the entry when the row deletion is not live
 +            private void indexPrimaryKey(final Clustering clustering,
 +                                         final LivenessInfo liveness,
 +                                         final Row.Deletion deletion)
 +            {
 +                if (liveness.timestamp() != LivenessInfo.NO_TIMESTAMP)
 +                    insert(key.getKey(), clustering, null, liveness, opGroup);
 +
 +                if (!deletion.isLive())
 +                    delete(key.getKey(), clustering, deletion.time(), opGroup);
 +            }
 +
 +            // starts from the row's primary key liveness and replaces timestamp and ttl with
 +            // those of any live cell that has a strictly newer timestamp
 +            private LivenessInfo getPrimaryKeyIndexLiveness(Row row)
 +            {
 +                long timestamp = row.primaryKeyLivenessInfo().timestamp();
 +                int ttl = row.primaryKeyLivenessInfo().ttl();
 +                for (Cell cell : row.cells())
 +                {
 +                    long cellTimestamp = cell.timestamp();
 +                    if (cell.isLive(nowInSec))
 +                    {
 +                        if (cellTimestamp > timestamp)
 +                        {
 +                            timestamp = cellTimestamp;
 +                            ttl = cell.ttl();
 +                        }
 +                    }
 +                }
 +                return LivenessInfo.create(baseCfs.metadata, timestamp, ttl, nowInSec);
 +            }
 +        };
 +    }
 +
 +    /**
 +     * Specific to internal indexes: invoked by a searcher that has come across a
 +     * stale entry in the index, so that the entry gets deleted.
 +     *
 +     * @param indexKey the partition key in the index table
 +     * @param indexClustering the clustering in the index table
 +     * @param deletion deletion timestamp etc
 +     * @param opGroup the operation under which to perform the deletion
 +     */
 +    public void deleteStaleEntry(DecoratedKey indexKey,
 +                                 Clustering indexClustering,
 +                                 DeletionTime deletion,
 +                                 OpOrder.Group opGroup)
 +    {
 +        this.doDelete(indexKey, indexClustering, deletion, opGroup);
 +        logger.debug("Removed index entry for stale value {}", indexKey);
 +    }
 +
 +    /**
 +     * Adds a new entry to the index: the indexed value becomes the partition key of the
 +     * index table and a row without cells, carrying the given liveness, is written under it.
 +     */
 +    private void insert(ByteBuffer rowKey,
 +                        Clustering clustering,
 +                        Cell cell,
 +                        LivenessInfo info,
 +                        OpOrder.Group opGroup)
 +    {
 +        ByteBuffer indexedValue = getIndexedValue(rowKey, clustering, cell);
 +        DecoratedKey indexPartitionKey = getIndexKeyFor(indexedValue);
 +        Clustering indexClustering = buildIndexClustering(rowKey, clustering, cell);
 +        Row indexRow = BTreeRow.noCellLiveRow(indexClustering, info);
 +        PartitionUpdate indexUpdate = partitionUpdate(indexPartitionKey, indexRow);
 +        indexCfs.apply(indexUpdate, UpdateTransaction.NO_OP, opGroup, null);
 +        logger.debug("Inserted entry into index for value {}", indexPartitionKey);
 +    }
 +
 +    /**
 +     * Deletes the index entry of a cell of a non-primary key column. The deletion
 +     * uses the cell's timestamp together with the supplied current time.
 +     */
 +    private void delete(ByteBuffer rowKey,
 +                        Clustering clustering,
 +                        Cell cell,
 +                        OpOrder.Group opGroup,
 +                        int nowInSec)
 +    {
 +        ByteBuffer indexedValue = getIndexedValue(rowKey, clustering, cell);
 +        DecoratedKey indexPartitionKey = getIndexKeyFor(indexedValue);
 +        Clustering indexClustering = buildIndexClustering(rowKey, clustering, cell);
 +        DeletionTime deletion = new DeletionTime(cell.timestamp(), nowInSec);
 +        doDelete(indexPartitionKey, indexClustering, deletion, opGroup);
 +    }
 +
 +    /**
 +     * Called when deleting entries from indexes on primary key columns
 +     */
 +    private void delete(ByteBuffer rowKey,
 +                        Clustering clustering,
 +                        DeletionTime deletion,
 +                        OpOrder.Group opGroup)
 +    {
 +        DecoratedKey valueKey = getIndexKeyFor(getIndexedValue(rowKey,
 +                                                               clustering,
 +                                                               null));
 +        doDelete(valueKey,
 +                 buildIndexClustering(rowKey, clustering, null),
 +                 deletion,
 +                 opGroup);
 +    }
 +
 +    /**
 +     * Applies a row deletion for the given clustering to the index table partition
 +     * identified by the index key.
 +     */
 +    private void doDelete(DecoratedKey indexKey,
 +                          Clustering indexClustering,
 +                          DeletionTime deletion,
 +                          OpOrder.Group opGroup)
 +    {
 +        Row.Deletion rowDeletion = Row.Deletion.regular(deletion);
 +        Row tombstoneRow = BTreeRow.emptyDeletedRow(indexClustering, rowDeletion);
 +        PartitionUpdate indexUpdate = partitionUpdate(indexKey, tombstoneRow);
 +        indexCfs.apply(indexUpdate, UpdateTransaction.NO_OP, opGroup, null);
 +        logger.debug("Removed index entry for value {}", indexKey);
 +    }
 +
 +    private void validatePartitionKey(DecoratedKey partitionKey) throws InvalidRequestException
 +    {
 +        assert indexedColumn.isPartitionKey();
 +        validateIndexedValue(getIndexedValue(partitionKey.getKey(), null, null ));
 +    }
 +
 +    private void validateClusterings(PartitionUpdate update) throws InvalidRequestException
 +    {
 +        assert indexedColumn.isClusteringColumn();
 +        for (Row row : update)
 +            validateIndexedValue(getIndexedValue(null, row.clustering(), null));
 +    }
 +
 +    /**
 +     * Validates the values that would be indexed for the indexed column in each of the rows:
 +     * every cell of a complex column, or the single cell of a simple column.
 +     * Only meaningful when the indexed column is not part of the primary key.
 +     */
 +    private void validateRows(Iterable<Row> rows)
 +    {
 +        assert !indexedColumn.isPrimaryKeyColumn();
 +        for (Row row : rows)
 +        {
 +            if (!indexedColumn.isComplex())
 +            {
 +                validateIndexedValue(getIndexedValue(null, null, row.getCell(indexedColumn)));
 +                continue;
 +            }
 +
 +            ComplexColumnData complexData = row.getComplexColumnData(indexedColumn);
 +            // the row has no data at all for the complex column
 +            if (complexData == null)
 +                continue;
 +
 +            for (Cell cell : complexData)
 +                validateIndexedValue(getIndexedValue(null, null, cell.path(), cell.value()));
 +        }
 +    }
 +
 +    /**
 +     * Rejects values whose remaining size reaches FBUtilities.MAX_UNSIGNED_SHORT;
 +     * a null value is accepted.
 +     *
 +     * @throws InvalidRequestException if the value is too large to be indexed
 +     */
 +    private void validateIndexedValue(ByteBuffer value)
 +    {
 +        if (value == null || value.remaining() < FBUtilities.MAX_UNSIGNED_SHORT)
 +            return;
 +
 +        String message = String.format("Cannot index value of size %d for index %s on %s.%s(%s) (maximum allowed size=%d)",
 +                                       value.remaining(),
 +                                       metadata.name,
 +                                       baseCfs.metadata.ksName,
 +                                       baseCfs.metadata.cfName,
 +                                       indexedColumn.name.toString(),
 +                                       FBUtilities.MAX_UNSIGNED_SHORT);
 +        throw new InvalidRequestException(message);
 +    }
 +
 +    private ByteBuffer getIndexedValue(ByteBuffer rowKey,
 +                                       Clustering clustering,
 +                                       Cell cell)
 +    {
 +        return getIndexedValue(rowKey,
 +                               clustering,
 +                               cell == null ? null : cell.path(),
 +                               cell == null ? null : cell.value()
 +        );
 +    }
 +
 +    private Clustering buildIndexClustering(ByteBuffer rowKey,
 +                                            Clustering clustering,
 +                                            Cell cell)
 +    {
 +        return buildIndexClusteringPrefix(rowKey,
 +                                          clustering,
 +                                          cell == null ? null : cell.path()).build();
 +    }
 +
 +    /** Decorates the given value as a partition key of the index table. */
 +    private DecoratedKey getIndexKeyFor(ByteBuffer value)
 +    {
 +        DecoratedKey indexKey = indexCfs.decorateKey(value);
 +        return indexKey;
 +    }
 +
 +    /** Wraps a single row in a partition update against the index table's metadata. */
 +    private PartitionUpdate partitionUpdate(DecoratedKey valueKey, Row row)
 +    {
 +        PartitionUpdate singleRow = PartitionUpdate.singleRowUpdate(indexCfs.metadata, valueKey, row);
 +        return singleRow;
 +    }
 +
 +    /**
 +     * Invalidates the index table: interrupts and waits out its compactions,
 +     * awaits write and read barriers around a blocking flush, then invalidates
 +     * the underlying store. The statement order is significant.
 +     */
 +    private void invalidate()
 +    {
 +        // interrupt in-progress compactions and wait for them to stop
 +        Collection<ColumnFamilyStore> indexStores = Collections.singleton(indexCfs);
 +        CompactionManager compactions = CompactionManager.instance;
 +        compactions.interruptCompactionForCFs(indexStores, true);
 +        compactions.waitForCessation(indexStores);
 +
 +        indexCfs.keyspace.writeOrder.awaitNewBarrier();
 +        indexCfs.forceBlockingFlush();
 +        indexCfs.readOrdering.awaitNewBarrier();
 +        indexCfs.invalidate();
 +    }
 +
 +    /** Asks {@code SystemKeyspace} whether this index is recorded as built for the base table's keyspace. */
 +    private boolean isBuilt()
 +    {
 +        String keyspaceName = baseCfs.keyspace.getName();
 +        return SystemKeyspace.isIndexBuilt(keyspaceName, metadata.name);
 +    }
 +
 +    private boolean isPrimaryKeyIndex()
 +    {
 +        return indexedColumn.isPrimaryKeyColumn();
 +    }
 +
 +    private Callable<?> getBuildIndexTask()
 +    {
 +        return () -> {
 +            buildBlocking();
 +            return null;
 +        };
 +    }
 +
 +    /**
 +     * Builds the index synchronously from the base table's canonical sstables.
 +     * Flushes the base table first; if no sstables are referenced the index is
 +     * marked built immediately, otherwise an index build is submitted, waited
 +     * on, the index table is flushed, and the index is marked built.
 +     */
 +    private void buildBlocking()
 +    {
 +        baseCfs.forceBlockingFlush();
 +
 +        try (ColumnFamilyStore.RefViewFragment viewFragment = baseCfs.selectAndReference(View.select(SSTableSet.CANONICAL));
 +             Refs<SSTableReader> sstables = viewFragment.refs)
 +        {
 +            if (sstables.isEmpty())
 +            {
 +                logger.info("No SSTable data for {}.{} to build index {} from, marking empty index as built",
 +                            baseCfs.metadata.ksName,
 +                            baseCfs.metadata.cfName,
 +                            metadata.name);
 +                baseCfs.indexManager.markIndexBuilt(metadata.name);
 +                return;
 +            }
 +
 +            logger.info("Submitting index build of {} for data in {}", metadata.name, getSSTableNames(sstables));
 +
 +            ReducingKeyIterator keys = new ReducingKeyIterator(sstables);
 +            SecondaryIndexBuilder builder = new SecondaryIndexBuilder(baseCfs, Collections.singleton(this), keys);
 +            // Block until the submitted build has finished before flushing and marking the index built
 +            FBUtilities.waitOnFuture(CompactionManager.instance.submitIndexBuild(builder));
 +            indexCfs.forceBlockingFlush();
 +            baseCfs.indexManager.markIndexBuilt(metadata.name);
 +        }
 +        logger.info("Index build of {} complete", metadata.name);
 +    }
 +
 +    /** Joins the string forms of the given sstables with {@code ", "}. */
 +    private static String getSSTableNames(Collection<SSTableReader> sstables)
 +    {
 +        return sstables.stream()
 +                       .map(SSTableReader::toString)
 +                       .collect(Collectors.joining(", "));
 +    }
 +}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/05bacc75/test/unit/org/apache/cassandra/io/util/BufferedDataOutputStreamTest.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/05bacc75/test/unit/org/apache/cassandra/io/util/NIODataInputStreamTest.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/05bacc75/test/unit/org/apache/cassandra/io/util/RandomAccessReaderTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/io/util/RandomAccessReaderTest.java
index e83c015,0000000..aad5117
mode 100644,000000..100644
--- a/test/unit/org/apache/cassandra/io/util/RandomAccessReaderTest.java
+++ b/test/unit/org/apache/cassandra/io/util/RandomAccessReaderTest.java
@@@ -1,483 -1,0 +1,503 @@@
++/*
++ *
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements.  See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership.  The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License.  You may obtain a copy of the License at
++ *
++ *   http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing,
++ * software distributed under the License is distributed on an
++ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
++ * KIND, either express or implied.  See the License for the
++ * specific language governing permissions and limitations
++ * under the License.
++ *
++ */
 +package org.apache.cassandra.io.util;
 +
 +import java.io.File;
 +import java.io.IOException;
 +import java.nio.ByteBuffer;
 +import java.nio.ByteOrder;
 +import java.nio.MappedByteBuffer;
 +import java.nio.channels.FileChannel;
 +import java.nio.channels.FileLock;
 +import java.nio.channels.ReadableByteChannel;
 +import java.nio.channels.WritableByteChannel;
 +import java.nio.charset.Charset;
 +import java.util.Arrays;
 +import java.util.Random;
 +import java.util.concurrent.ExecutorService;
 +import java.util.concurrent.Executors;
 +import java.util.concurrent.TimeUnit;
 +
 +import org.junit.Test;
 +
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
 +
 +import static org.junit.Assert.*;
 +
 +import org.apache.cassandra.io.compress.BufferType;
 +import org.apache.cassandra.utils.ByteBufferUtil;
 +
 +public class RandomAccessReaderTest
 +{
 +    private static final Logger logger = LoggerFactory.getLogger(RandomAccessReaderTest.class);
 +
 +    /**
 +     * Test parameters for a read test: the minimum file length and reader
 +     * buffer size are fixed at construction; the remaining settings have
 +     * defaults and can be changed through the fluent setters.
 +     */
 +    private static final class Parameters
 +    {
 +        public final long fileLength;
 +        public final int bufferSize;
 +
 +        public BufferType bufferType = BufferType.OFF_HEAP;
 +        public int maxSegmentSize = MmappedRegions.MAX_SEGMENT_SIZE;
 +        public boolean mmappedRegions = false;
 +        public byte[] expected = "The quick brown fox jumps over the lazy dog".getBytes(FileUtils.CHARSET);
 +
 +        Parameters(long fileLength, int bufferSize)
 +        {
 +            this.fileLength = fileLength;
 +            this.bufferSize = bufferSize;
 +        }
 +
 +        public Parameters bufferType(BufferType bufferType)
 +        {
 +            this.bufferType = bufferType;
 +            return this;
 +        }
 +
 +        public Parameters maxSegmentSize(int maxSegmentSize)
 +        {
 +            this.maxSegmentSize = maxSegmentSize;
 +            return this;
 +        }
 +
 +        public Parameters mmappedRegions(boolean mmappedRegions)
 +        {
 +            this.mmappedRegions = mmappedRegions;
 +            return this;
 +        }
 +
 +        public Parameters expected(byte[] expected)
 +        {
 +            this.expected = expected;
 +            return this;
 +        }
 +    }
 +
 +    /** Full read with a 4096-byte off-heap buffer over a file of at least 8192 bytes. */
 +    @Test
 +    public void testBufferedOffHeap() throws IOException
 +    {
 +        Parameters params = new Parameters(8192, 4096).bufferType(BufferType.OFF_HEAP);
 +        testReadFully(params);
 +    }
 +
 +    /** Full read with a 4096-byte on-heap buffer over a file of at least 8192 bytes. */
 +    @Test
 +    public void testBufferedOnHeap() throws IOException
 +    {
 +        Parameters params = new Parameters(8192, 4096).bufferType(BufferType.ON_HEAP);
 +        testReadFully(params);
 +    }
 +
 +    /** Full read where the requested buffer size (65536) exceeds the requested file length (8192). */
 +    @Test
 +    public void testBigBufferSize() throws IOException
 +    {
 +        Parameters params = new Parameters(8192, 65536).bufferType(BufferType.ON_HEAP);
 +        testReadFully(params);
 +    }
 +
 +    /** Full read with a requested buffer size of only 16 bytes. */
 +    @Test
 +    public void testTinyBufferSize() throws IOException
 +    {
 +        Parameters params = new Parameters(8192, 16).bufferType(BufferType.ON_HEAP);
 +        testReadFully(params);
 +    }
 +
 +    /** Full read with mmapped regions enabled and the default maximum segment size. */
 +    @Test
 +    public void testOneSegment() throws IOException
 +    {
 +        Parameters params = new Parameters(8192, 4096).mmappedRegions(true);
 +        testReadFully(params);
 +    }
 +
 +    /** Full read with mmapped regions enabled and the maximum segment size parameter set to 1024. */
 +    @Test
 +    public void testMultipleSegments() throws IOException
 +    {
 +        Parameters params = new Parameters(8192, 4096).mmappedRegions(true).maxSegmentSize(1024);
 +        testReadFully(params);
 +    }
 +
 +    @Test
 +    public void testVeryLarge() throws IOException
 +    {
 +        final long SIZE = 1L << 32; // 4GB (2^32 bytes), larger than Integer.MAX_VALUE
 +        Parameters params = new Parameters(SIZE, 1 << 20); // 1MB buffer
 +
 +        try(ChannelProxy channel = new ChannelProxy("abc", new FakeFileChannel(SIZE)))
 +        {
 +            RandomAccessReader.Builder builder = new RandomAccessReader.Builder(channel)
 +                                                 .bufferType(params.bufferType)
 +                                                 .bufferSize(params.bufferSize);
 +
 +            try(RandomAccessReader reader = builder.build())
 +            {
 +                assertEquals(channel.size(), reader.length());
 +                assertEquals(channel.size(), reader.bytesRemaining());
 +                assertEquals(Integer.MAX_VALUE, reader.available());
 +
 +                assertEquals(channel.size(), reader.skip(channel.size()));
 +
 +                assertTrue(reader.isEOF());
 +                assertEquals(0, reader.bytesRemaining());
 +            }
 +        }
 +    }
 +
 +    /**
 +     * A fake file channel that reports a fixed size and, on read, only advances
 +     * its position and marks the destination buffer as filled without copying
 +     * any data. We use it to simulate very large files, > 2G. Every operation
 +     * other than reading, positioning, sizing and closing is unsupported.
 +     */
 +    private static final class FakeFileChannel extends FileChannel
 +    {
 +        private final long size;
 +        private long position;
 +
 +        FakeFileChannel(long size)
 +        {
 +            this.size = size;
 +        }
 +
 +        @Override
 +        public int read(ByteBuffer dst)
 +        {
 +            int ret = dst.remaining();
 +            position += ret;
 +            dst.position(dst.limit());
 +            return ret;
 +        }
 +
 +        @Override
 +        public long read(ByteBuffer[] dsts, int offset, int length)
 +        {
 +            throw new UnsupportedOperationException();
 +        }
 +
 +        @Override
 +        public int write(ByteBuffer src)
 +        {
 +            throw new UnsupportedOperationException();
 +        }
 +
 +        @Override
 +        public long write(ByteBuffer[] srcs, int offset, int length)
 +        {
 +            throw new UnsupportedOperationException();
 +        }
 +
 +        @Override
 +        public long position()
 +        {
 +            return position;
 +        }
 +
 +        @Override
 +        public FileChannel position(long newPosition)
 +        {
 +            position = newPosition;
 +            return this;
 +        }
 +
 +        @Override
 +        public long size()
 +        {
 +            return size;
 +        }
 +
 +        @Override
 +        public FileChannel truncate(long size)
 +        {
 +            throw new UnsupportedOperationException();
 +        }
 +
 +        @Override
 +        public void force(boolean metaData)
 +        {
 +            throw new UnsupportedOperationException();
 +        }
 +
 +        @Override
 +        public long transferTo(long position, long count, WritableByteChannel target)
 +        {
 +            throw new UnsupportedOperationException();
 +        }
 +
 +        @Override
 +        public long transferFrom(ReadableByteChannel src, long position, long count)
 +        {
 +            throw new UnsupportedOperationException();
 +        }
 +
 +        @Override
 +        public int read(ByteBuffer dst, long position)
 +        {
 +            int ret = dst.remaining();
 +            this.position = position + ret;
 +            dst.position(dst.limit());
 +            return ret;
 +        }
 +
 +        @Override
 +        public int write(ByteBuffer src, long position)
 +        {
 +            throw new UnsupportedOperationException();
 +        }
 +
 +        @Override
 +        public MappedByteBuffer map(MapMode mode, long position, long size)
 +        {
 +            throw new UnsupportedOperationException();
 +        }
 +
 +        @Override
 +        public FileLock lock(long position, long size, boolean shared)
 +        {
 +            throw new UnsupportedOperationException();
 +        }
 +
 +        @Override
 +        public FileLock tryLock(long position, long size, boolean shared)
 +        {
 +            throw new UnsupportedOperationException();
 +        }
 +
 +        @Override
 +        protected void implCloseChannel()
 +        {
 +            // nothing to release
 +        }
 +    }
 +
 +    /**
 +     * Creates a temporary file (deleted on JVM exit) and fills it by writing
 +     * {@code params.expected} repeatedly until at least
 +     * {@code params.fileLength} bytes have been written.
 +     */
 +    private static File writeFile(Parameters params) throws IOException
 +    {
 +        final File tempFile = File.createTempFile("testReadFully", "1");
 +        tempFile.deleteOnExit();
 +
 +        try (SequentialWriter writer = SequentialWriter.open(tempFile))
 +        {
 +            for (long written = 0; written < params.fileLength; written += params.expected.length)
 +                writer.write(params.expected);
 +
 +            writer.finish();
 +        }
 +
 +        assert tempFile.exists();
 +        assert tempFile.length() >= params.fileLength;
 +        return tempFile;
 +    }
 +
 +    /**
 +     * Writes a file per {@code params}, then reads it back in chunks of
 +     * {@code params.expected.length} bytes, checking every chunk against the
 +     * expected pattern and that the reader ends at EOF. When mmapped regions
 +     * are requested they are attached to the builder and closed afterwards.
 +     */
 +    private static void testReadFully(Parameters params) throws IOException
 +    {
 +        final File file = writeFile(params);
 +        try (ChannelProxy channel = new ChannelProxy(file))
 +        {
 +            RandomAccessReader.Builder builder = new RandomAccessReader.Builder(channel)
 +                                                 .bufferType(params.bufferType)
 +                                                 .bufferSize(params.bufferSize);
 +            if (params.mmappedRegions)
 +                builder.regions(MmappedRegions.map(channel, file.length()));
 +
 +            try (RandomAccessReader reader = builder.build())
 +            {
 +                assertEquals(file.getAbsolutePath(), reader.getPath());
 +                assertEquals(file.length(), reader.length());
 +                assertEquals(file.length(), reader.bytesRemaining());
 +                assertEquals(Math.min(Integer.MAX_VALUE, file.length()), reader.available());
 +
 +                byte[] chunk = new byte[params.expected.length];
 +                for (long numRead = 0; numRead < params.fileLength; numRead += chunk.length)
 +                {
 +                    reader.readFully(chunk);
 +                    assertTrue(Arrays.equals(params.expected, chunk));
 +                }
 +
 +                assertTrue(reader.isEOF());
 +                assertEquals(0, reader.bytesRemaining());
 +            }
 +
 +            if (builder.regions != null)
 +                assertNull(builder.regions.close(null));
 +        }
 +    }
 +
 +    /**
 +     * Writes a short string to a temporary file and checks that reading it
 +     * back via {@code ByteBufferUtil.read} yields the same text and leaves the
 +     * reader at EOF.
 +     */
 +    @Test
 +    public void testReadBytes() throws IOException
 +    {
 +        File f = File.createTempFile("testReadBytes", "1");
 +        // Do not leave the temporary file behind after the test JVM exits
 +        f.deleteOnExit();
 +        final String expected = "The quick brown fox jumps over the lazy dog";
 +        // Encode and decode with the same explicit charset instead of relying
 +        // on the platform default for the write side only
 +        final Charset utf8 = Charset.forName("UTF-8");
 +        final byte[] expectedBytes = expected.getBytes(utf8);
 +
 +        try (SequentialWriter writer = SequentialWriter.open(f))
 +        {
 +            writer.write(expectedBytes);
 +            writer.finish();
 +        }
 +
 +        assert f.exists();
 +
 +        try (ChannelProxy channel = new ChannelProxy(f);
 +             RandomAccessReader reader = new RandomAccessReader.Builder(channel).build())
 +        {
 +            assertEquals(f.getAbsolutePath(), reader.getPath());
 +            assertEquals(expectedBytes.length, reader.length());
 +
 +            ByteBuffer b = ByteBufferUtil.read(reader, expectedBytes.length);
 +            assertEquals(expected, new String(b.array(), utf8));
 +
 +            assertTrue(reader.isEOF());
 +            assertEquals(0, reader.bytesRemaining());
 +        }
 +    }
 +
 +    /**
 +     * Exercises mark/reset on the reader: writes the same string ten times,
 +     * reads the first copy, marks, reads to EOF, then checks that both
 +     * {@code reset(mark)} and {@code reset()} return to the marked position
 +     * and that {@code bytesPastMark} reports the distance read since the mark.
 +     */
 +    @Test
 +    public void testReset() throws IOException
 +    {
 +        File f = File.createTempFile("testMark", "1");
 +        // Do not leave the temporary file behind after the test JVM exits
 +        f.deleteOnExit();
 +        final String expected = "The quick brown fox jumps over the lazy dog";
 +        // Encode and decode with the same explicit charset instead of relying
 +        // on the platform default for the write side only
 +        final Charset utf8 = Charset.forName("UTF-8");
 +        final byte[] expectedBytes = expected.getBytes(utf8);
 +        final int numIterations = 10;
 +
 +        try (SequentialWriter writer = SequentialWriter.open(f))
 +        {
 +            for (int i = 0; i < numIterations; i++)
 +                writer.write(expectedBytes);
 +            writer.finish();
 +        }
 +
 +        assert f.exists();
 +
 +        try (ChannelProxy channel = new ChannelProxy(f);
 +             RandomAccessReader reader = new RandomAccessReader.Builder(channel).build())
 +        {
 +            assertEquals(expectedBytes.length * numIterations, reader.length());
 +
 +            ByteBuffer b = ByteBufferUtil.read(reader, expectedBytes.length);
 +            assertEquals(expected, new String(b.array(), utf8));
 +
 +            assertFalse(reader.isEOF());
 +            assertEquals((numIterations - 1) * expectedBytes.length, reader.bytesRemaining());
 +
 +            DataPosition mark = reader.mark();
 +            assertEquals(0, reader.bytesPastMark());
 +            assertEquals(0, reader.bytesPastMark(mark));
 +
 +            for (int i = 0; i < (numIterations - 1); i++)
 +            {
 +                b = ByteBufferUtil.read(reader, expectedBytes.length);
 +                assertEquals(expected, new String(b.array(), utf8));
 +            }
 +            assertTrue(reader.isEOF());
 +            assertEquals(expectedBytes.length * (numIterations - 1), reader.bytesPastMark());
 +            assertEquals(expectedBytes.length * (numIterations - 1), reader.bytesPastMark(mark));
 +
 +            // Rewind to the explicit mark and re-read the remainder
 +            reader.reset(mark);
 +            assertEquals(0, reader.bytesPastMark());
 +            assertEquals(0, reader.bytesPastMark(mark));
 +            assertFalse(reader.isEOF());
 +            for (int i = 0; i < (numIterations - 1); i++)
 +            {
 +                b = ByteBufferUtil.read(reader, expectedBytes.length);
 +                assertEquals(expected, new String(b.array(), utf8));
 +            }
 +
 +            // Rewind again using the no-argument reset and re-read the remainder
 +            reader.reset();
 +            assertEquals(0, reader.bytesPastMark());
 +            assertEquals(0, reader.bytesPastMark(mark));
 +            assertFalse(reader.isEOF());
 +            for (int i = 0; i < (numIterations - 1); i++)
 +            {
 +                b = ByteBufferUtil.read(reader, expectedBytes.length);
 +                assertEquals(expected, new String(b.array(), utf8));
 +            }
 +
 +            assertTrue(reader.isEOF());
 +        }
 +    }
 +
 +    /** Runs the seek scenario directly on the calling thread. */
 +    @Test
 +    public void testSeekSingleThread() throws IOException, InterruptedException
 +    {
 +        final int numThreads = 1;
 +        testSeek(numThreads);
 +    }
 +
 +    /** Runs the seek scenario on ten worker threads sharing one channel. */
 +    @Test
 +    public void testSeekMultipleThreads() throws IOException, InterruptedException
 +    {
 +        final int numThreads = 10;
 +        testSeek(numThreads);
 +    }
 +
 +    /**
 +     * Writes 64KB of seeded random bytes to a temporary file and has
 +     * {@code numThreads} workers, each with its own reader over a shared
 +     * channel, read the file fully twice and then seek to random positions
 +     * and compare the ints read against the expected bytes.
 +     *
 +     * With one thread the worker runs on the calling thread. With more, the
 +     * workers run on a fixed pool and every worker's outcome is checked, so a
 +     * failed assertion in any worker fails the test.
 +     *
 +     * @param numThreads number of concurrent workers to run
 +     */
 +    private static void testSeek(int numThreads) throws IOException, InterruptedException
 +    {
 +        final File f = File.createTempFile("testMark", "1");
 +        // Do not leave the temporary file behind after the test JVM exits
 +        f.deleteOnExit();
 +        final byte[] expected = new byte[1 << 16];
 +
 +        long seed = System.nanoTime();
 +        //seed = 365238103404423L;
 +        logger.info("Seed {}", seed);
 +        Random r = new Random(seed);
 +        r.nextBytes(expected);
 +
 +        try (SequentialWriter writer = SequentialWriter.open(f))
 +        {
 +            writer.write(expected);
 +            writer.finish();
 +        }
 +
 +        assert f.exists();
 +
 +        try (final ChannelProxy channel = new ChannelProxy(f))
 +        {
 +            final Runnable worker = () ->
 +            {
 +                // try-with-resources closes the reader; no explicit close() is needed
 +                try (RandomAccessReader reader = new RandomAccessReader.Builder(channel).build())
 +                {
 +                    assertEquals(expected.length, reader.length());
 +
 +                    ByteBuffer b = ByteBufferUtil.read(reader, expected.length);
 +                    assertTrue(Arrays.equals(expected, b.array()));
 +                    assertTrue(reader.isEOF());
 +                    assertEquals(0, reader.bytesRemaining());
 +
 +                    reader.seek(0);
 +                    b = ByteBufferUtil.read(reader, expected.length);
 +                    assertTrue(Arrays.equals(expected, b.array()));
 +                    assertTrue(reader.isEOF());
 +                    assertEquals(0, reader.bytesRemaining());
 +
 +                    for (int i = 0; i < 10; i++)
 +                    {
 +                        int pos = r.nextInt(expected.length);
 +                        reader.seek(pos);
 +                        assertEquals(pos, reader.getPosition());
 +
 +                        ByteBuffer buf = ByteBuffer.wrap(expected, pos, expected.length - pos)
 +                                                   .order(ByteOrder.BIG_ENDIAN);
 +
 +                        while (reader.bytesRemaining() > 4)
 +                            assertEquals(buf.getInt(), reader.readInt());
 +                    }
 +                }
 +                catch (Exception ex)
 +                {
 +                    // Keep the original exception as the cause instead of only its message
 +                    throw new AssertionError("Seek worker failed: " + ex.getMessage(), ex);
 +                }
 +            };
 +
 +            if (numThreads == 1)
 +            {
 +                worker.run();
 +            }
 +            else
 +            {
 +                ExecutorService executor = Executors.newFixedThreadPool(numThreads);
 +                try
 +                {
 +                    java.util.List<java.util.concurrent.Future<?>> pending = new java.util.ArrayList<>(numThreads);
 +                    for (int i = 0; i < numThreads; i++)
 +                        pending.add(executor.submit(worker));
 +
 +                    executor.shutdown();
 +                    assertTrue("Seek workers did not finish within one minute",
 +                               executor.awaitTermination(1, TimeUnit.MINUTES));
 +
 +                    // An assertion failure inside a worker is captured by its Future;
 +                    // surface it here, otherwise the test would pass regardless.
 +                    for (java.util.concurrent.Future<?> result : pending)
 +                    {
 +                        try
 +                        {
 +                            result.get();
 +                        }
 +                        catch (java.util.concurrent.ExecutionException e)
 +                        {
 +                            throw new AssertionError("Seek worker failed", e.getCause());
 +                        }
 +                    }
 +                }
 +                finally
 +                {
 +                    // No-op after a clean shutdown; stops stragglers if we are bailing out early
 +                    executor.shutdownNow();
 +                }
 +            }
 +        }
 +    }
 +}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/05bacc75/test/unit/org/apache/cassandra/service/RMIServerSocketFactoryImplTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/service/RMIServerSocketFactoryImplTest.java
index 3459ec3,0000000..393dfe1
mode 100644,000000..100644
--- a/test/unit/org/apache/cassandra/service/RMIServerSocketFactoryImplTest.java
+++ b/test/unit/org/apache/cassandra/service/RMIServerSocketFactoryImplTest.java
@@@ -1,24 -1,0 +1,44 @@@
++/*
++ *
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements.  See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership.  The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License.  You may obtain a copy of the License at
++ *
++ *   http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing,
++ * software distributed under the License is distributed on an
++ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
++ * KIND, either express or implied.  See the License for the
++ * specific language governing permissions and limitations
++ * under the License.
++ *
++ */
 +package org.apache.cassandra.service;
 +
 +import java.io.IOException;
 +import java.net.ServerSocket;
 +import java.rmi.server.RMIServerSocketFactory;
 +
 +import org.junit.Test;
 +
 +import org.apache.cassandra.utils.RMIServerSocketFactoryImpl;
 +
 +import static org.junit.Assert.assertTrue;
 +
 +
 +public class RMIServerSocketFactoryImplTest
 +{
 +    @Test
 +    public void testReusableAddrSocket() throws IOException
 +    {
 +        RMIServerSocketFactory serverFactory = new RMIServerSocketFactoryImpl();
 +        ServerSocket socket = serverFactory.createServerSocket(7199);
 +        assertTrue(socket.getReuseAddress());
 +    }
 +
 +}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/05bacc75/test/unit/org/apache/cassandra/utils/TopKSamplerTest.java
----------------------------------------------------------------------


Mime
View raw message