Return-Path: X-Original-To: apmail-cassandra-commits-archive@www.apache.org Delivered-To: apmail-cassandra-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 21F01DE59 for ; Wed, 20 Jun 2012 02:12:41 +0000 (UTC) Received: (qmail 9080 invoked by uid 500); 20 Jun 2012 02:12:41 -0000 Delivered-To: apmail-cassandra-commits-archive@cassandra.apache.org Received: (qmail 9034 invoked by uid 500); 20 Jun 2012 02:12:41 -0000 Mailing-List: contact commits-help@cassandra.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@cassandra.apache.org Delivered-To: mailing list commits@cassandra.apache.org Received: (qmail 9024 invoked by uid 99); 20 Jun 2012 02:12:40 -0000 Received: from tyr.zones.apache.org (HELO tyr.zones.apache.org) (140.211.11.114) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 20 Jun 2012 02:12:40 +0000 Received: by tyr.zones.apache.org (Postfix, from userid 65534) id A2D3812836; Wed, 20 Jun 2012 02:12:40 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: brandonwilliams@apache.org To: commits@cassandra.apache.org X-Mailer: ASF-Git Admin Mailer Subject: git commit: Gossip protocol version, use it to determine if new host id should be used. Patch by brandonwilliams, reviewed by Vijay for CASSANDRA-4317 Message-Id: <20120620021240.A2D3812836@tyr.zones.apache.org> Date: Wed, 20 Jun 2012 02:12:40 +0000 (UTC) Updated Branches: refs/heads/trunk bbcbfd865 -> e6530cc37 Gossip protocol version, use it to determine if new host id should be used. Patch by brandonwilliams, reviewed by Vijay for CASSANDRA-4317 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/e6530cc3 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/e6530cc3 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/e6530cc3 Branch: refs/heads/trunk Commit: e6530cc3723a7d2fdc84400bf8cd722474eed589 Parents: bbcbfd8 Author: Brandon Williams Authored: Tue Jun 19 21:10:38 2012 -0500 Committer: Brandon Williams Committed: Tue Jun 19 21:10:38 2012 -0500 ---------------------------------------------------------------------- .../org/apache/cassandra/gms/ApplicationState.java | 1 + .../org/apache/cassandra/gms/VersionedValue.java | 7 ++++ .../org/apache/cassandra/net/MessagingService.java | 5 +++ .../apache/cassandra/service/StorageService.java | 26 ++++++++++++-- 4 files changed, 35 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/e6530cc3/src/java/org/apache/cassandra/gms/ApplicationState.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/gms/ApplicationState.java b/src/java/org/apache/cassandra/gms/ApplicationState.java index 4520426..518aa80 100644 --- a/src/java/org/apache/cassandra/gms/ApplicationState.java +++ b/src/java/org/apache/cassandra/gms/ApplicationState.java @@ -29,6 +29,7 @@ public enum ApplicationState INTERNAL_IP, RPC_ADDRESS, SEVERITY, + NET_VERSION, // pad to allow adding new states to existing cluster X1, X2, http://git-wip-us.apache.org/repos/asf/cassandra/blob/e6530cc3/src/java/org/apache/cassandra/gms/VersionedValue.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/gms/VersionedValue.java b/src/java/org/apache/cassandra/gms/VersionedValue.java index 61bcbe5..ff41fcb 100644 --- a/src/java/org/apache/cassandra/gms/VersionedValue.java +++ b/src/java/org/apache/cassandra/gms/VersionedValue.java @@ -64,6 +64,8 @@ public class VersionedValue implements Comparable // values for ApplicationState.REMOVAL_COORDINATOR public final static String REMOVAL_COORDINATOR = "REMOVER"; + // network proto version from MS + public final static String NET_VERSION = "NET_VERSION"; public final int version; public final String value; @@ -184,6 +186,11 @@ public class VersionedValue implements Comparable { return new VersionedValue(FBUtilities.getReleaseVersionString()); } + + public VersionedValue networkVersion() + { + return new VersionedValue(String.valueOf(MessagingService.current_version)); + } public VersionedValue internalIP(String private_ip) { http://git-wip-us.apache.org/repos/asf/cassandra/blob/e6530cc3/src/java/org/apache/cassandra/net/MessagingService.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/net/MessagingService.java b/src/java/org/apache/cassandra/net/MessagingService.java index 9c92402..c0af377 100644 --- a/src/java/org/apache/cassandra/net/MessagingService.java +++ b/src/java/org/apache/cassandra/net/MessagingService.java @@ -798,6 +798,11 @@ public final class MessagingService implements MessagingServiceMBean return getVersion(InetAddress.getByName(address)); } + public boolean knowsVersion(InetAddress endpoint) + { + return versions.get(endpoint) != null; + } + public void incrementDroppedMessages(Verb verb) { assert DROPPABLE_VERBS.contains(verb) : "Verb " + verb + " should not legally be dropped"; http://git-wip-us.apache.org/repos/asf/cassandra/blob/e6530cc3/src/java/org/apache/cassandra/service/StorageService.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java index 0455b1d..012f2ec 100644 --- a/src/java/org/apache/cassandra/service/StorageService.java +++ b/src/java/org/apache/cassandra/service/StorageService.java @@ -318,6 +318,7 @@ public class StorageService implements IEndpointStateChangeSubscriber, StorageSe setMode(Mode.CLIENT, false); Gossiper.instance.register(this); Gossiper.instance.start((int)(System.currentTimeMillis() / 1000)); // needed for node-ring gathering. + Gossiper.instance.addLocalApplicationState(ApplicationState.NET_VERSION, valueFactory.networkVersion()); MessagingService.instance().listen(FBUtilities.getLocalAddress()); // sleep a while to allow gossip to warm up (the other nodes need to know about this one before they can reply). @@ -465,6 +466,8 @@ public class StorageService implements IEndpointStateChangeSubscriber, StorageSe Gossiper.instance.register(this); Gossiper.instance.register(migrationManager); Gossiper.instance.start(SystemTable.incrementAndGetGeneration()); // needed for node-ring gathering. + // gossip network proto version + Gossiper.instance.addLocalApplicationState(ApplicationState.NET_VERSION, valueFactory.networkVersion()); // gossip schema version when gossiper is running Schema.instance.updateVersionAndAnnounce(); // add rpc listening info @@ -1005,6 +1008,21 @@ public class StorageService implements IEndpointStateChangeSubscriber, StorageSe } /** + * Checks MS for the version, provided MS _really_ knows it (has directly communicated with the node) otherwise falls back to checking the gossipped version (learned about this node indirectly) + * If both fail, the node is too old to use hostid-style status serialization + * @param endpoint + * @return boolean whether or not to use hostid + */ + private boolean usesHostId(InetAddress endpoint) + { + if (MessagingService.instance().knowsVersion(endpoint) && MessagingService.instance().getVersion(endpoint) >= MessagingService.VERSION_12) + return true; + else if (Gossiper.instance.getEndpointStateForEndpoint(endpoint).getApplicationState(ApplicationState.NET_VERSION) != null && Integer.valueOf(Gossiper.instance.getEndpointStateForEndpoint(endpoint).getApplicationState(ApplicationState.NET_VERSION).value) >= MessagingService.VERSION_12) + return true; + return false; + } + + /** * Handle node bootstrap * * @param endpoint bootstrapping node @@ -1018,7 +1036,7 @@ public class StorageService implements IEndpointStateChangeSubscriber, StorageSe // versions < 1.2 .....: STATUS,TOKEN // versions >= 1.2 .....: STATUS,HOST_ID,TOKEN,TOKEN,... int tokenPos; - if (MessagingService.instance().getVersion(endpoint) >= MessagingService.VERSION_12) + if (usesHostId(endpoint)) { assert pieces.length >= 3; tokenPos = 2; @@ -1048,7 +1066,7 @@ public class StorageService implements IEndpointStateChangeSubscriber, StorageSe tokenMetadata.addBootstrapToken(token, endpoint); calculatePendingRanges(); - if (MessagingService.instance().getVersion(endpoint) >= MessagingService.VERSION_12) + if (usesHostId(endpoint)) tokenMetadata.updateHostId(UUID.fromString(pieces[1]), endpoint); } @@ -1067,7 +1085,7 @@ public class StorageService implements IEndpointStateChangeSubscriber, StorageSe // versions < 1.2 .....: STATUS,TOKEN // versions >= 1.2 .....: STATUS,HOST_ID,TOKEN,TOKEN,... int tokensPos; - if (MessagingService.instance().getVersion(endpoint) >= MessagingService.VERSION_12) + if (usesHostId(endpoint)) { assert pieces.length >= 3; tokensPos = 2; @@ -1084,7 +1102,7 @@ public class StorageService implements IEndpointStateChangeSubscriber, StorageSe logger.info("Node " + endpoint + " state jump to normal"); // Order Matters, TM.updateHostID() should be called before TM.updateNormalToken(), (see CASSANDRA-4300). - if (MessagingService.instance().getVersion(endpoint) >= MessagingService.VERSION_12) + if (usesHostId(endpoint)) tokenMetadata.updateHostId(UUID.fromString(pieces[1]), endpoint); // we don't want to update if this node is responsible for the token and it has a later startup time than endpoint.