Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id EF26C200C01 for ; Thu, 19 Jan 2017 23:15:03 +0100 (CET) Received: by cust-asf.ponee.io (Postfix) id EDE9B160B3A; Thu, 19 Jan 2017 22:15:03 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id C6946160B59 for ; Thu, 19 Jan 2017 23:15:02 +0100 (CET) Received: (qmail 57787 invoked by uid 500); 19 Jan 2017 22:15:01 -0000 Mailing-List: contact commits-help@cassandra.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@cassandra.apache.org Delivered-To: mailing list commits@cassandra.apache.org Received: (qmail 57464 invoked by uid 99); 19 Jan 2017 22:15:01 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 19 Jan 2017 22:15:01 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 60928F4035; Thu, 19 Jan 2017 22:15:01 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: jasobrown@apache.org To: commits@cassandra.apache.org Date: Thu, 19 Jan 2017 22:15:04 -0000 Message-Id: <3042e4cd87354265b10fcdb4157a7bd4@git.apache.org> In-Reply-To: <2a3c18070d16474bb77a78764290bb3b@git.apache.org> References: <2a3c18070d16474bb77a78764290bb3b@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [4/6] cassandra git commit: Merge branch 'cassandra-3.0' into cassandra-3.11 archived-at: Thu, 19 Jan 2017 22:15:04 -0000 Merge branch 'cassandra-3.0' into cassandra-3.11 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/e3d26b6a Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/e3d26b6a Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/e3d26b6a Branch: refs/heads/trunk Commit: e3d26b6aa18a48a52ec4f5511bdc484a45ec3ce4 Parents: 74559de 7a06df7 Author: Jason Brown Authored: Thu Jan 19 14:08:37 2017 -0800 Committer: Jason Brown Committed: Thu Jan 19 14:13:21 2017 -0800 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../cassandra/index/SecondaryIndexManager.java | 86 ++++++++++----- .../service/pager/AbstractQueryPager.java | 56 ++++++++-- .../apache/cassandra/index/CustomIndexTest.java | 106 +++++++++++++++++++ .../org/apache/cassandra/index/StubIndex.java | 4 + 5 files changed, 216 insertions(+), 37 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/e3d26b6a/CHANGES.txt ---------------------------------------------------------------------- diff --cc CHANGES.txt index 8b613e6,a85386b..224ef5d --- a/CHANGES.txt +++ b/CHANGES.txt @@@ -1,121 -1,12 +1,122 @@@ -3.0.11 +3.10 + * Fix TestHintedHandoff.hintedhandoff_decom_test (CASSANDRA-13058) + * Fixed query monitoring for range queries (CASSANDRA-13050) + * Remove outboundBindAny configuration property (CASSANDRA-12673) + * Use correct bounds for all-data range when filtering (CASSANDRA-12666) + * Remove timing window in test case (CASSANDRA-12875) + * Resolve unit testing without JCE security libraries installed (CASSANDRA-12945) + * Fix inconsistencies in cassandra-stress load balancing policy (CASSANDRA-12919) + * Fix validation of non-frozen UDT cells (CASSANDRA-12916) + * Don't shut down socket input/output on StreamSession (CASSANDRA-12903) + * Fix Murmur3PartitionerTest (CASSANDRA-12858) + * Move cqlsh syntax rules into separate module and allow easier customization (CASSANDRA-12897) + * Fix CommitLogSegmentManagerTest (CASSANDRA-12283) + * Fix cassandra-stress truncate option (CASSANDRA-12695) + * Fix crossNode value when receiving messages (CASSANDRA-12791) + * Don't load MX4J beans twice (CASSANDRA-12869) + * Extend native protocol request flags, add versions to SUPPORTED, and introduce ProtocolVersion enum (CASSANDRA-12838) + * Set JOINING mode when running pre-join tasks (CASSANDRA-12836) + * remove net.mintern.primitive library due to license issue (CASSANDRA-12845) + * Properly format IPv6 addresses when logging JMX service URL (CASSANDRA-12454) + * Optimize the vnode allocation for single replica per DC (CASSANDRA-12777) + * Use non-token restrictions for bounds when token restrictions are overridden (CASSANDRA-12419) + * Fix CQLSH auto completion for PER PARTITION LIMIT (CASSANDRA-12803) + * Use different build directories for Eclipse and Ant (CASSANDRA-12466) + * Avoid potential AttributeError in cqlsh due to no table metadata (CASSANDRA-12815) + * Fix RandomReplicationAwareTokenAllocatorTest.testExistingCluster (CASSANDRA-12812) + * Upgrade commons-codec to 1.9 (CASSANDRA-12790) + * Make the fanout size for LeveledCompactionStrategy to be configurable (CASSANDRA-11550) + * Add duration data type (CASSANDRA-11873) + * Fix timeout in ReplicationAwareTokenAllocatorTest (CASSANDRA-12784) + * Improve sum aggregate functions (CASSANDRA-12417) + * Make cassandra.yaml docs for batch_size_*_threshold_in_kb reflect changes in CASSANDRA-10876 (CASSANDRA-12761) + * cqlsh fails to format collections when using aliases (CASSANDRA-11534) + * Check for hash conflicts in prepared statements (CASSANDRA-12733) + * Exit query parsing upon first error (CASSANDRA-12598) + * Fix cassandra-stress to use single seed in UUID generation (CASSANDRA-12729) + * CQLSSTableWriter does not allow Update statement (CASSANDRA-12450) + * Config class uses boxed types but DD exposes primitive types (CASSANDRA-12199) + * Add pre- and post-shutdown hooks to Storage Service (CASSANDRA-12461) + * Add hint delivery metrics (CASSANDRA-12693) + * Remove IndexInfo cache from FileIndexInfoRetriever (CASSANDRA-12731) + * ColumnIndex does not reuse buffer (CASSANDRA-12502) + * cdc column addition still breaks schema migration tasks (CASSANDRA-12697) + * Upgrade metrics-reporter dependencies (CASSANDRA-12089) + * Tune compaction thread count via nodetool (CASSANDRA-12248) + * Add +=/-= shortcut syntax for update queries (CASSANDRA-12232) + * Include repair session IDs in repair start message (CASSANDRA-12532) + * Add a blocking task to Index, run before joining the ring (CASSANDRA-12039) + * Fix NPE when using CQLSSTableWriter (CASSANDRA-12667) + * Support optional backpressure strategies at the coordinator (CASSANDRA-9318) + * Make randompartitioner work with new vnode allocation (CASSANDRA-12647) + * Fix cassandra-stress graphing (CASSANDRA-12237) + * Allow filtering on partition key columns for queries without secondary indexes (CASSANDRA-11031) + * Fix Cassandra Stress reporting thread model and precision (CASSANDRA-12585) + * Add JMH benchmarks.jar (CASSANDRA-12586) + * Cleanup uses of AlterTableStatementColumn (CASSANDRA-12567) + * Add keep-alive to streaming (CASSANDRA-11841) + * Tracing payload is passed through newSession(..) (CASSANDRA-11706) + * avoid deleting non existing sstable files and improve related log messages (CASSANDRA-12261) + * json/yaml output format for nodetool compactionhistory (CASSANDRA-12486) + * Retry all internode messages once after a connection is + closed and reopened (CASSANDRA-12192) + * Add support to rebuild from targeted replica (CASSANDRA-9875) + * Add sequence distribution type to cassandra stress (CASSANDRA-12490) + * "SELECT * FROM foo LIMIT ;" does not error out (CASSANDRA-12154) + * Define executeLocally() at the ReadQuery Level (CASSANDRA-12474) + * Extend read/write failure messages with a map of replica addresses + to error codes in the v5 native protocol (CASSANDRA-12311) + * Fix rebuild of SASI indexes with existing index files (CASSANDRA-12374) + * Let DatabaseDescriptor not implicitly startup services (CASSANDRA-9054, 12550) + * Fix clustering indexes in presence of static columns in SASI (CASSANDRA-12378) + * Fix queries on columns with reversed type on SASI indexes (CASSANDRA-12223) + * Added slow query log (CASSANDRA-12403) + * Count full coordinated request against timeout (CASSANDRA-12256) + * Allow TTL with null value on insert and update (CASSANDRA-12216) + * Make decommission operation resumable (CASSANDRA-12008) + * Add support to one-way targeted repair (CASSANDRA-9876) + * Remove clientutil jar (CASSANDRA-11635) + * Fix compaction throughput throttle (CASSANDRA-12366, CASSANDRA-12717) + * Delay releasing Memtable memory on flush until PostFlush has finished running (CASSANDRA-12358) + * Cassandra stress should dump all setting on startup (CASSANDRA-11914) + * Make it possible to compact a given token range (CASSANDRA-10643) + * Allow updating DynamicEndpointSnitch properties via JMX (CASSANDRA-12179) + * Collect metrics on queries by consistency level (CASSANDRA-7384) + * Add support for GROUP BY to SELECT statement (CASSANDRA-10707) + * Deprecate memtable_cleanup_threshold and update default for memtable_flush_writers (CASSANDRA-12228) + * Upgrade to OHC 0.4.4 (CASSANDRA-12133) + * Add version command to cassandra-stress (CASSANDRA-12258) + * Create compaction-stress tool (CASSANDRA-11844) + * Garbage-collecting compaction operation and schema option (CASSANDRA-7019) + * Add beta protocol flag for v5 native protocol (CASSANDRA-12142) + * Support filtering on non-PRIMARY KEY columns in the CREATE + MATERIALIZED VIEW statement's WHERE clause (CASSANDRA-10368) + * Unify STDOUT and SYSTEMLOG logback format (CASSANDRA-12004) + * COPY FROM should raise error for non-existing input files (CASSANDRA-12174) + * Faster write path (CASSANDRA-12269) + * Option to leave omitted columns in INSERT JSON unset (CASSANDRA-11424) + * Support json/yaml output in nodetool tpstats (CASSANDRA-12035) + * Expose metrics for successful/failed authentication attempts (CASSANDRA-10635) + * Prepend snapshot name with "truncated" or "dropped" when a snapshot + is taken before truncating or dropping a table (CASSANDRA-12178) + * Optimize RestrictionSet (CASSANDRA-12153) + * cqlsh does not automatically downgrade CQL version (CASSANDRA-12150) + * Omit (de)serialization of state variable in UDAs (CASSANDRA-9613) + * Create a system table to expose prepared statements (CASSANDRA-8831) + * Reuse DataOutputBuffer from ColumnIndex (CASSANDRA-11970) + * Remove DatabaseDescriptor dependency from SegmentedFile (CASSANDRA-11580) + * Add supplied username to authentication error messages (CASSANDRA-12076) + * Remove pre-startup check for open JMX port (CASSANDRA-12074) + * Remove compaction Severity from DynamicEndpointSnitch (CASSANDRA-11738) + * Restore resumable hints delivery (CASSANDRA-11960) + * Properly report LWT contention (CASSANDRA-12626) +Merged from 3.0: + * Indexer is not correctly invoked when building indexes over sstables (CASSANDRA-13075) * Read repair is not blocking repair to finish in foreground repair (CASSANDRA-13115) - * Stress daemon help is incorrect (CASSANDRA-12563) + * Stress daemon help is incorrect(CASSANDRA-12563) * Remove ALTER TYPE support (CASSANDRA-12443) * Fix assertion for certain legacy range tombstone pattern (CASSANDRA-12203) - * Set javac encoding to utf-8 (CASSANDRA-11077) * Replace empty strings with null values if they cannot be converted (CASSANDRA-12794) - * Fixed flacky SSTableRewriterTest: check file counts before calling validateCFS (CASSANDRA-12348) * Fix deserialization of 2.x DeletedCells (CASSANDRA-12620) * Add parent repair session id to anticompaction log message (CASSANDRA-12186) * Improve contention handling on failure to acquire MV lock for streaming and hints (CASSANDRA-12905) http://git-wip-us.apache.org/repos/asf/cassandra/blob/e3d26b6a/src/java/org/apache/cassandra/index/SecondaryIndexManager.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/index/SecondaryIndexManager.java index 6adefbd,d39b607..08b4f8b --- a/src/java/org/apache/cassandra/index/SecondaryIndexManager.java +++ b/src/java/org/apache/cassandra/index/SecondaryIndexManager.java @@@ -558,40 -537,69 +557,69 @@@ public class SecondaryIndexManager impl int nowInSec = cmd.nowInSec(); boolean readStatic = false; - SinglePartitionPager pager = new SinglePartitionPager(cmd, null, Server.CURRENT_VERSION); + SinglePartitionPager pager = new SinglePartitionPager(cmd, null, ProtocolVersion.CURRENT); while (!pager.isExhausted()) { - try (ReadOrderGroup readGroup = cmd.startOrderGroup(); + try (ReadExecutionController controller = cmd.executionController(); OpOrder.Group writeGroup = Keyspace.writeOrder.start(); - RowIterator partition = - PartitionIterators.getOnlyElement(pager.fetchPageInternal(pageSize, controller), - cmd)) - UnfilteredPartitionIterator page = pager.fetchPageUnfiltered(baseCfs.metadata, pageSize, readGroup)) ++ UnfilteredPartitionIterator page = pager.fetchPageUnfiltered(baseCfs.metadata, pageSize, controller)) { - Set indexers = indexes.stream() - .map(index -> index.indexerFor(key, - partition.columns(), - nowInSec, - writeGroup, - IndexTransaction.Type.UPDATE)) - .filter(Objects::nonNull) - .collect(Collectors.toSet()); - - indexers.forEach(Index.Indexer::begin); - - // only process the static row once per partition - if (!readStatic && !partition.staticRow().isEmpty()) - { - indexers.forEach(indexer -> indexer.insertRow(partition.staticRow())); - readStatic = true; + if (!page.hasNext()) + break; + + try (UnfilteredRowIterator partition = page.next()) { + Set indexers = indexes.stream() + .map(index -> index.indexerFor(key, + partition.columns(), + nowInSec, + writeGroup, + IndexTransaction.Type.UPDATE)) + .filter(Objects::nonNull) + .collect(Collectors.toSet()); + + // Short-circuit empty partitions if static row is processed or isn't read + if (!readStatic && partition.isEmpty() && partition.staticRow().isEmpty()) + break; + + indexers.forEach(Index.Indexer::begin); + + if (!readStatic) + { + if (!partition.staticRow().isEmpty()) + indexers.forEach(indexer -> indexer.insertRow(partition.staticRow())); + indexers.forEach((Index.Indexer i) -> i.partitionDelete(partition.partitionLevelDeletion())); + readStatic = true; + } + + MutableDeletionInfo.Builder deletionBuilder = MutableDeletionInfo.builder(partition.partitionLevelDeletion(), baseCfs.getComparator(), false); + + while (partition.hasNext()) + { + Unfiltered unfilteredRow = partition.next(); + + if (unfilteredRow.isRow()) + { + Row row = (Row) unfilteredRow; + indexers.forEach(indexer -> indexer.insertRow(row)); + } + else + { + assert unfilteredRow.isRangeTombstoneMarker(); + RangeTombstoneMarker marker = (RangeTombstoneMarker) unfilteredRow; + deletionBuilder.add(marker); + } + } + + MutableDeletionInfo deletionInfo = deletionBuilder.build(); + if (deletionInfo.hasRanges()) + { + Iterator iter = deletionInfo.rangeIterator(false); + while (iter.hasNext()) + indexers.forEach(indexer -> indexer.rangeTombstone(iter.next())); + } + + indexers.forEach(Index.Indexer::finish); } - - while (partition.hasNext()) - { - Row row = partition.next(); - indexers.forEach(indexer -> indexer.insertRow(row)); - } - - indexers.forEach(Index.Indexer::finish); } } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/e3d26b6a/src/java/org/apache/cassandra/service/pager/AbstractQueryPager.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/service/pager/AbstractQueryPager.java index 22ddc84,74ec47d..e937a0d --- a/src/java/org/apache/cassandra/service/pager/AbstractQueryPager.java +++ b/src/java/org/apache/cassandra/service/pager/AbstractQueryPager.java @@@ -62,27 -64,63 +63,64 @@@ abstract class AbstractQueryPager imple return EmptyIterators.partition(); pageSize = Math.min(pageSize, remaining); - Pager pager = new Pager(limits.forPaging(pageSize), command.nowInSec()); - + Pager pager = new RowPager(limits.forPaging(pageSize), command.nowInSec()); - return Transformation.apply(nextPageReadCommand(pageSize).execute(consistency, clientState), pager); + return Transformation.apply(nextPageReadCommand(pageSize).execute(consistency, clientState, queryStartNanoTime), pager); } - public PartitionIterator fetchPageInternal(int pageSize, ReadOrderGroup orderGroup) throws RequestValidationException, RequestExecutionException + public PartitionIterator fetchPageInternal(int pageSize, ReadExecutionController executionController) { if (isExhausted()) return EmptyIterators.partition(); pageSize = Math.min(pageSize, remaining); - Pager pager = new Pager(limits.forPaging(pageSize), command.nowInSec()); - + RowPager pager = new RowPager(limits.forPaging(pageSize), command.nowInSec()); - return Transformation.apply(nextPageReadCommand(pageSize).executeInternal(orderGroup), pager); + return Transformation.apply(nextPageReadCommand(pageSize).executeInternal(executionController), pager); } - private class Pager extends Transformation - public UnfilteredPartitionIterator fetchPageUnfiltered(CFMetaData cfm, int pageSize, ReadOrderGroup orderGroup) ++ public UnfilteredPartitionIterator fetchPageUnfiltered(CFMetaData cfm, int pageSize, ReadExecutionController executionController) + { + if (isExhausted()) + return EmptyIterators.unfilteredPartition(cfm, false); + + pageSize = Math.min(pageSize, remaining); + UnfilteredPager pager = new UnfilteredPager(limits.forPaging(pageSize), command.nowInSec()); + - return Transformation.apply(nextPageReadCommand(pageSize).executeLocally(orderGroup), pager); ++ return Transformation.apply(nextPageReadCommand(pageSize).executeLocally(executionController), pager); + } + + private class UnfilteredPager extends Pager + { + + private UnfilteredPager(DataLimits pageLimits, int nowInSec) + { + super(pageLimits, nowInSec); + } + + protected BaseRowIterator apply(BaseRowIterator partition) + { + return Transformation.apply(counter.applyTo((UnfilteredRowIterator) partition), this); + } + } + + private class RowPager extends Pager + { + + private RowPager(DataLimits pageLimits, int nowInSec) + { + super(pageLimits, nowInSec); + } + + protected BaseRowIterator apply(BaseRowIterator partition) + { + return Transformation.apply(counter.applyTo((RowIterator) partition), this); + } + } + + private abstract class Pager extends Transformation> { private final DataLimits pageLimits; - private final DataLimits.Counter counter; + protected final DataLimits.Counter counter; + private DecoratedKey currentKey; private Row lastRow; private boolean isFirstPartition = true; @@@ -93,9 -131,12 +131,9 @@@ } @Override - public RowIterator applyToPartition(RowIterator partition) + public BaseRowIterator applyToPartition(BaseRowIterator partition) { - DecoratedKey key = partition.partitionKey(); - if (lastKey == null || !lastKey.equals(key)) - remainingInPartition = limits.perPartitionCount(); - lastKey = key; + currentKey = partition.partitionKey(); // If this is the first partition of this page, this could be the continuation of a partition we've started // on the previous page. In which case, we could have the problem that the partition has no more "regular" http://git-wip-us.apache.org/repos/asf/cassandra/blob/e3d26b6a/test/unit/org/apache/cassandra/index/CustomIndexTest.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/e3d26b6a/test/unit/org/apache/cassandra/index/StubIndex.java ----------------------------------------------------------------------