Return-Path: X-Original-To: apmail-cassandra-commits-archive@www.apache.org Delivered-To: apmail-cassandra-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 2071C188C7 for ; Wed, 11 Nov 2015 20:08:50 +0000 (UTC) Received: (qmail 77402 invoked by uid 500); 11 Nov 2015 20:08:48 -0000 Delivered-To: apmail-cassandra-commits-archive@cassandra.apache.org Received: (qmail 77265 invoked by uid 500); 11 Nov 2015 20:08:48 -0000 Mailing-List: contact commits-help@cassandra.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@cassandra.apache.org Delivered-To: mailing list commits@cassandra.apache.org Received: (qmail 75918 invoked by uid 99); 11 Nov 2015 20:08:47 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 11 Nov 2015 20:08:47 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 5343DE0C60; Wed, 11 Nov 2015 20:08:47 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: jmckenzie@apache.org To: commits@cassandra.apache.org Date: Wed, 11 Nov 2015 20:08:51 -0000 Message-Id: In-Reply-To: <328ecc1c6cd94d2eb02a6d0b73911cb1@git.apache.org> References: <328ecc1c6cd94d2eb02a6d0b73911cb1@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [05/22] cassandra git commit: Fix NPE in Gossip handleStateNormal Fix NPE in Gossip handleStateNormal Patch by stefania; reviewed by jknighton for CASSANDRA-10089 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/6bad57fc Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/6bad57fc Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/6bad57fc Branch: refs/heads/cassandra-3.0 Commit: 
6bad57fc3cf967838a220d8402db37ed9a5b3b4e Parents: 3674ad9 Author: Stefania Alborghetti Authored: Wed Nov 11 15:02:26 2015 -0500 Committer: Joshua McKenzie Committed: Wed Nov 11 15:02:26 2015 -0500 ---------------------------------------------------------------------- .../org/apache/cassandra/gms/EndpointState.java | 76 ++++++--- .../apache/cassandra/gms/FailureDetector.java | 7 +- src/java/org/apache/cassandra/gms/Gossiper.java | 47 +++--- .../apache/cassandra/gms/VersionedValue.java | 5 + .../cassandra/service/StorageService.java | 65 ++++---- .../apache/cassandra/gms/EndpointStateTest.java | 159 +++++++++++++++++++ .../cassandra/locator/CloudstackSnitchTest.java | 8 +- .../apache/cassandra/locator/EC2SnitchTest.java | 4 +- .../locator/GoogleCloudSnitchTest.java | 8 +- 9 files changed, 283 insertions(+), 96 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/6bad57fc/src/java/org/apache/cassandra/gms/EndpointState.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/gms/EndpointState.java b/src/java/org/apache/cassandra/gms/EndpointState.java index 1029374..3e29295 100644 --- a/src/java/org/apache/cassandra/gms/EndpointState.java +++ b/src/java/org/apache/cassandra/gms/EndpointState.java @@ -18,7 +18,11 @@ package org.apache.cassandra.gms; import java.io.*; +import java.util.Collections; +import java.util.EnumMap; import java.util.Map; +import java.util.Set; +import java.util.concurrent.atomic.AtomicReference; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -27,8 +31,6 @@ import org.apache.cassandra.db.TypeSizes; import org.apache.cassandra.io.IVersionedSerializer; import org.apache.cassandra.io.util.DataOutputPlus; -import org.cliffc.high_scale_lib.NonBlockingHashMap; - /** * This abstraction represents both the HeartBeatState and the ApplicationState in an EndpointState * instance. 
Any state for a given endpoint can be retrieved from this instance. @@ -42,7 +44,7 @@ public class EndpointState public final static IVersionedSerializer serializer = new EndpointStateSerializer(); private volatile HeartBeatState hbState; - final Map applicationState = new NonBlockingHashMap(); + private final AtomicReference> applicationState; /* fields below do not get serialized */ private volatile long updateTimestamp; @@ -50,7 +52,13 @@ public class EndpointState EndpointState(HeartBeatState initialHbState) { + this(initialHbState, new EnumMap(ApplicationState.class)); + } + + EndpointState(HeartBeatState initialHbState, Map states) + { hbState = initialHbState; + applicationState = new AtomicReference>(new EnumMap<>(states)); updateTimestamp = System.nanoTime(); isAlive = true; } @@ -68,21 +76,37 @@ public class EndpointState public VersionedValue getApplicationState(ApplicationState key) { - return applicationState.get(key); + return applicationState.get().get(key); } - /** - * TODO replace this with operations that don't expose private state - */ - @Deprecated - public Map getApplicationStateMap() + public Set> states() + { + return applicationState.get().entrySet(); + } + + public void addApplicationState(ApplicationState key, VersionedValue value) { - return applicationState; + addApplicationStates(Collections.singletonMap(key, value)); } - void addApplicationState(ApplicationState key, VersionedValue value) + public void addApplicationStates(Map values) { - applicationState.put(key, value); + addApplicationStates(values.entrySet()); + } + + public void addApplicationStates(Set> values) + { + while (true) + { + Map orig = applicationState.get(); + Map copy = new EnumMap<>(orig); + + for (Map.Entry value : values) + copy.put(value.getKey(), value.getValue()); + + if (applicationState.compareAndSet(orig, copy)) + return; + } } /* getters and setters */ @@ -116,7 +140,7 @@ public class EndpointState public String toString() { - return "EndpointState: 
HeartBeatState = " + hbState + ", AppStateMap = " + applicationState; + return "EndpointState: HeartBeatState = " + hbState + ", AppStateMap = " + applicationState.get(); } } @@ -129,12 +153,12 @@ class EndpointStateSerializer implements IVersionedSerializer HeartBeatState.serializer.serialize(hbState, out, version); /* serialize the map of ApplicationState objects */ - int size = epState.applicationState.size(); - out.writeInt(size); - for (Map.Entry entry : epState.applicationState.entrySet()) + Set> states = epState.states(); + out.writeInt(states.size()); + for (Map.Entry state : states) { - VersionedValue value = entry.getValue(); - out.writeInt(entry.getKey().ordinal()); + VersionedValue value = state.getValue(); + out.writeInt(state.getKey().ordinal()); VersionedValue.serializer.serialize(value, out, version); } } @@ -142,26 +166,28 @@ class EndpointStateSerializer implements IVersionedSerializer public EndpointState deserialize(DataInput in, int version) throws IOException { HeartBeatState hbState = HeartBeatState.serializer.deserialize(in, version); - EndpointState epState = new EndpointState(hbState); int appStateSize = in.readInt(); + Map states = new EnumMap<>(ApplicationState.class); for (int i = 0; i < appStateSize; ++i) { int key = in.readInt(); VersionedValue value = VersionedValue.serializer.deserialize(in, version); - epState.addApplicationState(Gossiper.STATES[key], value); + states.put(Gossiper.STATES[key], value); } - return epState; + + return new EndpointState(hbState, states); } public long serializedSize(EndpointState epState, int version) { long size = HeartBeatState.serializer.serializedSize(epState.getHeartBeatState(), version); - size += TypeSizes.NATIVE.sizeof(epState.applicationState.size()); - for (Map.Entry entry : epState.applicationState.entrySet()) + Set> states = epState.states(); + size += TypeSizes.NATIVE.sizeof(states.size()); + for (Map.Entry state : states) { - VersionedValue value = entry.getValue(); - size += 
TypeSizes.NATIVE.sizeof(entry.getKey().ordinal()); + VersionedValue value = state.getValue(); + size += TypeSizes.NATIVE.sizeof(state.getKey().ordinal()); size += VersionedValue.serializer.serializedSize(value, version); } return size; http://git-wip-us.apache.org/repos/asf/cassandra/blob/6bad57fc/src/java/org/apache/cassandra/gms/FailureDetector.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/gms/FailureDetector.java b/src/java/org/apache/cassandra/gms/FailureDetector.java index 541f716..b2fe9d3 100644 --- a/src/java/org/apache/cassandra/gms/FailureDetector.java +++ b/src/java/org/apache/cassandra/gms/FailureDetector.java @@ -160,15 +160,16 @@ public class FailureDetector implements IFailureDetector, FailureDetectorMBean { sb.append(" generation:").append(endpointState.getHeartBeatState().getGeneration()).append("\n"); sb.append(" heartbeat:").append(endpointState.getHeartBeatState().getHeartBeatVersion()).append("\n"); - for (Map.Entry state : endpointState.applicationState.entrySet()) + for (Map.Entry state : endpointState.states()) { if (state.getKey() == ApplicationState.TOKENS) continue; sb.append(" ").append(state.getKey()).append(":").append(state.getValue().version).append(":").append(state.getValue().value).append("\n"); } - if (endpointState.applicationState.containsKey(ApplicationState.TOKENS)) + VersionedValue tokens = endpointState.getApplicationState(ApplicationState.TOKENS); + if (tokens != null) { - sb.append(" TOKENS:").append(endpointState.applicationState.get(ApplicationState.TOKENS).version).append(":\n"); + sb.append(" TOKENS:").append(tokens.version).append(":\n"); } else { http://git-wip-us.apache.org/repos/asf/cassandra/blob/6bad57fc/src/java/org/apache/cassandra/gms/Gossiper.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/gms/Gossiper.java b/src/java/org/apache/cassandra/gms/Gossiper.java index 
05ba8c3..e9ebb6f 100644 --- a/src/java/org/apache/cassandra/gms/Gossiper.java +++ b/src/java/org/apache/cassandra/gms/Gossiper.java @@ -224,7 +224,8 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean return true; try { - if (entry.getValue().getApplicationStateMap().containsKey(ApplicationState.INTERNAL_IP) && seeds.contains(InetAddress.getByName(entry.getValue().getApplicationState(ApplicationState.INTERNAL_IP).value))) + VersionedValue internalIp = entry.getValue().getApplicationState(ApplicationState.INTERNAL_IP); + if (internalIp != null && seeds.contains(InetAddress.getByName(internalIp.value))) return true; } catch (UnknownHostException e) @@ -371,8 +372,8 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean int getMaxEndpointStateVersion(EndpointState epState) { int maxVersion = epState.getHeartBeatState().getHeartBeatVersion(); - for (VersionedValue value : epState.getApplicationStateMap().values()) - maxVersion = Math.max(maxVersion, value.version); + for (Map.Entry state : epState.states()) + maxVersion = Math.max(maxVersion, state.getValue().version); return maxVersion; } @@ -525,8 +526,10 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean logger.info("Advertising removal for {}", endpoint); epState.updateTimestamp(); // make sure we don't evict it too soon epState.getHeartBeatState().forceNewerGenerationUnsafe(); - epState.addApplicationState(ApplicationState.STATUS, StorageService.instance.valueFactory.removingNonlocal(hostId)); - epState.addApplicationState(ApplicationState.REMOVAL_COORDINATOR, StorageService.instance.valueFactory.removalCoordinator(localHostId)); + Map states = new EnumMap<>(ApplicationState.class); + states.put(ApplicationState.STATUS, StorageService.instance.valueFactory.removingNonlocal(hostId)); + states.put(ApplicationState.REMOVAL_COORDINATOR, StorageService.instance.valueFactory.removalCoordinator(localHostId)); + 
epState.addApplicationStates(states); endpointStateMap.put(endpoint, epState); } @@ -867,7 +870,8 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean logger.trace("local heartbeat version " + localHbVersion + " greater than " + version + " for " + forEndpoint); } /* Accumulate all application states whose versions are greater than "version" variable */ - for (Entry entry : epState.getApplicationStateMap().entrySet()) + Map states = new EnumMap<>(ApplicationState.class); + for (Entry entry : epState.states()) { VersionedValue value = entry.getValue(); if (value.version > version) @@ -879,9 +883,11 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean final ApplicationState key = entry.getKey(); if (logger.isTraceEnabled()) logger.trace("Adding state " + key + ": " + value.value); - reqdEndpointState.addApplicationState(key, value); + + states.put(key, value); } } + reqdEndpointState.addApplicationStates(states); } return reqdEndpointState; } @@ -1153,19 +1159,13 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean localState.setHeartBeatState(remoteState.getHeartBeatState()); if (logger.isTraceEnabled()) logger.trace("Updating heartbeat state version to " + localState.getHeartBeatState().getHeartBeatVersion() + " from " + oldVersion + " for " + addr + " ..."); - // we need to make two loops here, one to apply, then another to notify, this way all states in an update are present and current when the notifications are received - for (Entry remoteEntry : remoteState.getApplicationStateMap().entrySet()) - { - ApplicationState remoteKey = remoteEntry.getKey(); - VersionedValue remoteValue = remoteEntry.getValue(); - assert remoteState.getHeartBeatState().getGeneration() == localState.getHeartBeatState().getGeneration(); - localState.addApplicationState(remoteKey, remoteValue); - } - for (Entry remoteEntry : remoteState.getApplicationStateMap().entrySet()) - { + Set> remoteStates = 
remoteState.states(); + assert remoteState.getHeartBeatState().getGeneration() == localState.getHeartBeatState().getGeneration(); + localState.addApplicationStates(remoteStates); + + for (Entry remoteEntry : remoteStates) doOnChangeNotifications(addr, remoteEntry.getKey(), remoteEntry.getValue()); - } } // notify that a local application state is going to change (doesn't get triggered for remote changes) @@ -1279,7 +1279,7 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean public void start(int generationNumber) { - start(generationNumber, new HashMap()); + start(generationNumber, new EnumMap(ApplicationState.class)); } /** @@ -1291,8 +1291,7 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean /* initialize the heartbeat state for this localEndpoint */ maybeInitializeLocalState(generationNbr); EndpointState localState = endpointStateMap.get(FBUtilities.getBroadcastAddress()); - for (Map.Entry entry : preloadLocalStates.entrySet()) - localState.addApplicationState(entry.getKey(), entry.getValue()); + localState.addApplicationStates(preloadLocalStates); //notify snitches that Gossiper is about to start DatabaseDescriptor.getEndpointSnitch().gossiperStarting(); @@ -1481,8 +1480,10 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean EndpointState localState = oldState == null ? 
newState : oldState; // always add the version state - localState.addApplicationState(ApplicationState.NET_VERSION, StorageService.instance.valueFactory.networkVersion()); - localState.addApplicationState(ApplicationState.HOST_ID, StorageService.instance.valueFactory.hostId(uuid)); + Map states = new EnumMap<>(ApplicationState.class); + states.put(ApplicationState.NET_VERSION, StorageService.instance.valueFactory.networkVersion()); + states.put(ApplicationState.HOST_ID, StorageService.instance.valueFactory.hostId(uuid)); + localState.addApplicationStates(states); } @VisibleForTesting http://git-wip-us.apache.org/repos/asf/cassandra/blob/6bad57fc/src/java/org/apache/cassandra/gms/VersionedValue.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/gms/VersionedValue.java b/src/java/org/apache/cassandra/gms/VersionedValue.java index 1d33642..810a668 100644 --- a/src/java/org/apache/cassandra/gms/VersionedValue.java +++ b/src/java/org/apache/cassandra/gms/VersionedValue.java @@ -105,6 +105,11 @@ public class VersionedValue implements Comparable return "Value(" + value + "," + version + ")"; } + public byte[] toBytes() + { + return value.getBytes(ISO_8859_1); + } + private static String versionString(String... 
args) { return StringUtils.join(args, VersionedValue.DELIMITER); http://git-wip-us.apache.org/repos/asf/cassandra/blob/6bad57fc/src/java/org/apache/cassandra/service/StorageService.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java index af3a00c..93b1b97 100644 --- a/src/java/org/apache/cassandra/service/StorageService.java +++ b/src/java/org/apache/cassandra/service/StorageService.java @@ -31,6 +31,7 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; import java.util.Collections; +import java.util.EnumMap; import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; @@ -518,9 +519,10 @@ public class StorageService extends NotificationBroadcasterSupport implements IE hostId = Gossiper.instance.getHostId(DatabaseDescriptor.getReplaceAddress()); try { - if (Gossiper.instance.getEndpointStateForEndpoint(DatabaseDescriptor.getReplaceAddress()).getApplicationState(ApplicationState.TOKENS) == null) + VersionedValue tokensVersionedValue = Gossiper.instance.getEndpointStateForEndpoint(DatabaseDescriptor.getReplaceAddress()).getApplicationState(ApplicationState.TOKENS); + if (tokensVersionedValue == null) throw new RuntimeException("Could not find tokens for " + DatabaseDescriptor.getReplaceAddress() + " to replace"); - Collection tokens = TokenSerializer.deserialize(getPartitioner(), new DataInputStream(new ByteArrayInputStream(getApplicationStateValue(DatabaseDescriptor.getReplaceAddress(), ApplicationState.TOKENS)))); + Collection tokens = TokenSerializer.deserialize(getPartitioner(), new DataInputStream(new ByteArrayInputStream(tokensVersionedValue.toBytes()))); SystemKeyspace.setLocalHostId(hostId); // use the replacee's host Id as our own so we receive hints, etc Gossiper.instance.resetEndpointStateMap(); // clean up since we have what we need @@ -767,7 +769,7 @@ 
public class StorageService extends NotificationBroadcasterSupport implements IE { if (!joined) { - Map appStates = new HashMap<>(); + Map appStates = new EnumMap<>(ApplicationState.class); if (DatabaseDescriptor.isReplacing() && !(Boolean.parseBoolean(System.getProperty("cassandra.join_ring", "true")))) throw new ConfigurationException("Cannot set both join_ring=false and attempt to replace a node"); @@ -1506,8 +1508,10 @@ public class StorageService extends NotificationBroadcasterSupport implements IE handleStateBootstrap(endpoint); break; case VersionedValue.STATUS_NORMAL: + handleStateNormal(endpoint, VersionedValue.STATUS_NORMAL); + break; case VersionedValue.SHUTDOWN: - handleStateNormal(endpoint); + handleStateNormal(endpoint, VersionedValue.SHUTDOWN); break; case VersionedValue.REMOVING_TOKEN: case VersionedValue.REMOVED_TOKEN: @@ -1586,7 +1590,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE private void updatePeerInfo(InetAddress endpoint) { EndpointState epState = Gossiper.instance.getEndpointStateForEndpoint(endpoint); - for (Map.Entry entry : epState.getApplicationStateMap().entrySet()) + for (Map.Entry entry : epState.states()) { switch (entry.getKey()) { @@ -1619,17 +1623,19 @@ public class StorageService extends NotificationBroadcasterSupport implements IE } } - private byte[] getApplicationStateValue(InetAddress endpoint, ApplicationState appstate) - { - String vvalue = Gossiper.instance.getEndpointStateForEndpoint(endpoint).getApplicationState(appstate).value; - return vvalue.getBytes(ISO_8859_1); - } - private Collection getTokensFor(InetAddress endpoint) { try { - return TokenSerializer.deserialize(getPartitioner(), new DataInputStream(new ByteArrayInputStream(getApplicationStateValue(endpoint, ApplicationState.TOKENS)))); + EndpointState state = Gossiper.instance.getEndpointStateForEndpoint(endpoint); + if (state == null) + return Collections.emptyList(); + + VersionedValue versionedValue = 
state.getApplicationState(ApplicationState.TOKENS); + if (versionedValue == null) + return Collections.emptyList(); + + return TokenSerializer.deserialize(getPartitioner(), new DataInputStream(new ByteArrayInputStream(versionedValue.toBytes()))); } catch (IOException e) { @@ -1679,22 +1685,23 @@ public class StorageService extends NotificationBroadcasterSupport implements IE * * @param endpoint node */ - private void handleStateNormal(final InetAddress endpoint) + private void handleStateNormal(final InetAddress endpoint, final String status) { - Collection tokens; - - tokens = getTokensFor(endpoint); - + Collection tokens = getTokensFor(endpoint); Set tokensToUpdateInMetadata = new HashSet<>(); Set tokensToUpdateInSystemKeyspace = new HashSet<>(); Set endpointsToRemove = new HashSet<>(); - if (logger.isDebugEnabled()) - logger.debug("Node {} state normal, token {}", endpoint, tokens); + logger.debug("Node {} state {}, token {}", endpoint, status, tokens); if (tokenMetadata.isMember(endpoint)) - logger.info("Node {} state jump to normal", endpoint); + logger.info("Node {} state jump to {}", endpoint, status); + + if (tokens.isEmpty() && status.equals(VersionedValue.STATUS_NORMAL)) + logger.error("Node {} is in state normal but it has no tokens, state: {}", + endpoint, + Gossiper.instance.getEndpointStateForEndpoint(endpoint)); updatePeerInfo(endpoint); // Order Matters, TM.updateHostID() should be called before TM.updateNormalToken(), (see CASSANDRA-4300). 
@@ -1820,8 +1827,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE */ private void handleStateLeaving(InetAddress endpoint) { - Collection tokens; - tokens = getTokensFor(endpoint); + Collection tokens = getTokensFor(endpoint); if (logger.isDebugEnabled()) logger.debug("Node {} state leaving, tokens {}", endpoint, tokens); @@ -1855,16 +1861,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE private void handleStateLeft(InetAddress endpoint, String[] pieces) { assert pieces.length >= 2; - Collection tokens = null; - try - { - tokens = getTokensFor(endpoint); - } - catch (Throwable th) - { - JVMStabilityInspector.inspectThrowable(th); - logger.warn("Unable to calculate tokens for {}.", endpoint); - } + Collection tokens = getTokensFor(endpoint); if (logger.isDebugEnabled()) logger.debug("Node {} state left, tokens {}", endpoint, tokens); @@ -1953,7 +1950,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE HintedHandOffManager.instance.deleteHintsForEndpoint(endpoint); removeEndpoint(endpoint); tokenMetadata.removeEndpoint(endpoint); - if (tokens != null) + if (!tokens.isEmpty()) tokenMetadata.removeBootstrapTokens(tokens); if (!isClientMode) @@ -2162,7 +2159,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE public void onJoin(InetAddress endpoint, EndpointState epState) { - for (Map.Entry entry : epState.getApplicationStateMap().entrySet()) + for (Map.Entry entry : epState.states()) { onChange(endpoint, entry.getKey(), entry.getValue()); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/6bad57fc/test/unit/org/apache/cassandra/gms/EndpointStateTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/gms/EndpointStateTest.java b/test/unit/org/apache/cassandra/gms/EndpointStateTest.java new file mode 100644 index 0000000..b06c435 --- /dev/null +++ 
b/test/unit/org/apache/cassandra/gms/EndpointStateTest.java @@ -0,0 +1,159 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.gms; + +import java.util.Collections; +import java.util.EnumMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.atomic.AtomicInteger; + +import org.junit.Test; + +import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.dht.Token; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +public class EndpointStateTest +{ + public volatile VersionedValue.VersionedValueFactory valueFactory = + new VersionedValue.VersionedValueFactory(DatabaseDescriptor.getPartitioner()); + + @Test + public void testMultiThreadedReadConsistency() throws InterruptedException + { + for (int i = 0; i < 500; i++) + innerTestMultiThreadedReadConsistency(); + } + + /** + * Test that a thread reading values whilst they are updated by another thread will + * not see an entry unless it sees the entry previously added as well, even though + * we are accessing the map via an iterator backed by the underlying map. 
This + * works because EndpointState copies the map each time values are added. + */ + private void innerTestMultiThreadedReadConsistency() throws InterruptedException + { + final Token token = DatabaseDescriptor.getPartitioner().getRandomToken(); + final List tokens = Collections.singletonList(token); + final HeartBeatState hb = new HeartBeatState(0); + final EndpointState state = new EndpointState(hb); + final AtomicInteger numFailures = new AtomicInteger(); + + Thread t1 = new Thread(new Runnable() + { + public void run() + { + state.addApplicationState(ApplicationState.TOKENS, valueFactory.tokens(tokens)); + state.addApplicationState(ApplicationState.STATUS, valueFactory.normal(tokens)); + } + }); + + Thread t2 = new Thread(new Runnable() + { + public void run() + { + for (int i = 0; i < 50; i++) + { + Map values = new EnumMap<>(ApplicationState.class); + for (Map.Entry entry : state.states()) + values.put(entry.getKey(), entry.getValue()); + + if (values.containsKey(ApplicationState.STATUS) && !values.containsKey(ApplicationState.TOKENS)) + { + numFailures.incrementAndGet(); + System.out.println(String.format("Failed: %s", values)); + } + } + } + }); + + t1.start(); + t2.start(); + + t1.join(); + t2.join(); + + assertTrue(numFailures.get() == 0); + } + + @Test + public void testMultiThreadWriteConsistency() throws InterruptedException + { + for (int i = 0; i < 500; i++) + innerTestMultiThreadWriteConsistency(); + } + + /** + * Test that two threads can update the state map concurrently. 
+ */ + private void innerTestMultiThreadWriteConsistency() throws InterruptedException + { + final Token token = DatabaseDescriptor.getPartitioner().getRandomToken(); + final List tokens = Collections.singletonList(token); + final String ip = "127.0.0.1"; + final UUID hostId = UUID.randomUUID(); + final HeartBeatState hb = new HeartBeatState(0); + final EndpointState state = new EndpointState(hb); + + Thread t1 = new Thread(new Runnable() + { + public void run() + { + Map states = new EnumMap<>(ApplicationState.class); + states.put(ApplicationState.TOKENS, valueFactory.tokens(tokens)); + states.put(ApplicationState.STATUS, valueFactory.normal(tokens)); + state.addApplicationStates(states); + } + }); + + Thread t2 = new Thread(new Runnable() + { + public void run() + { + Map states = new EnumMap<>(ApplicationState.class); + states.put(ApplicationState.INTERNAL_IP, valueFactory.internalIP(ip)); + states.put(ApplicationState.HOST_ID, valueFactory.hostId(hostId)); + state.addApplicationStates(states); + } + }); + + t1.start(); + t2.start(); + + t1.join(); + t2.join(); + + Set> states = state.states(); + assertEquals(4, states.size()); + + Map values = new EnumMap<>(ApplicationState.class); + for (Map.Entry entry : states) + values.put(entry.getKey(), entry.getValue()); + + assertTrue(values.containsKey(ApplicationState.STATUS)); + assertTrue(values.containsKey(ApplicationState.TOKENS)); + assertTrue(values.containsKey(ApplicationState.INTERNAL_IP)); + assertTrue(values.containsKey(ApplicationState.HOST_ID)); + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/6bad57fc/test/unit/org/apache/cassandra/locator/CloudstackSnitchTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/locator/CloudstackSnitchTest.java b/test/unit/org/apache/cassandra/locator/CloudstackSnitchTest.java index 714520e..90e63e0 100644 --- a/test/unit/org/apache/cassandra/locator/CloudstackSnitchTest.java +++ 
b/test/unit/org/apache/cassandra/locator/CloudstackSnitchTest.java @@ -19,11 +19,10 @@ package org.apache.cassandra.locator; import java.io.IOException; import java.net.InetAddress; -import java.net.UnknownHostException; +import java.util.EnumMap; import java.util.Map; import org.junit.AfterClass; -import org.junit.Assert; import org.junit.BeforeClass; import org.junit.Test; @@ -32,8 +31,6 @@ import org.apache.cassandra.exceptions.ConfigurationException; import org.apache.cassandra.gms.ApplicationState; import org.apache.cassandra.gms.Gossiper; import org.apache.cassandra.gms.VersionedValue; -import org.apache.cassandra.net.MessagingService; -import org.apache.cassandra.net.OutboundTcpConnectionPool; import org.apache.cassandra.service.StorageService; import org.apache.cassandra.db.Keyspace; @@ -81,9 +78,10 @@ public class CloudstackSnitchTest InetAddress nonlocal = InetAddress.getByName("127.0.0.7"); Gossiper.instance.addSavedEndpoint(nonlocal); - Map<ApplicationState, VersionedValue> stateMap = Gossiper.instance.getEndpointStateForEndpoint(nonlocal).getApplicationStateMap(); + Map<ApplicationState, VersionedValue> stateMap = new EnumMap<>(ApplicationState.class); stateMap.put(ApplicationState.DC, StorageService.instance.valueFactory.datacenter("ch-zrh")); stateMap.put(ApplicationState.RACK, StorageService.instance.valueFactory.rack("2")); + Gossiper.instance.getEndpointStateForEndpoint(nonlocal).addApplicationStates(stateMap); assertEquals("ch-zrh", snitch.getDatacenter(nonlocal)); assertEquals("2", snitch.getRack(nonlocal)); http://git-wip-us.apache.org/repos/asf/cassandra/blob/6bad57fc/test/unit/org/apache/cassandra/locator/EC2SnitchTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/locator/EC2SnitchTest.java b/test/unit/org/apache/cassandra/locator/EC2SnitchTest.java index 6015adf..56bbb77 100644 --- a/test/unit/org/apache/cassandra/locator/EC2SnitchTest.java +++ b/test/unit/org/apache/cassandra/locator/EC2SnitchTest.java @@ -24,6 +24,7 @@ package 
org.apache.cassandra.locator; import java.io.IOException; import java.net.InetAddress; import java.net.UnknownHostException; +import java.util.EnumMap; import java.util.Map; import org.junit.AfterClass; @@ -79,9 +80,10 @@ public class EC2SnitchTest InetAddress nonlocal = InetAddress.getByName("127.0.0.7"); Gossiper.instance.addSavedEndpoint(nonlocal); - Map<ApplicationState, VersionedValue> stateMap = Gossiper.instance.getEndpointStateForEndpoint(nonlocal).getApplicationStateMap(); + Map<ApplicationState, VersionedValue> stateMap = new EnumMap<>(ApplicationState.class); stateMap.put(ApplicationState.DC, StorageService.instance.valueFactory.datacenter("us-west")); stateMap.put(ApplicationState.RACK, StorageService.instance.valueFactory.datacenter("1a")); + Gossiper.instance.getEndpointStateForEndpoint(nonlocal).addApplicationStates(stateMap); assertEquals("us-west", snitch.getDatacenter(nonlocal)); assertEquals("1a", snitch.getRack(nonlocal)); http://git-wip-us.apache.org/repos/asf/cassandra/blob/6bad57fc/test/unit/org/apache/cassandra/locator/GoogleCloudSnitchTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/locator/GoogleCloudSnitchTest.java b/test/unit/org/apache/cassandra/locator/GoogleCloudSnitchTest.java index 70080a8..1521454 100644 --- a/test/unit/org/apache/cassandra/locator/GoogleCloudSnitchTest.java +++ b/test/unit/org/apache/cassandra/locator/GoogleCloudSnitchTest.java @@ -23,11 +23,10 @@ package org.apache.cassandra.locator; import java.io.IOException; import java.net.InetAddress; -import java.net.UnknownHostException; +import java.util.EnumMap; import java.util.Map; import org.junit.AfterClass; -import org.junit.Assert; import org.junit.BeforeClass; import org.junit.Test; @@ -37,8 +36,6 @@ import org.apache.cassandra.exceptions.ConfigurationException; import org.apache.cassandra.gms.ApplicationState; import org.apache.cassandra.gms.Gossiper; import org.apache.cassandra.gms.VersionedValue; -import org.apache.cassandra.net.MessagingService; 
-import org.apache.cassandra.net.OutboundTcpConnectionPool; import org.apache.cassandra.service.StorageService; import static org.junit.Assert.assertEquals; @@ -79,9 +76,10 @@ public class GoogleCloudSnitchTest InetAddress nonlocal = InetAddress.getByName("127.0.0.7"); Gossiper.instance.addSavedEndpoint(nonlocal); - Map<ApplicationState, VersionedValue> stateMap = Gossiper.instance.getEndpointStateForEndpoint(nonlocal).getApplicationStateMap(); + Map<ApplicationState, VersionedValue> stateMap = new EnumMap<>(ApplicationState.class); stateMap.put(ApplicationState.DC, StorageService.instance.valueFactory.datacenter("europe-west1")); stateMap.put(ApplicationState.RACK, StorageService.instance.valueFactory.datacenter("a")); + Gossiper.instance.getEndpointStateForEndpoint(nonlocal).addApplicationStates(stateMap); assertEquals("europe-west1", snitch.getDatacenter(nonlocal)); assertEquals("a", snitch.getRack(nonlocal));