Return-Path: X-Original-To: apmail-cassandra-commits-archive@www.apache.org Delivered-To: apmail-cassandra-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id DBDD810AF0 for ; Thu, 29 Aug 2013 15:20:27 +0000 (UTC) Received: (qmail 69128 invoked by uid 500); 29 Aug 2013 15:20:26 -0000 Delivered-To: apmail-cassandra-commits-archive@cassandra.apache.org Received: (qmail 68736 invoked by uid 500); 29 Aug 2013 15:20:25 -0000 Mailing-List: contact commits-help@cassandra.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@cassandra.apache.org Delivered-To: mailing list commits@cassandra.apache.org Received: (qmail 68682 invoked by uid 99); 29 Aug 2013 15:20:23 -0000 Received: from tyr.zones.apache.org (HELO tyr.zones.apache.org) (140.211.11.114) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 29 Aug 2013 15:20:23 +0000 Received: by tyr.zones.apache.org (Postfix, from userid 65534) id A58918C6B8F; Thu, 29 Aug 2013 15:20:23 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: yukim@apache.org To: commits@cassandra.apache.org Date: Thu, 29 Aug 2013 15:20:23 -0000 Message-Id: <6182c7716172457e86e9950e3be80d8f@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [1/6] git commit: Fix streaming does not transfer wrapped range Updated Branches: refs/heads/cassandra-1.2 254d315d5 -> 18be7fa87 refs/heads/cassandra-2.0 8d4b51d5a -> 9495eb59c refs/heads/trunk 48ea937b8 -> c146f437b Fix streaming does not transfer wrapped range patch by Sergio Bossa; reviewed by yukim for CASSANDRA-5948 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/18be7fa8 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/18be7fa8 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/18be7fa8 Branch: refs/heads/cassandra-1.2 Commit: 18be7fa8760c578a9d42f7512e7767992025ac18 Parents: 254d315 Author: Sergio Bossa Authored: Wed Aug 28 14:43:04 2013 -0500 Committer: Yuki Morishita Committed: Thu Aug 29 10:16:57 2013 -0500 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../apache/cassandra/streaming/StreamOut.java | 9 +- .../streaming/StreamingTransferTest.java | 181 +++++++++++-------- 3 files changed, 112 insertions(+), 79 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/18be7fa8/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 9ef0651..5cb1522 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -3,6 +3,7 @@ * Fix CqlRecordWriter with composite keys (CASSANDRA-5949) * Allow disabling SlabAllocator (CASSANDRA-5935) * Make user-defined compaction JMX blocking (CASSANDRA-4952) + * Fix streaming does not transfer wrapped range (CASSANDRA-5948) 1.2.9 http://git-wip-us.apache.org/repos/asf/cassandra/blob/18be7fa8/src/java/org/apache/cassandra/streaming/StreamOut.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/streaming/StreamOut.java b/src/java/org/apache/cassandra/streaming/StreamOut.java index 5a5ab9a..7035ec7 100644 --- a/src/java/org/apache/cassandra/streaming/StreamOut.java +++ b/src/java/org/apache/cassandra/streaming/StreamOut.java @@ -126,8 +126,11 @@ public class StreamOut boolean flushTables) { assert ranges.size() > 0; + + List> normalizedRanges = Range.normalize(ranges); + logger.info("Beginning transfer to {}", session.getHost()); - logger.debug("Ranges are {}", StringUtils.join(ranges, ",")); + logger.debug("Ranges are {}", StringUtils.join(normalizedRanges, ",")); if (flushTables) flushSSTables(cfses); @@ -136,13 +139,13 @@ public class StreamOut for (ColumnFamilyStore cfStore : cfses) { List> rowBoundsList = Lists.newLinkedList(); - for (Range range : ranges) + for (Range range : normalizedRanges) rowBoundsList.add(range.toRowBounds()); ColumnFamilyStore.ViewFragment view = cfStore.markReferenced(rowBoundsList); sstables.addAll(view.sstables); } - transferSSTables(session, sstables, ranges, type); + transferSSTables(session, sstables, normalizedRanges, type); } /** http://git-wip-us.apache.org/repos/asf/cassandra/blob/18be7fa8/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java b/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java index 2befe45..82c6b1c 100644 --- a/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java +++ b/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java @@ -1,23 +1,21 @@ -package org.apache.cassandra.streaming; - /* -* Licensed to the Apache Software Foundation (ASF) under one -* or more contributor license agreements. See the NOTICE file -* distributed with this work for additional information -* regarding copyright ownership. The ASF licenses this file -* to you under the Apache License, Version 2.0 (the -* "License"); you may not use this file except in compliance -* with the License. You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, -* software distributed under the License is distributed on an -* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -* KIND, either express or implied. See the License for the -* specific language governing permissions and limitations -* under the License. -*/ + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.streaming; import static junit.framework.Assert.assertEquals; import org.apache.cassandra.OrderedJUnit4ClassRunner; @@ -73,7 +71,7 @@ public class StreamingTransferTest extends SchemaLoader * Create and transfer a single sstable, and return the keys that should have been transferred. * The Mutator must create the given column, but it may also create any other columns it pleases. */ - private List createAndTransfer(Table table, ColumnFamilyStore cfs, Mutator mutator) throws Exception + private List createAndTransfer(Table table, ColumnFamilyStore cfs, Mutator mutator, boolean transferSSTables) throws Exception { // write a temporary SSTable, and unregister it logger.debug("Mutating " + cfs.columnFamily); @@ -83,18 +81,29 @@ public class StreamingTransferTest extends SchemaLoader cfs.forceBlockingFlush(); Util.compactAll(cfs).get(); assertEquals(1, cfs.getSSTables().size()); - SSTableReader sstable = cfs.getSSTables().iterator().next(); - cfs.clearUnsafe(); // transfer the first and last key logger.debug("Transferring " + cfs.columnFamily); - transfer(table, sstable); + int[] offs; + if (transferSSTables) + { + SSTableReader sstable = cfs.getSSTables().iterator().next(); + cfs.clearUnsafe(); + transferSSTables(table, sstable); + offs = new int[]{1, 3}; + } + else + { + long beforeStreaming = System.currentTimeMillis(); + transferRanges(table, cfs); + cfs.discardSSTables(beforeStreaming); + offs = new int[]{2, 3}; + } // confirm that a single SSTable was transferred and registered assertEquals(1, cfs.getSSTables().size()); // and that the index and filter were properly recovered - int[] offs = new int[]{1, 3}; List rows = Util.getRangeSlice(cfs); assertEquals(offs.length, rows.size()); for (int i = 0; i < offs.length; i++) @@ -108,7 +117,6 @@ public class StreamingTransferTest extends SchemaLoader // and that the max timestamp for the file was rediscovered assertEquals(timestamp, cfs.getSSTables().iterator().next().getMaxTimestamp()); - List keys = new ArrayList(); for (int off : offs) keys.add("key" + off); @@ -117,17 +125,64 @@ public class StreamingTransferTest extends SchemaLoader return keys; } - private void transfer(Table table, SSTableReader sstable) throws Exception + private void transferRanges(Table table, ColumnFamilyStore cfs) throws Exception + { + IPartitioner p = StorageService.getPartitioner(); + List> ranges = new ArrayList>(); + ranges.add(new Range(p.getToken(ByteBufferUtil.bytes("key1")), p.getToken(ByteBufferUtil.bytes("key0")))); + StreamOutSession session = StreamOutSession.create(table.name, LOCAL, (IStreamCallback) null); + StreamOut.transferRanges(session, Arrays.asList(cfs), ranges, OperationType.BOOTSTRAP); + session.await(); + } + + private void transferSSTables(Table table, SSTableReader sstable) throws Exception { IPartitioner p = StorageService.getPartitioner(); List> ranges = new ArrayList>(); ranges.add(new Range(p.getMinimumToken(), p.getToken(ByteBufferUtil.bytes("key1")))); ranges.add(new Range(p.getToken(ByteBufferUtil.bytes("key2")), p.getMinimumToken())); - StreamOutSession session = StreamOutSession.create(table.name, LOCAL, (IStreamCallback)null); + StreamOutSession session = StreamOutSession.create(table.name, LOCAL, (IStreamCallback) null); StreamOut.transferSSTables(session, Arrays.asList(sstable), ranges, OperationType.BOOTSTRAP); session.await(); } + private void doTransferTable(boolean transferSSTables) throws Exception + { + final Table table = Table.open("Keyspace1"); + final ColumnFamilyStore cfs = table.getColumnFamilyStore("Indexed1"); + + List keys = createAndTransfer(table, cfs, new Mutator() + { + public void mutate(String key, String col, long timestamp) throws Exception + { + long val = key.hashCode(); + RowMutation rm = new RowMutation("Keyspace1", ByteBufferUtil.bytes(key)); + ColumnFamily cf = ColumnFamily.create(table.name, cfs.columnFamily); + cf.addColumn(column(col, "v", timestamp)); + cf.addColumn(new Column(ByteBufferUtil.bytes("birthdate"), ByteBufferUtil.bytes(val), timestamp)); + rm.add(cf); + logger.debug("Applying row to transfer " + rm); + rm.apply(); + } + }, transferSSTables); + + // confirm that the secondary index was recovered + for (String key : keys) + { + long val = key.hashCode(); + IPartitioner p = StorageService.getPartitioner(); + IndexExpression expr = new IndexExpression(ByteBufferUtil.bytes("birthdate"), + IndexOperator.EQ, + ByteBufferUtil.bytes(val)); + List clause = Arrays.asList(expr); + IDiskAtomFilter filter = new IdentityQueryFilter(); + Range range = Util.range("", ""); + List rows = cfs.search(clause, range, 100, filter); + assertEquals(1, rows.size()); + assert rows.get(0).key.key.equals(ByteBufferUtil.bytes(key)); + } + } + /** * Test to make sure RangeTombstones at column index boundary transferred correctly. */ @@ -153,7 +208,7 @@ public class StreamingTransferTest extends SchemaLoader SSTableReader sstable = cfs.getSSTables().iterator().next(); cfs.clearUnsafe(); - transfer(table, sstable); + transferSSTables(table, sstable); // confirm that a single SSTable was transferred and registered assertEquals(1, cfs.getSSTables().size()); @@ -163,41 +218,15 @@ public class StreamingTransferTest extends SchemaLoader } @Test - public void testTransferTable() throws Exception + public void testTransferTableViaRanges() throws Exception { - final Table table = Table.open("Keyspace1"); - final ColumnFamilyStore cfs = table.getColumnFamilyStore("Indexed1"); - - List keys = createAndTransfer(table, cfs, new Mutator() - { - public void mutate(String key, String col, long timestamp) throws Exception - { - long val = key.hashCode(); - RowMutation rm = new RowMutation("Keyspace1", ByteBufferUtil.bytes(key)); - ColumnFamily cf = ColumnFamily.create(table.name, cfs.columnFamily); - cf.addColumn(column(col, "v", timestamp)); - cf.addColumn(new Column(ByteBufferUtil.bytes("birthdate"), ByteBufferUtil.bytes(val), timestamp)); - rm.add(cf); - logger.debug("Applying row to transfer " + rm); - rm.apply(); - } - }); + doTransferTable(false); + } - // confirm that the secondary index was recovered - for (String key : keys) - { - long val = key.hashCode(); - IPartitioner p = StorageService.getPartitioner(); - IndexExpression expr = new IndexExpression(ByteBufferUtil.bytes("birthdate"), - IndexOperator.EQ, - ByteBufferUtil.bytes(val)); - List clause = Arrays.asList(expr); - IDiskAtomFilter filter = new IdentityQueryFilter(); - Range range = Util.range("", ""); - List rows = cfs.search(clause, range, 100, filter); - assertEquals(1, rows.size()); - assert rows.get(0).key.key.equals(ByteBufferUtil.bytes(key)); - } + @Test + public void testTransferTableViaSSTables() throws Exception + { + doTransferTable(true); } @Test @@ -214,7 +243,7 @@ public class StreamingTransferTest extends SchemaLoader addMutation(rm, cfs.columnFamily, col, 1, "val1", timestamp); rm.apply(); } - }); + }, true); } @Test @@ -240,35 +269,35 @@ public class StreamingTransferTest extends SchemaLoader state.writeElement(CounterId.fromInt(6), 3L, 3L); state.writeElement(CounterId.fromInt(8), 2L, 4L); cf.addColumn(new CounterColumn(ByteBufferUtil.bytes(col), - state.context, - timestamp)); + state.context, + timestamp)); cfCleaned.addColumn(new CounterColumn(ByteBufferUtil.bytes(col), - cc.clearAllDelta(state.context), - timestamp)); + cc.clearAllDelta(state.context), + timestamp)); entries.put(key, cf); cleanedEntries.put(key, cfCleaned); cfs.addSSTable(SSTableUtils.prepare() - .ks(table.name) - .cf(cfs.columnFamily) - .generation(0) - .write(entries)); + .ks(table.name) + .cf(cfs.columnFamily) + .generation(0) + .write(entries)); } - }); + }, true); // filter pre-cleaned entries locally, and ensure that the end result is equal cleanedEntries.keySet().retainAll(keys); SSTableReader cleaned = SSTableUtils.prepare() - .ks(table.name) - .cf(cfs.columnFamily) - .generation(0) - .write(cleanedEntries); + .ks(table.name) + .cf(cfs.columnFamily) + .generation(0) + .write(cleanedEntries); SSTableReader streamed = cfs.getSSTables().iterator().next(); SSTableUtils.assertContentEquals(cleaned, streamed); // Retransfer the file, making sure it is now idempotent (see CASSANDRA-3481) cfs.clearUnsafe(); - transfer(table, streamed); + transferSSTables(table, streamed); SSTableReader restreamed = cfs.getSSTables().iterator().next(); SSTableUtils.assertContentEquals(streamed, restreamed); }