From: yukim@apache.org
To: commits@cassandra.apache.org
Reply-To: dev@cassandra.apache.org
Date: Tue, 01 Dec 2015 19:08:34 -0000
In-Reply-To: <8115145cc6b94f119ca223ed65ff6b95@git.apache.org>
References: <8115145cc6b94f119ca223ed65ff6b95@git.apache.org>
X-Mailer: ASF-Git Admin Mailer
Subject: [11/15] cassandra git commit: Merge branch 'cassandra-2.2' into cassandra-3.0

Merge branch 'cassandra-2.2' into cassandra-3.0

Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/ccb20ad4
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/ccb20ad4
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/ccb20ad4

Branch: refs/heads/cassandra-3.1
Commit: ccb20ad46ab38961aac39cc8634f450046bdf16b
Parents: 803a3d9 2491ede
Author: Yuki Morishita
Authored: Tue Dec 1 13:07:39 2015 -0600
Committer: Yuki Morishita
Committed: Tue Dec 1 13:07:39 2015 -0600

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../cassandra/streaming/StreamReceiveTask.java  | 60 ++++++++++----------
 2 files changed, 31 insertions(+), 30 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/ccb20ad4/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index 7fffbbf,7541212..a01011b
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -23,7 -8,16 +23,8 @@@ Merged from 2.2
   * Fix SimpleDateType type compatibility (CASSANDRA-10027)
   * (Hadoop) fix splits calculation (CASSANDRA-10640)
   * (Hadoop) ensure that Cluster instances are always closed (CASSANDRA-10058)
 - * (cqlsh) show partial trace if incomplete after max_trace_wait (CASSANDRA-7645)
 - * Use most up-to-date version of schema for system tables (CASSANDRA-10652)
 - * Deprecate memory_allocator in cassandra.yaml (CASSANDRA-10581,10628)
 - * Expose phi values from failure detector via JMX and tweak debug
 -   and trace logging (CASSANDRA-9526)
 - * Fix RangeNamesQueryPager (CASSANDRA-10509)
 - * Deprecate Pig support (CASSANDRA-10542)
 - * Reduce contention getting instances of CompositeType (CASSANDRA-10433)
  Merged from 2.1:
 + * Add proper error handling to stream receiver (CASSANDRA-10774)
   * Warn or fail when changing cluster topology live (CASSANDRA-10243)
   * Status command in debian/ubuntu init script doesn't work (CASSANDRA-10213)
   * Some DROP ... IF EXISTS incorrectly result in exceptions on non-existing KS (CASSANDRA-10658)


http://git-wip-us.apache.org/repos/asf/cassandra/blob/ccb20ad4/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
index 54ce711,dd56b8b..dfc91f9
--- a/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
+++ b/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
@@@ -126,110 -113,73 +126,110 @@@ public class StreamReceiveTask extends
          public void run()
          {
-             Pair<String, String> kscf = Schema.instance.getCF(task.cfId);
-             if (kscf == null)
-             {
-                 // schema was dropped during streaming
-                 task.sstables.forEach(SSTableMultiWriter::abortOrDie);
-
-                 task.sstables.clear();
-                 task.txn.abort();
-                 return;
-             }
-             ColumnFamilyStore cfs = Keyspace.open(kscf.left).getColumnFamilyStore(kscf.right);
-             boolean hasViews = !Iterables.isEmpty(View.findAll(kscf.left, kscf.right));
-
++            boolean hasViews = false;
++            ColumnFamilyStore cfs = null;
              try
              {
+                 Pair<String, String> kscf = Schema.instance.getCF(task.cfId);
+                 if (kscf == null)
+                 {
+                     // schema was dropped during streaming
-                     for (SSTableWriter writer : task.sstables)
-                         writer.abort();
++                    task.sstables.forEach(SSTableMultiWriter::abortOrDie);
+                     task.sstables.clear();
++                    task.txn.abort();
+                     task.session.taskCompleted(task);
+                     return;
+                 }
-                 ColumnFamilyStore cfs = Keyspace.open(kscf.left).getColumnFamilyStore(kscf.right);
++                cfs = Keyspace.open(kscf.left).getColumnFamilyStore(kscf.right);
++                hasViews = !Iterables.isEmpty(View.findAll(kscf.left, kscf.right));
+
-                 File lockfiledir = cfs.directories.getWriteableLocationAsFile(task.sstables.size() * 256L);
-                 if (lockfiledir == null)
-                     throw new IOError(new IOException("All disks full"));
-                 StreamLockfile lockfile = new StreamLockfile(lockfiledir, UUID.randomUUID());
-                 lockfile.create(task.sstables);
                  List<SSTableReader> readers = new ArrayList<>();
-                 for (SSTableWriter writer : task.sstables)
-                     readers.add(writer.finish(true));
-                 lockfile.delete();
+                 for (SSTableMultiWriter writer : task.sstables)
+                 {
+                     Collection<SSTableReader> newReaders = writer.finish(true);
+                     readers.addAll(newReaders);
+                     task.txn.update(newReaders, false);
+                 }
+                 task.sstables.clear();
                  try (Refs<SSTableReader> refs = Refs.ref(readers))
                  {
-                     // add sstables and build secondary indexes
-                     cfs.addSSTables(readers);
-                     cfs.indexManager.maybeBuildSecondaryIndexes(readers, cfs.indexManager.allIndexesNames());
-
-                     //invalidate row and counter cache
-                     if (cfs.isRowCacheEnabled() || cfs.metadata.isCounter())
+                     //We have a special path for views.
+                     //Since the view requires cleaning up any pre-existing state, we must put
+                     //all partitions through the same write path as normal mutations.
+                     //This also ensures any 2is are also updated
+                     if (hasViews)
                      {
-                         List<Bounds<Token>> boundsToInvalidate = new ArrayList<>(readers.size());
-                         for (SSTableReader sstable : readers)
-                             boundsToInvalidate.add(new Bounds<>(sstable.first.getToken(), sstable.last.getToken()));
-                         Set<Bounds<Token>> nonOverlappingBounds = Bounds.getNonOverlappingBounds(boundsToInvalidate);
-
-                         if (cfs.isRowCacheEnabled())
+                         for (SSTableReader reader : readers)
                          {
-                             int invalidatedKeys = cfs.invalidateRowCache(nonOverlappingBounds);
-                             if (invalidatedKeys > 0)
-                                 logger.debug("[Stream #{}] Invalidated {} row cache entries on table {}.{} after stream " +
-                                              "receive task completed.", task.session.planId(), invalidatedKeys,
-                                              cfs.keyspace.getName(), cfs.getColumnFamilyName());
+                             try (ISSTableScanner scanner = reader.getScanner())
+                             {
+                                 while (scanner.hasNext())
+                                 {
+                                     try (UnfilteredRowIterator rowIterator = scanner.next())
+                                     {
+                                         //Apply unsafe (we will flush below before transaction is done)
+                                         new Mutation(PartitionUpdate.fromIterator(rowIterator)).applyUnsafe();
+                                     }
+                                 }
+                             }
                          }
+                     }
+                     else
+                     {
+                         task.txn.finish();
-                         if (cfs.metadata.isCounter())
+                         // add sstables and build secondary indexes
+                         cfs.addSSTables(readers);
+                         cfs.indexManager.buildAllIndexesBlocking(readers);
+
+                         //invalidate row and counter cache
+                         if (cfs.isRowCacheEnabled() || cfs.metadata.isCounter())
                          {
-                             int invalidatedKeys = cfs.invalidateCounterCache(nonOverlappingBounds);
-                             if (invalidatedKeys > 0)
-                                 logger.debug("[Stream #{}] Invalidated {} counter cache entries on table {}.{} after stream " +
-                                              "receive task completed.", task.session.planId(), invalidatedKeys,
-                                              cfs.keyspace.getName(), cfs.getColumnFamilyName());
+                             List<Bounds<Token>> boundsToInvalidate = new ArrayList<>(readers.size());
+                             readers.forEach(sstable -> boundsToInvalidate.add(new Bounds<>(sstable.first.getToken(), sstable.last.getToken())));
+                             Set<Bounds<Token>> nonOverlappingBounds = Bounds.getNonOverlappingBounds(boundsToInvalidate);
+
+                             if (cfs.isRowCacheEnabled())
+                             {
+                                 int invalidatedKeys = cfs.invalidateRowCache(nonOverlappingBounds);
+                                 if (invalidatedKeys > 0)
+                                     logger.debug("[Stream #{}] Invalidated {} row cache entries on table {}.{} after stream " +
+                                                  "receive task completed.", task.session.planId(), invalidatedKeys,
+                                                  cfs.keyspace.getName(), cfs.getTableName());
+                             }
+
+                             if (cfs.metadata.isCounter())
+                             {
+                                 int invalidatedKeys = cfs.invalidateCounterCache(nonOverlappingBounds);
+                                 if (invalidatedKeys > 0)
+                                     logger.debug("[Stream #{}] Invalidated {} counter cache entries on table {}.{} after stream " +
+                                                  "receive task completed.", task.session.planId(), invalidatedKeys,
+                                                  cfs.keyspace.getName(), cfs.getTableName());
+                             }
                          }
-                     }
-                 }
-                 catch (Throwable t)
-                 {
-                     logger.error("Error applying streamed sstable: ", t);
-
-                     JVMStabilityInspector.inspectThrowable(t);
-                 }
-                 finally
-                 {
-                     //We don't keep the streamed sstables since we've applied them manually
-                     //So we abort the txn and delete the streamed sstables
-                     if (hasViews)
-                     {
-                         cfs.forceBlockingFlush();
-                         task.txn.abort();
++                        task.session.taskCompleted(task);
                      }
                  }
-
-                 task.session.taskCompleted(task);
              }
+             catch (Throwable t)
+             {
+                 logger.error("Error applying streamed data: ", t);
+                 JVMStabilityInspector.inspectThrowable(t);
+                 task.session.onError(t);
+             }
+             finally
+             {
-                 task.session.taskCompleted(task);
++                //We don't keep the streamed sstables since we've applied them manually
++                //So we abort the txn and delete the streamed sstables
++                if (hasViews)
++                {
++                    if (cfs != null)
++                        cfs.forceBlockingFlush();
++                    task.txn.abort();
++                }
+             }
          }
      }
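
----------------------------------------------------------------------

For readers skimming the merge markers above, the shape of the change is: the whole body of run() now executes inside a single try block, any Throwable is reported back to the stream session through onError() instead of only being logged, and the materialized-view cleanup (flush, then abort of the lifecycle transaction) moves into finally so it runs on success and failure alike. The sketch below only illustrates that control flow; Session, Txn and Table are hypothetical stand-in types, not the actual Cassandra classes shown in the diff.

    // Minimal sketch (with hypothetical stand-in types) of the error-handling
    // structure this patch gives StreamReceiveTask.run().
    public final class ReceiveTaskSketch
    {
        interface Session { void taskCompleted(); void onError(Throwable t); }  // stands in for StreamSession
        interface Txn { void finish(); void abort(); }                          // stands in for the lifecycle transaction
        interface Table                                                         // stands in for ColumnFamilyStore
        {
            boolean hasViews();
            void applyThroughWritePath();   // views: re-apply streamed rows as normal mutations
            void addSSTablesDirectly();     // no views: adopt the streamed sstables as-is
            void flush();
        }

        private final Session session;
        private final Txn txn;

        public ReceiveTaskSketch(Session session, Txn txn)
        {
            this.session = session;
            this.txn = txn;
        }

        public void run(Table table)
        {
            boolean hasViews = false;   // hoisted outside try so finally can see it, as in the patch
            try
            {
                hasViews = table.hasViews();
                if (hasViews)
                {
                    // view path: streamed data goes through the normal write path
                    table.applyThroughWritePath();
                }
                else
                {
                    txn.finish();                 // keep the streamed sstables
                    table.addSSTablesDirectly();
                    session.taskCompleted();
                }
            }
            catch (Throwable t)
            {
                session.onError(t);               // surface the failure to the session instead of swallowing it
            }
            finally
            {
                if (hasViews)
                {
                    table.flush();                // data was applied via the write path; persist it
                    txn.abort();                  // then discard the streamed sstables themselves
                }
            }
        }
    }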