From: yukim@apache.org
To: commits@cassandra.apache.org
Reply-To: dev@cassandra.apache.org
Mailing-List: contact commits-help@cassandra.apache.org; run by ezmlm
Date: Wed, 06 Jul 2016 18:32:50 -0000
Message-Id: <621431fc03924de1986d76c8832db70c@git.apache.org>
X-Mailer: ASF-Git Admin Mailer
Subject: [12/16] cassandra git commit: Merge branch 'cassandra-2.2' into cassandra-3.0
archived-at: Wed, 06 Jul 2016 18:32:44 -0000

Merge branch 'cassandra-2.2' into cassandra-3.0

Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/778f2a46
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/778f2a46
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/778f2a46

Branch: refs/heads/cassandra-3.9
Commit: 778f2a46e2df52aa8451aceaf17046e6b8c86ace
Parents: 9ed3b42 00e7ecf
Author: Yuki Morishita
Authored: Wed Jul 6 12:33:54 2016 -0500
Committer: Yuki Morishita
Committed: Wed Jul 6 12:33:54 2016 -0500

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../cassandra/streaming/ConnectionHandler.java  |  8 ++--
 .../cassandra/streaming/StreamReceiveTask.java  | 50 +++++++++++++++-----
 .../cassandra/streaming/StreamSession.java      | 17 +++++--
 .../streaming/StreamingTransferTest.java        | 30 ++++++++++--
 5 files changed, 83 insertions(+), 23 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/778f2a46/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index 02786c5,7d62f97..8118de1
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,26 -1,14 +1,27 @@@
-2.2.8
+3.0.9
+ * Improve streaming synchronization and fault tolerance (CASSANDRA-11414)
+ * Avoid potential race when rebuilding CFMetaData (CASSANDRA-12098)
+ * Avoid missing sstables when getting the canonical sstables (CASSANDRA-11996)
+ * Always select the live sstables when getting sstables in bounds (CASSANDRA-11944)
+ * Fix column ordering of results with static columns for Thrift requests in
+   a mixed 2.x/3.x cluster, also fix potential non-resolved duplication of
+   those static columns in query results (CASSANDRA-12123)
+ * Avoid digest mismatch with empty but static rows (CASSANDRA-12090)
+ * Fix EOF exception when altering column type (CASSANDRA-11820)
+Merged from 2.2:
  * MemoryUtil.getShort() should return an unsigned short also for architectures
    not supporting unaligned memory accesses (CASSANDRA-11973)
 Merged from 2.1:
- * Don't write shadowed range tombstone (CASSANDRA-12030)
- * Improve digest calculation in the presence of overlapping tombstones (CASSANDRA-11349)
  * Fix filtering on clustering columns when 2i is used (CASSANDRA-11907)
- * Account for partition deletions in tombstone histogram (CASSANDRA-12112)
-2.2.7
+3.0.8
+ * Fix potential race in schema during new table creation (CASSANDRA-12083)
+ * cqlsh: fix error handling in rare COPY FROM failure scenario (CASSANDRA-12070)
+ * Disable autocompaction during drain (CASSANDRA-11878)
+ * Add a metrics timer to MemtablePool and use it to track time spent blocked on memory in MemtableAllocator (CASSANDRA-11327)
+ * Fix upgrading schema with super columns with non-text subcomparators (CASSANDRA-12023)
+ * Add TimeWindowCompactionStrategy (CASSANDRA-9666)
+Merged from 2.2:
  * Allow nodetool info to run with readonly JMX access (CASSANDRA-11755)
  * Validate bloom_filter_fp_chance against lowest supported value when the table is created (CASSANDRA-11920)


http://git-wip-us.apache.org/repos/asf/cassandra/blob/778f2a46/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
index 6280f3a,b342edc..040906b
--- a/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
+++ b/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
@@@ -17,9 -17,7 +17,6 @@@
  */
 package org.apache.cassandra.streaming;
 
--import java.io.File;
- import java.io.IOError;
- import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
@@@ -36,19 -33,13 +33,20 @@@ import org.apache.cassandra.concurrent.
 import org.apache.cassandra.config.Schema;
 import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.db.Mutation;
+import org.apache.cassandra.db.compaction.OperationType;
+import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
+import org.apache.cassandra.db.partitions.PartitionUpdate;
+import org.apache.cassandra.db.rows.UnfilteredRowIterator;
+import org.apache.cassandra.db.view.View;
 import org.apache.cassandra.dht.Bounds;
 import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.io.sstable.ISSTableScanner;
+import org.apache.cassandra.io.sstable.SSTableMultiWriter;
 import org.apache.cassandra.io.sstable.format.SSTableReader;
-import org.apache.cassandra.io.sstable.format.SSTableWriter;
 import org.apache.cassandra.utils.JVMStabilityInspector;
 import org.apache.cassandra.utils.Pair;
-
++import org.apache.cassandra.utils.Throwables;
 import org.apache.cassandra.utils.concurrent.Refs;
 
 /**
@@@ -65,16 -55,11 +63,16 @@@ public class StreamReceiveTask extends
     // total size of files to receive
     private final long totalSize;
 
+    // Transaction tracking new files received
-    public final LifecycleTransaction txn;
++    private final LifecycleTransaction txn;
+
     // true if task is done (either completed or aborted)
--    private boolean done = false;
++    private volatile boolean done = false;
 
     // holds references to SSTables received
-    protected Collection<SSTableWriter> sstables;
+    protected Collection<SSTableReader> sstables;
+
+    private int remoteSSTablesReceived = 0;
 
     public StreamReceiveTask(StreamSession session, UUID cfId, int totalFiles, long totalSize)
     {
@@@ -92,18 -74,16 +90,32 @@@
      *
      * @param sstable SSTable file received.
      */
-    public synchronized void received(SSTableWriter sstable)
+    public synchronized void received(SSTableMultiWriter sstable)
     {
         if (done)
++        {
++            logger.warn("[{}] Received sstable {} on already finished stream received task. Aborting sstable.", session.planId(),
++                        sstable.getFilename());
++            Throwables.maybeFail(sstable.abort(null));
             return;
++        }
+
-        assert cfId.equals(sstable.metadata.cfId);
+        remoteSSTablesReceived++;
+        assert cfId.equals(sstable.getCfId());
 
-        Collection<SSTableReader> finished = sstable.finish(true);
-        sstables.add(sstable);
++        Collection<SSTableReader> finished = null;
++        try
++        {
++            finished = sstable.finish(true);
++        }
++        catch (Throwable t)
++        {
++            Throwables.maybeFail(sstable.abort(t));
++        }
+        txn.update(finished, false);
+        sstables.addAll(finished);
 
-        if (sstables.size() == totalFiles)
+        if (remoteSSTablesReceived == totalFiles)
         {
             done = true;
             executor.submit(new OnCompletionRunnable(this));
@@@ -120,6 -100,6 +132,13 @@@
         return totalSize;
     }
 
++    public synchronized LifecycleTransaction getTransaction()
++    {
++        if (done)
++            throw new RuntimeException(String.format("Stream receive task {} of cf {} already finished.", session.planId(), cfId));
++        return txn;
++    }
++
     private static class OnCompletionRunnable implements Runnable
     {
         private final StreamReceiveTask task;
 
@@@ -139,71 -117,52 +158,71 @@@
                 if (kscf == null)
                 {
                     // schema was dropped during streaming
-                    for (SSTableWriter writer : task.sstables)
-                        writer.abort();
                     task.sstables.clear();
-                    task.txn.abort();
++                    task.abortTransaction();
+                    task.session.taskCompleted(task);
                     return;
                 }
-                ColumnFamilyStore cfs = Keyspace.open(kscf.left).getColumnFamilyStore(kscf.right);
+                cfs = Keyspace.open(kscf.left).getColumnFamilyStore(kscf.right);
+                hasViews = !Iterables.isEmpty(View.findAll(kscf.left, kscf.right));
 
-                File lockfiledir = cfs.directories.getWriteableLocationAsFile(task.sstables.size() * 256L);
-                StreamLockfile lockfile = new StreamLockfile(lockfiledir, UUID.randomUUID());
-                lockfile.create(task.sstables);
-                List<SSTableReader> readers = new ArrayList<>();
-                for (SSTableWriter writer : task.sstables)
-                    readers.add(writer.finish(true));
-                lockfile.delete();
-                task.sstables.clear();
+                Collection<SSTableReader> readers = task.sstables;
 
                 try (Refs<SSTableReader> refs = Refs.ref(readers))
                 {
-                    // add sstables and build secondary indexes
-                    cfs.addSSTables(readers);
-                    cfs.indexManager.maybeBuildSecondaryIndexes(readers, cfs.indexManager.allIndexesNames());
-
-                    //invalidate row and counter cache
-                    if (cfs.isRowCacheEnabled() || cfs.metadata.isCounter())
+                    //We have a special path for views.
+                    //Since the view requires cleaning up any pre-existing state, we must put
+                    //all partitions through the same write path as normal mutations.
+                    //This also ensures any 2is are also updated
+                    if (hasViews)
                     {
-                        List<Bounds<Token>> boundsToInvalidate = new ArrayList<>(readers.size());
-                        for (SSTableReader sstable : readers)
-                            boundsToInvalidate.add(new Bounds<>(sstable.first.getToken(), sstable.last.getToken()));
-                        Set<Bounds<Token>> nonOverlappingBounds = Bounds.getNonOverlappingBounds(boundsToInvalidate);
-
-                        if (cfs.isRowCacheEnabled())
+                        for (SSTableReader reader : readers)
                         {
-                            int invalidatedKeys = cfs.invalidateRowCache(nonOverlappingBounds);
-                            if (invalidatedKeys > 0)
-                                logger.debug("[Stream #{}] Invalidated {} row cache entries on table {}.{} after stream " +
-                                             "receive task completed.", task.session.planId(), invalidatedKeys,
-                                             cfs.keyspace.getName(), cfs.getColumnFamilyName());
+                            try (ISSTableScanner scanner = reader.getScanner())
+                            {
+                                while (scanner.hasNext())
+                                {
+                                    try (UnfilteredRowIterator rowIterator = scanner.next())
+                                    {
+                                        //Apply unsafe (we will flush below before transaction is done)
+                                        new Mutation(PartitionUpdate.fromIterator(rowIterator)).applyUnsafe();
+                                    }
+                                }
+                            }
                         }
+                    }
+                    else
+                    {
-                        task.txn.finish();
++                        task.finishTransaction();
 
-                        if (cfs.metadata.isCounter())
+                        // add sstables and build secondary indexes
+                        cfs.addSSTables(readers);
+                        cfs.indexManager.buildAllIndexesBlocking(readers);
+
+                        //invalidate row and counter cache
+                        if (cfs.isRowCacheEnabled() || cfs.metadata.isCounter())
                         {
-                            int invalidatedKeys = cfs.invalidateCounterCache(nonOverlappingBounds);
-                            if (invalidatedKeys > 0)
-                                logger.debug("[Stream #{}] Invalidated {} counter cache entries on table {}.{} after stream " +
-                                             "receive task completed.", task.session.planId(), invalidatedKeys,
-                                             cfs.keyspace.getName(), cfs.getColumnFamilyName());
+                            List<Bounds<Token>> boundsToInvalidate = new ArrayList<>(readers.size());
+                            readers.forEach(sstable -> boundsToInvalidate.add(new Bounds<>(sstable.first.getToken(), sstable.last.getToken())));
+                            Set<Bounds<Token>> nonOverlappingBounds = Bounds.getNonOverlappingBounds(boundsToInvalidate);
+
+                            if (cfs.isRowCacheEnabled())
+                            {
+                                int invalidatedKeys = cfs.invalidateRowCache(nonOverlappingBounds);
+                                if (invalidatedKeys > 0)
+                                    logger.debug("[Stream #{}] Invalidated {} row cache entries on table {}.{} after stream " +
+                                                 "receive task completed.", task.session.planId(), invalidatedKeys,
+                                                 cfs.keyspace.getName(), cfs.getTableName());
+                            }
+
+                            if (cfs.metadata.isCounter())
+                            {
+                                int invalidatedKeys = cfs.invalidateCounterCache(nonOverlappingBounds);
+                                if (invalidatedKeys > 0)
+                                    logger.debug("[Stream #{}] Invalidated {} counter cache entries on table {}.{} after stream " +
+                                                 "receive task completed.", task.session.planId(), invalidatedKeys,
+                                                 cfs.keyspace.getName(), cfs.getTableName());
+                            }
                         }
                     }
                 }
@@@ -211,21 -171,10 +230,20 @@@
             }
             catch (Throwable t)
             {
--                logger.error("Error applying streamed data: ", t);
                 JVMStabilityInspector.inspectThrowable(t);
                 task.session.onError(t);
             }
+            finally
+            {
+                //We don't keep the streamed sstables since we've applied them manually
+                //So we abort the txn and delete the streamed sstables
+                if (hasViews)
+                {
+                    if (cfs != null)
+                        cfs.forceBlockingFlush();
-                    task.txn.abort();
++                    task.abortTransaction();
+                }
+            }
         }
     }
 
@@@ -241,7 -190,8 +259,17 @@@
             return;
 
         done = true;
-        txn.abort();
-        for (SSTableWriter writer : sstables)
-            writer.abort();
++        abortTransaction();
         sstables.clear();
     }
++
++    private synchronized void abortTransaction()
++    {
++        txn.abort();
++    }
++
++    private synchronized void finishTransaction()
++    {
++        txn.finish();
++    }
 }
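Note on the StreamReceiveTask change above: the patch makes a single LifecycleTransaction the owner of every streamed file, guarded by a volatile done flag and synchronized methods, so a late-arriving sstable on a finished task is aborted instead of leaking. The following is only a minimal, self-contained sketch of that guard pattern; the Transaction interface and class names here are illustrative stand-ins, not the Cassandra API, and completion happens inline rather than on a background executor as in the real task.

    // Illustrative sketch of the "single owner, guarded by a done flag" pattern.
    import java.util.ArrayList;
    import java.util.List;

    final class GuardedReceiveTask
    {
        interface Transaction { void finish(); void abort(); }

        private final Transaction txn;
        private final int totalFiles;
        private final List<String> received = new ArrayList<>();
        private volatile boolean done = false;

        GuardedReceiveTask(Transaction txn, int totalFiles)
        {
            this.txn = txn;
            this.totalFiles = totalFiles;
        }

        // Late arrivals after completion/abort are rejected instead of leaking files.
        synchronized void received(String file)
        {
            if (done)
                return; // caller is expected to clean up the rejected file

            received.add(file);
            if (received.size() == totalFiles)
            {
                done = true;
                txn.finish(); // all files arrived: publish them atomically
            }
        }

        // Abort is a no-op once the task is already finished.
        synchronized void abort()
        {
            if (done)
                return;
            done = true;
            txn.abort(); // delete everything received so far
        }
    }
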
http://git-wip-us.apache.org/repos/asf/cassandra/blob/778f2a46/src/java/org/apache/cassandra/streaming/StreamSession.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/streaming/StreamSession.java
index bfbedc7,f4c900e..12f561b
--- a/src/java/org/apache/cassandra/streaming/StreamSession.java
+++ b/src/java/org/apache/cassandra/streaming/StreamSession.java
@@@ -211,12 -212,6 +211,12 @@@ public class StreamSession implements I
     }
 
+    public LifecycleTransaction getTransaction(UUID cfId)
+    {
+        assert receivers.containsKey(cfId);
-        return receivers.get(cfId).txn;
++        return receivers.get(cfId).getTransaction();
+    }
+
     /**
      * Bind this session to report to specific {@link StreamResultFuture} and
      * perform pre-streaming initialization.
@@@ -281,8 -276,8 +281,9 @@@
      * @param flushTables flush tables?
      * @param repairedAt the time the repair started.
      */
--    public void addTransferRanges(String keyspace, Collection<Range<Token>> ranges, Collection<String> columnFamilies, boolean flushTables, long repairedAt)
++    public synchronized void addTransferRanges(String keyspace, Collection<Range<Token>> ranges, Collection<String> columnFamilies, boolean flushTables, long repairedAt)
     {
++        failIfFinished();
         Collection<ColumnFamilyStore> stores = getColumnFamilyStores(keyspace, columnFamilies);
         if (flushTables)
             flushSSTables(stores);
@@@ -300,6 -295,6 +301,12 @@@
         }
     }
 
++    private void failIfFinished()
++    {
++        if (state() == State.COMPLETE || state() == State.FAILED)
++            throw new RuntimeException(String.format("Stream %s is finished with state %s", planId(), state().name()));
++    }
++
     private Collection<ColumnFamilyStore> getColumnFamilyStores(String keyspace, Collection<String> columnFamilies)
     {
         Collection<ColumnFamilyStore> stores = new HashSet<>();
@@@ -371,8 -369,8 +378,9 @@@
         }
     }
 
--    public void addTransferFiles(Collection<SSTableStreamingSections> sstableDetails)
++    public synchronized void addTransferFiles(Collection<SSTableStreamingSections> sstableDetails)
     {
++        failIfFinished();
         Iterator<SSTableStreamingSections> iter = sstableDetails.iterator();
         while (iter.hasNext())
         {
@@@ -745,8 -743,8 +753,9 @@@
         FBUtilities.waitOnFutures(flushes);
     }
 
--    private void prepareReceiving(StreamSummary summary)
++    private synchronized void prepareReceiving(StreamSummary summary)
     {
++        failIfFinished();
         if (summary.files > 0)
             receivers.put(summary.cfId, new StreamReceiveTask(this, summary.cfId, summary.files, summary.totalSize));
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/778f2a46/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java
index 7223e76,2b16267..6be880c
--- a/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java
+++ b/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java
@@@ -229,14 -238,14 +229,38 @@@ public class StreamingTransferTes
         List<Range<Token>> ranges = new ArrayList<>();
         // wrapped range
         ranges.add(new Range<>(p.getToken(ByteBufferUtil.bytes("key1")), p.getToken(ByteBufferUtil.bytes("key0"))));
--        new StreamPlan("StreamingTransferTest").transferRanges(LOCAL, cfs.keyspace.getName(), ranges, cfs.getColumnFamilyName()).execute().get();
++        StreamPlan streamPlan = new StreamPlan("StreamingTransferTest").transferRanges(LOCAL, cfs.keyspace.getName(), ranges, cfs.getColumnFamilyName());
++        streamPlan.execute().get();
         verifyConnectionsAreClosed();
++
++        //cannot add ranges after stream session is finished
++        try
++        {
++            streamPlan.transferRanges(LOCAL, cfs.keyspace.getName(), ranges, cfs.getColumnFamilyName());
++            fail("Should have thrown exception");
++        }
++        catch (RuntimeException e)
++        {
++            //do nothing
++        }
     }
 
     private void transfer(SSTableReader sstable, List<Range<Token>> ranges) throws Exception
     {
--        new StreamPlan("StreamingTransferTest").transferFiles(LOCAL, makeStreamingDetails(ranges, Refs.tryRef(Arrays.asList(sstable)))).execute().get();
++        StreamPlan streamPlan = new StreamPlan("StreamingTransferTest").transferFiles(LOCAL, makeStreamingDetails(ranges, Refs.tryRef(Arrays.asList(sstable))));
++        streamPlan.execute().get();
         verifyConnectionsAreClosed();
++
++        //cannot add files after stream session is finished
++        try
++        {
++            streamPlan.transferFiles(LOCAL, makeStreamingDetails(ranges, Refs.tryRef(Arrays.asList(sstable))));
++            fail("Should have thrown exception");
++        }
++        catch (RuntimeException e)
++        {
++            //do nothing
++        }
     }
 
     /**
@@@ -312,36 -325,27 +336,36 @@@
         String cfname = "StandardInteger1";
         Keyspace keyspace = Keyspace.open(ks);
         ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(cfname);
+        ClusteringComparator comparator = cfs.getComparator();
 
-        String key = "key0";
-        Mutation rm = new Mutation(ks, ByteBufferUtil.bytes(key));
-        // add columns of size slightly less than column_index_size to force insert column index
-        rm.add(cfname, cellname(1), ByteBuffer.wrap(new byte[DatabaseDescriptor.getColumnIndexSize() - 64]), 2);
-        rm.add(cfname, cellname(6), ByteBuffer.wrap(new byte[DatabaseDescriptor.getColumnIndexSize()]), 2);
-        ColumnFamily cf = rm.addOrGet(cfname);
-        // add RangeTombstones
-        cf.delete(new DeletionInfo(cellname(2), cellname(3), cf.getComparator(), 1, (int) (System.currentTimeMillis() / 1000)));
-        cf.delete(new DeletionInfo(cellname(5), cellname(7), cf.getComparator(), 1, (int) (System.currentTimeMillis() / 1000)));
-        cf.delete(new DeletionInfo(cellname(8), cellname(10), cf.getComparator(), 1, (int) (System.currentTimeMillis() / 1000)));
-        rm.applyUnsafe();
+        String key = "key1";
+
+
+        RowUpdateBuilder updates = new RowUpdateBuilder(cfs.metadata, FBUtilities.timestampMicros(), key);
 
-        key = "key1";
-        rm = new Mutation(ks, ByteBufferUtil.bytes(key));
         // add columns of size slightly less than column_index_size to force insert column index
-        rm.add(cfname, cellname(1), ByteBuffer.wrap(new byte[DatabaseDescriptor.getColumnIndexSize() - 64]), 2);
-        cf = rm.addOrGet(cfname);
+        updates.clustering(1)
+               .add("val", ByteBuffer.wrap(new byte[DatabaseDescriptor.getColumnIndexSize() - 64]))
+               .build()
+               .apply();
+
+        updates = new RowUpdateBuilder(cfs.metadata, FBUtilities.timestampMicros(), key);
+        updates.clustering(6)
+               .add("val", ByteBuffer.wrap(new byte[DatabaseDescriptor.getColumnIndexSize()]))
-               .build()
++              .build()
+               .apply();
+
         // add RangeTombstones
-        cf.delete(new DeletionInfo(cellname(2), cellname(3), cf.getComparator(), 1, (int) (System.currentTimeMillis() / 1000)));
-        rm.applyUnsafe();
+        //updates = new RowUpdateBuilder(cfs.metadata, FBUtilities.timestampMicros() + 1 , key);
+        //updates.addRangeTombstone(Slice.make(comparator, comparator.make(2), comparator.make(4)))
+        //       .build()
+        //       .apply();
+
+
+        updates = new RowUpdateBuilder(cfs.metadata, FBUtilities.timestampMicros() + 1, key);
+        updates.addRangeTombstone(Slice.make(comparator.make(5), comparator.make(7)))
+               .build()
+               .apply();
 
         cfs.forceBlockingFlush();
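Note on the tests above: they exercise the new fail-fast guard in StreamSession, where adding ranges or files after the session has reached a terminal state surfaces as a RuntimeException. Below is a minimal sketch of that guard pattern only; the Session class, its State enum, and the method bodies are hypothetical stand-ins for illustration, not the project's StreamSession API.

    // Illustrative sketch of the fail-fast guard on a terminal session state.
    final class Session
    {
        enum State { PREPARING, STREAMING, COMPLETE, FAILED }

        private volatile State state = State.PREPARING;

        void complete() { state = State.COMPLETE; }
        void fail()     { state = State.FAILED; }

        // Mirrors the idea of failIfFinished(): reject new work once the session is terminal.
        private void failIfFinished()
        {
            if (state == State.COMPLETE || state == State.FAILED)
                throw new RuntimeException(String.format("Stream is finished with state %s", state.name()));
        }

        synchronized void addTransferRanges(String keyspace)
        {
            failIfFinished();
            // ... register ranges for streaming (omitted) ...
        }
    }

    // Usage, mirroring the test: adding ranges after completion should throw.
    class SessionGuardExample
    {
        public static void main(String[] args)
        {
            Session session = new Session();
            session.addTransferRanges("ks1"); // accepted while the session is live
            session.complete();
            try
            {
                session.addTransferRanges("ks1");
                throw new AssertionError("Should have thrown exception");
            }
            catch (RuntimeException expected)
            {
                // expected: the session is already finished
            }
        }
    }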