From: blambov@apache.org
To: commits@cassandra.apache.org
Reply-To: dev@cassandra.apache.org
Date: Tue, 06 Dec 2016 11:21:49 -0000
Message-Id: <9bba7c563efa4ab39076ec005f4d11f6@git.apache.org>
Subject: [05/10] cassandra git commit: Merge branch 'cassandra-3.0' into cassandra-3.11
archived-at: Tue, 06 Dec 2016 11:21:48 -0000

Merge branch 'cassandra-3.0' into cassandra-3.11

Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/2f268eda
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/2f268eda
Diff:
http://git-wip-us.apache.org/repos/asf/cassandra/diff/2f268eda

Branch: refs/heads/cassandra-3.X
Commit: 2f268eda3d44f8b14b71b7f4b3f4c25e2dfb2c11
Parents: b207f2e 6f90e55
Author: Branimir Lambov
Authored: Tue Dec 6 12:11:15 2016 +0200
Committer: Branimir Lambov
Committed: Tue Dec 6 12:12:19 2016 +0200

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../apache/cassandra/db/ColumnFamilyStore.java  | 41 ++++++++------------
 .../apache/cassandra/index/CustomIndexTest.java | 37 ++++++++++++++++++
 3 files changed, 55 insertions(+), 24 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2f268eda/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index c5d2da2,5242adf..6da6b4f
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,113 -1,5 +1,114 @@@
-3.0.11
+3.10
+ * Use correct bounds for all-data range when filtering (CASSANDRA-12666)
+ * Remove timing window in test case (CASSANDRA-12875)
+ * Resolve unit testing without JCE security libraries installed (CASSANDRA-12945)
+ * Fix inconsistencies in cassandra-stress load balancing policy (CASSANDRA-12919)
+ * Fix validation of non-frozen UDT cells (CASSANDRA-12916)
+ * Don't shut down socket input/output on StreamSession (CASSANDRA-12903)
+ * Fix Murmur3PartitionerTest (CASSANDRA-12858)
+ * Move cqlsh syntax rules into separate module and allow easier customization (CASSANDRA-12897)
+ * Fix CommitLogSegmentManagerTest (CASSANDRA-12283)
+ * Fix cassandra-stress truncate option (CASSANDRA-12695)
+ * Fix crossNode value when receiving messages (CASSANDRA-12791)
+ * Don't load MX4J beans twice (CASSANDRA-12869)
+ * Extend native protocol request flags, add versions to SUPPORTED, and introduce ProtocolVersion enum (CASSANDRA-12838)
+ * Set JOINING mode when running pre-join tasks (CASSANDRA-12836)
+ * remove net.mintern.primitive library due to license issue (CASSANDRA-12845)
+ * Properly format IPv6 addresses when logging JMX service URL (CASSANDRA-12454)
+ * Optimize the vnode allocation for single replica per DC (CASSANDRA-12777)
+ * Use non-token restrictions for bounds when token restrictions are overridden (CASSANDRA-12419)
+ * Fix CQLSH auto completion for PER PARTITION LIMIT (CASSANDRA-12803)
+ * Use different build directories for Eclipse and Ant (CASSANDRA-12466)
+ * Avoid potential AttributeError in cqlsh due to no table metadata (CASSANDRA-12815)
+ * Fix RandomReplicationAwareTokenAllocatorTest.testExistingCluster (CASSANDRA-12812)
+ * Upgrade commons-codec to 1.9 (CASSANDRA-12790)
+ * Make the fanout size for LeveledCompactionStrategy to be configurable (CASSANDRA-11550)
+ * Add duration data type (CASSANDRA-11873)
+ * Fix timeout in ReplicationAwareTokenAllocatorTest (CASSANDRA-12784)
+ * Improve sum aggregate functions (CASSANDRA-12417)
+ * Make cassandra.yaml docs for batch_size_*_threshold_in_kb reflect changes in CASSANDRA-10876 (CASSANDRA-12761)
+ * cqlsh fails to format collections when using aliases (CASSANDRA-11534)
+ * Check for hash conflicts in prepared statements (CASSANDRA-12733)
+ * Exit query parsing upon first error (CASSANDRA-12598)
+ * Fix cassandra-stress to use single seed in UUID generation (CASSANDRA-12729)
+ * CQLSSTableWriter does not allow Update statement (CASSANDRA-12450)
+ * Config class uses boxed types but DD exposes primitive types (CASSANDRA-12199)
+ * Add pre- and post-shutdown hooks to Storage Service (CASSANDRA-12461)
+ * Add hint delivery metrics (CASSANDRA-12693)
+ * Remove IndexInfo cache from FileIndexInfoRetriever (CASSANDRA-12731)
+ * ColumnIndex does not reuse buffer (CASSANDRA-12502)
+ * cdc column addition still breaks schema migration tasks (CASSANDRA-12697)
+ * Upgrade metrics-reporter dependencies (CASSANDRA-12089)
+ * Tune compaction thread count via nodetool (CASSANDRA-12248)
+ * Add +=/-= shortcut syntax for update queries (CASSANDRA-12232)
+ * Include repair session IDs in repair start message (CASSANDRA-12532)
+ * Add a blocking task to Index, run before joining the ring (CASSANDRA-12039)
+ * Fix NPE when using CQLSSTableWriter (CASSANDRA-12667)
+ * Support optional backpressure strategies at the coordinator (CASSANDRA-9318)
+ * Make randompartitioner work with new vnode allocation (CASSANDRA-12647)
+ * Fix cassandra-stress graphing (CASSANDRA-12237)
+ * Allow filtering on partition key columns for queries without secondary indexes (CASSANDRA-11031)
+ * Fix Cassandra Stress reporting thread model and precision (CASSANDRA-12585)
+ * Add JMH benchmarks.jar (CASSANDRA-12586)
+ * Cleanup uses of AlterTableStatementColumn (CASSANDRA-12567)
+ * Add keep-alive to streaming (CASSANDRA-11841)
+ * Tracing payload is passed through newSession(..) (CASSANDRA-11706)
+ * avoid deleting non existing sstable files and improve related log messages (CASSANDRA-12261)
+ * json/yaml output format for nodetool compactionhistory (CASSANDRA-12486)
+ * Retry all internode messages once after a connection is
+   closed and reopened (CASSANDRA-12192)
+ * Add support to rebuild from targeted replica (CASSANDRA-9875)
+ * Add sequence distribution type to cassandra stress (CASSANDRA-12490)
+ * "SELECT * FROM foo LIMIT ;" does not error out (CASSANDRA-12154)
+ * Define executeLocally() at the ReadQuery Level (CASSANDRA-12474)
+ * Extend read/write failure messages with a map of replica addresses
+   to error codes in the v5 native protocol (CASSANDRA-12311)
+ * Fix rebuild of SASI indexes with existing index files (CASSANDRA-12374)
+ * Let DatabaseDescriptor not implicitly startup services (CASSANDRA-9054, 12550)
+ * Fix clustering indexes in presence of static columns in SASI (CASSANDRA-12378)
+ * Fix queries on columns with reversed type on SASI indexes (CASSANDRA-12223)
+ * Added slow query log (CASSANDRA-12403)
+ * Count full coordinated request against timeout (CASSANDRA-12256)
+ * Allow TTL with null value on insert and update (CASSANDRA-12216)
+ * Make decommission operation resumable (CASSANDRA-12008)
+ * Add support to one-way targeted repair (CASSANDRA-9876)
+ * Remove clientutil jar (CASSANDRA-11635)
+ * Fix compaction throughput throttle (CASSANDRA-12366, CASSANDRA-12717)
+ * Delay releasing Memtable memory on flush until PostFlush has finished running (CASSANDRA-12358)
+ * Cassandra stress should dump all setting on startup (CASSANDRA-11914)
+ * Make it possible to compact a given token range (CASSANDRA-10643)
+ * Allow updating DynamicEndpointSnitch properties via JMX (CASSANDRA-12179)
+ * Collect metrics on queries by consistency level (CASSANDRA-7384)
+ * Add support for GROUP BY to SELECT statement (CASSANDRA-10707)
+ * Deprecate memtable_cleanup_threshold and update default for memtable_flush_writers (CASSANDRA-12228)
+ * Upgrade to OHC 0.4.4 (CASSANDRA-12133)
+ * Add version command to cassandra-stress (CASSANDRA-12258)
+ * Create compaction-stress tool (CASSANDRA-11844)
+ * Garbage-collecting compaction operation and schema option (CASSANDRA-7019)
+ * Add beta protocol flag for v5 native protocol (CASSANDRA-12142)
+ * Support filtering on non-PRIMARY KEY columns in the CREATE
+   MATERIALIZED VIEW statement's WHERE clause (CASSANDRA-10368)
+ * Unify STDOUT and SYSTEMLOG logback format (CASSANDRA-12004)
+ * COPY FROM should raise error for non-existing input files (CASSANDRA-12174)
+ * Faster write path (CASSANDRA-12269)
+ * Option to leave omitted columns in INSERT JSON unset (CASSANDRA-11424)
+ * Support json/yaml output in nodetool tpstats (CASSANDRA-12035)
+ * Expose metrics for successful/failed authentication attempts (CASSANDRA-10635)
+ * Prepend snapshot name with "truncated" or "dropped" when a snapshot
+   is taken before truncating or dropping a table (CASSANDRA-12178)
+ * Optimize RestrictionSet (CASSANDRA-12153)
+ * cqlsh does not automatically downgrade CQL version (CASSANDRA-12150)
+ * Omit (de)serialization of state variable in UDAs (CASSANDRA-9613)
+ * Create a system table to expose prepared statements (CASSANDRA-8831)
+ * Reuse DataOutputBuffer from ColumnIndex (CASSANDRA-11970)
+ * Remove DatabaseDescriptor dependency from SegmentedFile (CASSANDRA-11580)
+ * Add supplied username to authentication error messages (CASSANDRA-12076)
+ * Remove pre-startup check for open JMX port (CASSANDRA-12074)
+ * Remove compaction Severity from DynamicEndpointSnitch (CASSANDRA-11738)
+ * Restore resumable hints delivery (CASSANDRA-11960)
+ * Properly report LWT contention (CASSANDRA-12626)
+Merged from 3.0:
+ * Make sure sstables only get committed when it's safe to discard commit log records (CASSANDRA-12956)
  * Reject default_time_to_live option when creating or altering MVs (CASSANDRA-12868)
  * Nodetool should use a more sane max heap size (CASSANDRA-12739)
  * LocalToken ensures token values are cloned on heap (CASSANDRA-12651)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2f268eda/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index f46e6f7,113e10d..881fb00
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@@ -963,37 -920,49 +963,19 @@@ public class ColumnFamilyStore implemen
       * Both synchronises custom secondary indexes and provides ordering guarantees for futures on switchMemtable/flush
       * etc, which expect to be able to wait until the flush (and all prior flushes) requested have completed.
       */
