Return-Path: X-Original-To: apmail-cassandra-commits-archive@www.apache.org Delivered-To: apmail-cassandra-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id D5B0018DE6 for ; Tue, 25 Aug 2015 17:07:36 +0000 (UTC) Received: (qmail 46399 invoked by uid 500); 25 Aug 2015 17:07:36 -0000 Delivered-To: apmail-cassandra-commits-archive@cassandra.apache.org Received: (qmail 46309 invoked by uid 500); 25 Aug 2015 17:07:36 -0000 Mailing-List: contact commits-help@cassandra.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@cassandra.apache.org Delivered-To: mailing list commits@cassandra.apache.org Received: (qmail 46290 invoked by uid 99); 25 Aug 2015 17:07:36 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 25 Aug 2015 17:07:36 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 790FCE0260; Tue, 25 Aug 2015 17:07:36 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: marcuse@apache.org To: commits@cassandra.apache.org Date: Tue, 25 Aug 2015 17:07:37 -0000 Message-Id: In-Reply-To: <0eec04edc0e641d69346c3b32316c3d2@git.apache.org> References: <0eec04edc0e641d69346c3b32316c3d2@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [2/2] cassandra git commit: Merge branch 'cassandra-2.2' into cassandra-3.0 Merge branch 'cassandra-2.2' into cassandra-3.0 Conflicts: src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java src/java/org/apache/cassandra/repair/messages/PrepareMessage.java src/java/org/apache/cassandra/service/ActiveRepairService.java test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java test/unit/org/apache/cassandra/repair/LocalSyncTaskTest.java Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/f490ccec Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/f490ccec Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/f490ccec Branch: refs/heads/cassandra-3.0 Commit: f490ccec62b2b86b9dbf0fff3888852aedbc7f65 Parents: 8afc76a 842f150 Author: Marcus Eriksson Authored: Tue Aug 25 19:05:23 2015 +0200 Committer: Marcus Eriksson Committed: Tue Aug 25 19:05:34 2015 +0200 ---------------------------------------------------------------------- .../repair/RepairMessageVerbHandler.java | 5 +- .../apache/cassandra/repair/RepairRunnable.java | 2 +- .../repair/messages/PrepareMessage.java | 10 +++- .../cassandra/repair/messages/RepairOption.java | 4 ++ .../cassandra/service/ActiveRepairService.java | 49 +++++++++++++------- .../LeveledCompactionStrategyTest.java | 2 +- .../cassandra/repair/LocalSyncTaskTest.java | 2 +- 7 files changed, 51 insertions(+), 23 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/f490ccec/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java index 942d902,796f135..ffba9d6 --- a/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java +++ b/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java @@@ -73,7 -82,7 +73,8 @@@ public class RepairMessageVerbHandler i columnFamilyStores, prepareMessage.ranges, prepareMessage.isIncremental, - prepareMessage.timestamp); - isGlobal); ++ prepareMessage.timestamp, ++ prepareMessage.isGlobal); MessagingService.instance().sendReply(new MessageOut(MessagingService.Verb.INTERNAL_RESPONSE), id, message.from); break; http://git-wip-us.apache.org/repos/asf/cassandra/blob/f490ccec/src/java/org/apache/cassandra/repair/RepairRunnable.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/f490ccec/src/java/org/apache/cassandra/repair/messages/PrepareMessage.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/repair/messages/PrepareMessage.java index 0cd73db,a57c27e..8909f1b --- a/src/java/org/apache/cassandra/repair/messages/PrepareMessage.java +++ b/src/java/org/apache/cassandra/repair/messages/PrepareMessage.java @@@ -40,16 -40,14 +40,18 @@@ public class PrepareMessage extends Rep public final UUID parentRepairSession; public final boolean isIncremental; + public final long timestamp; ++ public final boolean isGlobal; - public PrepareMessage(UUID parentRepairSession, List cfIds, Collection> ranges, boolean isIncremental, long timestamp) - public PrepareMessage(UUID parentRepairSession, List cfIds, Collection> ranges, boolean isIncremental, boolean isGlobal) ++ public PrepareMessage(UUID parentRepairSession, List cfIds, Collection> ranges, boolean isIncremental, long timestamp, boolean isGlobal) { - super(isGlobal ? Type.PREPARE_GLOBAL_MESSAGE : Type.PREPARE_MESSAGE, null); + super(Type.PREPARE_MESSAGE, null); this.parentRepairSession = parentRepairSession; this.cfIds = cfIds; this.ranges = ranges; this.isIncremental = isIncremental; + this.timestamp = timestamp; ++ this.isGlobal = isGlobal; } public static class PrepareMessageSerializer implements MessageSerializer @@@ -67,10 -65,9 +69,11 @@@ Range.tokenSerializer.serialize(r, out, version); } out.writeBoolean(message.isIncremental); + out.writeLong(message.timestamp); ++ out.writeBoolean(message.isGlobal); } - public PrepareMessage deserialize(DataInput in, int version) throws IOException + public PrepareMessage deserialize(DataInputPlus in, int version) throws IOException { int cfIdCount = in.readInt(); List cfIds = new ArrayList<>(cfIdCount); @@@ -82,8 -79,8 +85,9 @@@ for (int i = 0; i < rangeCount; i++) ranges.add((Range) Range.tokenSerializer.deserialize(in, MessagingService.globalPartitioner(), version)); boolean isIncremental = in.readBoolean(); - - return new PrepareMessage(parentRepairSession, cfIds, ranges, isIncremental, false); + long timestamp = in.readLong(); - return new PrepareMessage(parentRepairSession, cfIds, ranges, isIncremental, timestamp); ++ boolean isGlobal = in.readBoolean(); ++ return new PrepareMessage(parentRepairSession, cfIds, ranges, isIncremental, timestamp, isGlobal); } public long serializedSize(PrepareMessage message, int version) @@@ -93,11 -91,10 +97,12 @@@ for (UUID cfId : message.cfIds) size += UUIDSerializer.serializer.serializedSize(cfId, version); size += UUIDSerializer.serializer.serializedSize(message.parentRepairSession, version); - size += sizes.sizeof(message.ranges.size()); + size += TypeSizes.sizeof(message.ranges.size()); for (Range r : message.ranges) size += Range.tokenSerializer.serializedSize(r, version); - size += sizes.sizeof(message.isIncremental); + size += TypeSizes.sizeof(message.isIncremental); + size += TypeSizes.sizeof(message.timestamp); ++ size += TypeSizes.sizeof(message.isGlobal); return size; } } @@@ -110,7 -107,6 +115,8 @@@ ", ranges=" + ranges + ", parentRepairSession=" + parentRepairSession + ", isIncremental="+isIncremental + + ", timestamp=" + timestamp + ++ ", isGlobal=" + isGlobal + '}'; } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/f490ccec/src/java/org/apache/cassandra/service/ActiveRepairService.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/service/ActiveRepairService.java index 0e09cf7,a6389ea..8079b3a --- a/src/java/org/apache/cassandra/service/ActiveRepairService.java +++ b/src/java/org/apache/cassandra/service/ActiveRepairService.java @@@ -233,8 -237,7 +237,8 @@@ public class ActiveRepairServic public synchronized UUID prepareForRepair(UUID parentRepairSession, Set endpoints, RepairOption options, List columnFamilyStores) { - registerParentRepairSession(parentRepairSession, columnFamilyStores, options.getRanges(), options.isIncremental(), options.isGlobal()); + long timestamp = System.currentTimeMillis(); - registerParentRepairSession(parentRepairSession, columnFamilyStores, options.getRanges(), options.isIncremental(), timestamp); ++ registerParentRepairSession(parentRepairSession, columnFamilyStores, options.getRanges(), options.isIncremental(), timestamp, options.isGlobal()); final CountDownLatch prepareLatch = new CountDownLatch(endpoints.size()); final AtomicBoolean status = new AtomicBoolean(true); final Set failedNodes = Collections.synchronizedSet(new HashSet()); @@@ -264,7 -267,10 +268,7 @@@ for (InetAddress neighbour : endpoints) { - PrepareMessage message = new PrepareMessage(parentRepairSession, cfIds, options.getRanges(), options.isIncremental(), timestamp); - CassandraVersion peerVersion = SystemKeyspace.getReleaseVersion(neighbour); - boolean isGlobal = options.isGlobal() && peerVersion != null && peerVersion.compareTo(SUPPORTS_GLOBAL_PREPARE_FLAG_VERSION) >= 0; - logger.debug("Sending prepare message: options.isGlobal = {}, peerVersion = {}", options.isGlobal(), peerVersion); - PrepareMessage message = new PrepareMessage(parentRepairSession, cfIds, options.getRanges(), options.isIncremental(), isGlobal); ++ PrepareMessage message = new PrepareMessage(parentRepairSession, cfIds, options.getRanges(), options.isIncremental(), timestamp, options.isGlobal()); MessageOut msg = message.createMessage(); MessagingService.instance().sendRR(msg, neighbour, callback, TimeUnit.HOURS.toMillis(1), true); } @@@ -287,9 -293,9 +291,9 @@@ return parentRepairSession; } - public void registerParentRepairSession(UUID parentRepairSession, List columnFamilyStores, Collection> ranges, boolean isIncremental, long timestamp) - public void registerParentRepairSession(UUID parentRepairSession, List columnFamilyStores, Collection> ranges, boolean isIncremental, boolean isGlobal) ++ public void registerParentRepairSession(UUID parentRepairSession, List columnFamilyStores, Collection> ranges, boolean isIncremental, long timestamp, boolean isGlobal) { - parentRepairSessions.put(parentRepairSession, new ParentRepairSession(columnFamilyStores, ranges, isIncremental, timestamp)); - parentRepairSessions.put(parentRepairSession, new ParentRepairSession(columnFamilyStores, ranges, isIncremental, isGlobal, System.currentTimeMillis())); ++ parentRepairSessions.put(parentRepairSession, new ParentRepairSession(columnFamilyStores, ranges, isIncremental, timestamp, isGlobal)); } public Set currentlyRepairing(UUID cfId, UUID parentRepairSession) @@@ -401,16 -413,18 +411,18 @@@ private final Map columnFamilyStores = new HashMap<>(); private final Collection> ranges; private final Map> sstableMap = new HashMap<>(); - public final long repairedAt; + private final long repairedAt; public final boolean isIncremental; + private final boolean isGlobal; - public ParentRepairSession(List columnFamilyStores, Collection> ranges, boolean isIncremental, long repairedAt) - public ParentRepairSession(List columnFamilyStores, Collection> ranges, boolean isIncremental, boolean isGlobal, long repairedAt) ++ public ParentRepairSession(List columnFamilyStores, Collection> ranges, boolean isIncremental, long repairedAt, boolean isGlobal) { for (ColumnFamilyStore cfs : columnFamilyStores) this.columnFamilyStores.put(cfs.metadata.cfId, cfs); this.ranges = ranges; this.repairedAt = repairedAt; - this.isGlobal = isGlobal; this.isIncremental = isIncremental; ++ this.isGlobal = isGlobal; } public void addSSTables(UUID cfId, Set sstables) http://git-wip-us.apache.org/repos/asf/cassandra/blob/f490ccec/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java ---------------------------------------------------------------------- diff --cc test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java index 456dcd1,63fd0e7..8050b6c --- a/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java +++ b/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java @@@ -183,10 -195,10 +183,10 @@@ public class LeveledCompactionStrategyT assertTrue(strategy.getSSTableCountPerLevel()[2] > 0); Range range = new Range<>(Util.token(""), Util.token("")); - int gcBefore = keyspace.getColumnFamilyStore(CF_STANDARDDLEVELED).gcBefore(System.currentTimeMillis()); + int gcBefore = keyspace.getColumnFamilyStore(CF_STANDARDDLEVELED).gcBefore(FBUtilities.nowInSeconds()); UUID parentRepSession = UUID.randomUUID(); - ActiveRepairService.instance.registerParentRepairSession(parentRepSession, Arrays.asList(cfs), Arrays.asList(range), false, System.currentTimeMillis()); - ActiveRepairService.instance.registerParentRepairSession(parentRepSession, Arrays.asList(cfs), Arrays.asList(range), false, true); - RepairJobDesc desc = new RepairJobDesc(parentRepSession, UUID.randomUUID(), KEYSPACE1, CF_STANDARDDLEVELED, range); ++ ActiveRepairService.instance.registerParentRepairSession(parentRepSession, Arrays.asList(cfs), Arrays.asList(range), false, System.currentTimeMillis(), true); + RepairJobDesc desc = new RepairJobDesc(parentRepSession, UUID.randomUUID(), KEYSPACE1, CF_STANDARDDLEVELED, Arrays.asList(range)); Validator validator = new Validator(desc, FBUtilities.getBroadcastAddress(), gcBefore); CompactionManager.instance.submitValidation(cfs, validator).get(); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/f490ccec/test/unit/org/apache/cassandra/repair/LocalSyncTaskTest.java ---------------------------------------------------------------------- diff --cc test/unit/org/apache/cassandra/repair/LocalSyncTaskTest.java index ff5b99e,e5c03b9..eec29bc --- a/test/unit/org/apache/cassandra/repair/LocalSyncTaskTest.java +++ b/test/unit/org/apache/cassandra/repair/LocalSyncTaskTest.java @@@ -89,13 -89,12 +89,13 @@@ public class LocalSyncTaskTest extends Keyspace keyspace = Keyspace.open(KEYSPACE1); ColumnFamilyStore cfs = keyspace.getColumnFamilyStore("Standard1"); - ActiveRepairService.instance.registerParentRepairSession(parentRepairSession, Arrays.asList(cfs), Arrays.asList(range), false, System.currentTimeMillis()); - ActiveRepairService.instance.registerParentRepairSession(parentRepairSession, Arrays.asList(cfs), Arrays.asList(range), false, false); ++ ActiveRepairService.instance.registerParentRepairSession(parentRepairSession, Arrays.asList(cfs), Arrays.asList(range), false, System.currentTimeMillis(), false); + + RepairJobDesc desc = new RepairJobDesc(parentRepairSession, UUID.randomUUID(), KEYSPACE1, "Standard1", Arrays.asList(range)); - RepairJobDesc desc = new RepairJobDesc(parentRepairSession, UUID.randomUUID(), KEYSPACE1, "Standard1", range); + MerkleTrees tree1 = createInitialTree(desc); - MerkleTree tree1 = createInitialTree(desc); - MerkleTree tree2 = createInitialTree(desc); + MerkleTrees tree2 = createInitialTree(desc); // change a range in one of the trees Token token = partirioner.midpoint(range.left, range.right);