-     private final class PostFlush implements Callable<ReplayPosition>
+     private final class PostFlush implements Callable<CommitLogPosition>
      {
--        final boolean flushSecondaryIndexes;
--        final OpOrder.Barrier writeBarrier;
-         final CountDownLatch memtablesFlushLatch = new CountDownLatch(1);
-         final CountDownLatch secondaryIndexFlushLatch = new CountDownLatch(1);
-         volatile Throwable flushFailure = null;
+         final CountDownLatch latch = new CountDownLatch(1);
-         volatile Throwable flushFailure = null;
          final List<Memtable> memtables;
++        volatile Throwable flushFailure = null;

-         private PostFlush(boolean flushSecondaryIndexes,
-                           OpOrder.Barrier writeBarrier,
-         private PostFlush(boolean flushSecondaryIndexes, OpOrder.Barrier writeBarrier,
--                          List<Memtable> memtables)
++        private PostFlush(List<Memtable> memtables)
          {
--            this.writeBarrier = writeBarrier;
--            this.flushSecondaryIndexes = flushSecondaryIndexes;
              this.memtables = memtables;
          }

-         public ReplayPosition call()
+         public CommitLogPosition call()
          {
--            writeBarrier.await();
--
--            /**
--             * we can flush 2is as soon as the barrier completes, as they will be consistent with (or ahead of) the
--             * flushed memtables and CL position, which is as good as we can guarantee.
--             * TODO: SecondaryIndex should support setBarrier(), so custom implementations can co-ordinate exactly
--             * with CL as we do with memtables/CFS-backed SecondaryIndexes.
--             */
-
-             if (flushSecondaryIndexes)
-                 indexManager.flushAllNonCFSBackedIndexesBlocking();
-             try
-             {
-                 if (flushSecondaryIndexes)
-                 {
-                     indexManager.flushAllNonCFSBackedIndexesBlocking();
-                 }
-             }
-             catch (Throwable e)
-             {
-                 flushFailure = merge(flushFailure, e);
-             }
-             finally
-             {
-                 secondaryIndexFlushLatch.countDown();
-             }
--
              try
              {
                  // we wait on the latch for the commitLogUpperBound to be set, and so that waiters
@@@ -1075,10 -1043,9 +1057,10 @@@
              // we then issue the barrier; this lets us wait for all operations started prior to the barrier to complete;
              // since this happens after wiring up the commitLogUpperBound, we also know all operations with earlier
-             // replay positions have also completed, i.e. the memtables are done and ready to flush
+             // commit log segment position have also completed, i.e. the memtables are done and ready to flush
              writeBarrier.issue();
--            postFlush = new PostFlush(!truncate, writeBarrier, memtables);
++            postFlush = new PostFlush(memtables);
+             postFlushTask = ListenableFutureTask.create(postFlush);
          }

          public void run()
@@@ -1096,19 -1063,21 +1078,21 @@@
              try
              {
--                for (Memtable memtable : memtables)
--                    flushMemtable(memtable);
++                // Flush "data" memtable with non-cf 2i first;
++                flushMemtable(memtables.get(0), true);
++                for (int i = 1; i < memtables.size(); i++)
++                    flushMemtable(memtables.get(i), false);
              }
-             catch (Throwable e)
+             catch (Throwable t)
              {
-                 JVMStabilityInspector.inspectThrowable(e);
-                 // If we weren't killed, try to continue work but do not allow CommitLog to be discarded.
-                 postFlush.flushFailure = e;
+                 JVMStabilityInspector.inspectThrowable(t);
+                 postFlush.flushFailure = t;
              }

-             // signal the post-flush we've done our work
-             postFlush.memtablesFlushLatch.countDown();
+             postFlush.latch.countDown();
          }

--        public Collection<SSTableReader> flushMemtable(Memtable memtable)
++        public Collection<SSTableReader> flushMemtable(Memtable memtable, boolean flushNonCf2i)
          {
              if (memtable.isClean() || truncate)
              {
@@@ -1117,93 -1086,28 +1101,102 @@@
                  return Collections.emptyList();
              }

-             Collection<SSTableReader> readers = Collections.emptyList();
-             try (SSTableTxnWriter writer = memtable.flush())
+             List<Future<SSTableMultiWriter>> futures = new ArrayList<>();
+             long totalBytesOnDisk = 0;
+             long maxBytesOnDisk = 0;
+             long minBytesOnDisk = Long.MAX_VALUE;
+             List<SSTableReader> sstables = new ArrayList<>();
+             try (LifecycleTransaction txn = LifecycleTransaction.offline(OperationType.FLUSH))
              {
+                 List<Memtable.FlushRunnable> flushRunnables = null;
+                 List<SSTableMultiWriter> flushResults = null;
+
                  try
                  {
-                     postFlush.secondaryIndexFlushLatch.await();
+                     // flush the memtable
+                     flushRunnables = memtable.flushRunnables(txn);
+
+                     for (int i = 0; i < flushRunnables.size(); i++)
+                         futures.add(perDiskflushExecutors[i].submit(flushRunnables.get(i)));
+
++                    /**
++                     * we can flush 2is as soon as the barrier completes, as they will be consistent with (or ahead of) the
++                     * flushed memtables and CL position, which is as good as we can guarantee.
++                     * TODO: SecondaryIndex should support setBarrier(), so custom implementations can co-ordinate exactly
++                     * with CL as we do with memtables/CFS-backed SecondaryIndexes.
++                     */
++                    if (flushNonCf2i)
++                        indexManager.flushAllNonCFSBackedIndexesBlocking();
++
+                     flushResults = Lists.newArrayList(FBUtilities.waitOnFutures(futures));
                  }
-                 catch (InterruptedException e)
+                 catch (Throwable t)
                  {
-                     postFlush.flushFailure = merge(postFlush.flushFailure, e);
+                     t = memtable.abortRunnables(flushRunnables, t);
+                     t = txn.abort(t);
+                     throw Throwables.propagate(t);
                  }

-                 if (postFlush.flushFailure == null && writer.getFilePointer() > 0)
-                     // sstables should contain non-repaired data.
-                     readers = writer.finish(true);
-                 else
-                     maybeFail(writer.abort(postFlush.flushFailure));
-             }
+                 try
+                 {
+                     Iterator<SSTableMultiWriter> writerIterator = flushResults.iterator();
+                     while (writerIterator.hasNext())
+                     {
+                         @SuppressWarnings("resource")
+                         SSTableMultiWriter writer = writerIterator.next();
+                         if (writer.getFilePointer() > 0)
+                         {
+                             writer.setOpenResult(true).prepareToCommit();
+                         }
+                         else
+                         {
+                             maybeFail(writer.abort(null));
+                             writerIterator.remove();
+                         }
+                     }
+                 }
+                 catch (Throwable t)
+                 {
+                     for (SSTableMultiWriter writer : flushResults)
+                         t = writer.abort(t);
+                     t = txn.abort(t);
+                     Throwables.propagate(t);
+                 }
+
+                 txn.prepareToCommit();
+
+                 Throwable accumulate = null;
+                 for (SSTableMultiWriter writer : flushResults)
+                     accumulate = writer.commit(accumulate);
-             memtable.cfs.replaceFlushed(memtable, readers);
+                 maybeFail(txn.commit(accumulate));
+
+                 for (SSTableMultiWriter writer : flushResults)
+                 {
+                     Collection<SSTableReader> flushedSSTables = writer.finished();
+                     for (SSTableReader sstable : flushedSSTables)
+                     {
+                         if (sstable != null)
+                         {
+                             sstables.add(sstable);
+                             long size = sstable.bytesOnDisk();
+                             totalBytesOnDisk += size;
+                             maxBytesOnDisk = Math.max(maxBytesOnDisk, size);
+                             minBytesOnDisk = Math.min(minBytesOnDisk, size);
+                         }
+                     }
+                 }
+             }
+             memtable.cfs.replaceFlushed(memtable, sstables);
              reclaim(memtable);
-             return readers;
+             memtable.cfs.compactionStrategyManager.compactionLogger.flush(sstables);
+             logger.debug("Flushed to {} ({} sstables, {}), biggest {}, smallest {}",
+                          sstables,
+                          sstables.size(),
+                          FBUtilities.prettyPrintMemory(totalBytesOnDisk),
+                          FBUtilities.prettyPrintMemory(maxBytesOnDisk),
+                          FBUtilities.prettyPrintMemory(minBytesOnDisk));
+             return sstables;
          }

          private void reclaim(final Memtable memtable)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2f268eda/test/unit/org/apache/cassandra/index/CustomIndexTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/index/CustomIndexTest.java
index 8e1385e,b8e4185..4a43210
--- a/test/unit/org/apache/cassandra/index/CustomIndexTest.java
+++ b/test/unit/org/apache/cassandra/index/CustomIndexTest.java
@@@ -624,6 -624,6 +624,43 @@@ public class CustomIndexTest extends CQ
          assertEquals("bar", IndexWithOverloadedValidateOptions.options.get("foo"));
      }

++    @Test
++    public void testFailing2iFlush() throws Throwable
++    {
++        createTable("CREATE TABLE %s (pk int PRIMARY KEY, value int)");
++        createIndex("CREATE CUSTOM INDEX IF NOT EXISTS ON %s(value) USING 'org.apache.cassandra.index.CustomIndexTest$BrokenCustom2I'");
++
++        for (int i = 0; i < 10; i++)
++            execute("INSERT INTO %s (pk, value) VALUES (?, ?)", i, i);
++
++        try
++        {
++            getCurrentColumnFamilyStore().forceBlockingFlush();
++            fail("Exception should have been propagated");
++        }
++        catch (Throwable t)
++        {
++            assertTrue(t.getMessage().contains("Broken2I"));
++        }
++
++        // SSTables remain uncommitted.
++        assertEquals(1, getCurrentColumnFamilyStore().getDirectories().getDirectoryForNewSSTables().listFiles().length);
++    }
++
++    // Used for index creation above
++    public static class BrokenCustom2I extends StubIndex
++    {
++        public BrokenCustom2I(ColumnFamilyStore baseCfs, IndexMetadata metadata)
++        {
++            super(baseCfs, metadata);
++        }
++
++        public Callable<?> getBlockingFlushTask()
++        {
++            throw new RuntimeException("Broken2I");
++        }
++    }
++
      private void testCreateIndex(String indexName, String... targetColumnNames) throws Throwable
      {
          createIndex(String.format("CREATE CUSTOM INDEX %s ON %%s(%s) USING '%s'",