cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jmcken...@apache.org
Subject [17/22] cassandra git commit: 10089 - 3.0 patch
Date Wed, 11 Nov 2015 20:09:03 GMT
10089 - 3.0 patch


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/9a90e989
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/9a90e989
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/9a90e989

Branch: refs/heads/cassandra-3.1
Commit: 9a90e9894e9e079058876cf2b16a47d29ba0a32a
Parents: 30eecb2
Author: Stefania Alborghetti <stefania.alborghetti@datastax.com>
Authored: Wed Nov 11 15:05:35 2015 -0500
Committer: Joshua McKenzie <jmckenzie@apache.org>
Committed: Wed Nov 11 15:05:35 2015 -0500

----------------------------------------------------------------------
 .../org/apache/cassandra/gms/EndpointState.java |  76 ++++++---
 .../apache/cassandra/gms/FailureDetector.java   |   7 +-
 src/java/org/apache/cassandra/gms/Gossiper.java |  47 +++---
 .../apache/cassandra/gms/VersionedValue.java    |   5 +
 .../cassandra/service/StorageService.java       |  61 ++++---
 .../apache/cassandra/gms/EndpointStateTest.java | 159 +++++++++++++++++++
 .../cassandra/locator/CloudstackSnitchTest.java |   4 +-
 .../apache/cassandra/locator/EC2SnitchTest.java |   4 +-
 .../locator/GoogleCloudSnitchTest.java          |   4 +-
 9 files changed, 282 insertions(+), 85 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/9a90e989/src/java/org/apache/cassandra/gms/EndpointState.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/gms/EndpointState.java b/src/java/org/apache/cassandra/gms/EndpointState.java
index d1c023a..70f2a68 100644
--- a/src/java/org/apache/cassandra/gms/EndpointState.java
+++ b/src/java/org/apache/cassandra/gms/EndpointState.java
@@ -18,7 +18,11 @@
 package org.apache.cassandra.gms;
 
 import java.io.*;
+import java.util.Collections;
+import java.util.EnumMap;
 import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicReference;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -26,8 +30,6 @@ import org.apache.cassandra.db.TypeSizes;
 import org.apache.cassandra.io.IVersionedSerializer;
 import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.io.util.DataOutputPlus;
-import org.cliffc.high_scale_lib.NonBlockingHashMap;
-
 /**
  * This abstraction represents both the HeartBeatState and the ApplicationState in an EndpointState
  * instance. Any state for a given endpoint can be retrieved from this instance.
@@ -41,7 +43,7 @@ public class EndpointState
     public final static IVersionedSerializer<EndpointState> serializer = new EndpointStateSerializer();
 
     private volatile HeartBeatState hbState;
-    final Map<ApplicationState, VersionedValue> applicationState = new NonBlockingHashMap<ApplicationState, VersionedValue>();
+    private final AtomicReference<Map<ApplicationState, VersionedValue>> applicationState;
 
     /* fields below do not get serialized */
     private volatile long updateTimestamp;
@@ -49,7 +51,13 @@ public class EndpointState
 
     EndpointState(HeartBeatState initialHbState)
     {
+        this(initialHbState, new EnumMap<ApplicationState, VersionedValue>(ApplicationState.class));
+    }
+
+    EndpointState(HeartBeatState initialHbState, Map<ApplicationState, VersionedValue> states)
+    {
         hbState = initialHbState;
+        applicationState = new AtomicReference<Map<ApplicationState, VersionedValue>>(new EnumMap<>(states));
         updateTimestamp = System.nanoTime();
         isAlive = true;
     }
@@ -67,21 +75,37 @@ public class EndpointState
 
     public VersionedValue getApplicationState(ApplicationState key)
     {
-        return applicationState.get(key);
+        return applicationState.get().get(key);
     }
 
-    /**
-     * TODO replace this with operations that don't expose private state
-     */
-    @Deprecated
-    public Map<ApplicationState, VersionedValue> getApplicationStateMap()
+    public Set<Map.Entry<ApplicationState, VersionedValue>> states()
+    {
+        return applicationState.get().entrySet();
+    }
+
+    public void addApplicationState(ApplicationState key, VersionedValue value)
     {
-        return applicationState;
+        addApplicationStates(Collections.singletonMap(key, value));
     }
 
-    void addApplicationState(ApplicationState key, VersionedValue value)
+    public void addApplicationStates(Map<ApplicationState, VersionedValue> values)
     {
-        applicationState.put(key, value);
+        addApplicationStates(values.entrySet());
+    }
+
+    public void addApplicationStates(Set<Map.Entry<ApplicationState, VersionedValue>> values)
+    {
+        while (true)
+        {
+            Map<ApplicationState, VersionedValue> orig = applicationState.get();
+            Map<ApplicationState, VersionedValue> copy = new EnumMap<>(orig);
+
+            for (Map.Entry<ApplicationState, VersionedValue> value : values)
+                copy.put(value.getKey(), value.getValue());
+
+            if (applicationState.compareAndSet(orig, copy))
+                return;
+        }
     }
 
     /* getters and setters */
@@ -132,7 +156,7 @@ public class EndpointState
 
     public String toString()
     {
-        return "EndpointState: HeartBeatState = " + hbState + ", AppStateMap = " + applicationState;
+        return "EndpointState: HeartBeatState = " + hbState + ", AppStateMap = " + applicationState.get();
     }
 }
 
@@ -145,12 +169,12 @@ class EndpointStateSerializer implements IVersionedSerializer<EndpointState>
         HeartBeatState.serializer.serialize(hbState, out, version);
 
         /* serialize the map of ApplicationState objects */
-        int size = epState.applicationState.size();
-        out.writeInt(size);
-        for (Map.Entry<ApplicationState, VersionedValue> entry : epState.applicationState.entrySet())
+        Set<Map.Entry<ApplicationState, VersionedValue>> states = epState.states();
+        out.writeInt(states.size());
+        for (Map.Entry<ApplicationState, VersionedValue> state : states)
         {
-            VersionedValue value = entry.getValue();
-            out.writeInt(entry.getKey().ordinal());
+            VersionedValue value = state.getValue();
+            out.writeInt(state.getKey().ordinal());
             VersionedValue.serializer.serialize(value, out, version);
         }
     }
@@ -158,26 +182,28 @@ class EndpointStateSerializer implements IVersionedSerializer<EndpointState>
     public EndpointState deserialize(DataInputPlus in, int version) throws IOException
     {
         HeartBeatState hbState = HeartBeatState.serializer.deserialize(in, version);
-        EndpointState epState = new EndpointState(hbState);
 
         int appStateSize = in.readInt();
+        Map<ApplicationState, VersionedValue> states = new EnumMap<>(ApplicationState.class);
         for (int i = 0; i < appStateSize; ++i)
         {
             int key = in.readInt();
             VersionedValue value = VersionedValue.serializer.deserialize(in, version);
-            epState.addApplicationState(Gossiper.STATES[key], value);
+            states.put(Gossiper.STATES[key], value);
         }
-        return epState;
+
+        return new EndpointState(hbState, states);
     }
 
     public long serializedSize(EndpointState epState, int version)
     {
         long size = HeartBeatState.serializer.serializedSize(epState.getHeartBeatState(), version);
-        size += TypeSizes.sizeof(epState.applicationState.size());
-        for (Map.Entry<ApplicationState, VersionedValue> entry : epState.applicationState.entrySet())
+        Set<Map.Entry<ApplicationState, VersionedValue>> states = epState.states();
+        size += TypeSizes.sizeof(states.size());
+        for (Map.Entry<ApplicationState, VersionedValue> state : states)
         {
-            VersionedValue value = entry.getValue();
-            size += TypeSizes.sizeof(entry.getKey().ordinal());
+            VersionedValue value = state.getValue();
+            size += TypeSizes.sizeof(state.getKey().ordinal());
             size += VersionedValue.serializer.serializedSize(value, version);
         }
         return size;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9a90e989/src/java/org/apache/cassandra/gms/FailureDetector.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/gms/FailureDetector.java b/src/java/org/apache/cassandra/gms/FailureDetector.java
index c563872..a0754b1 100644
--- a/src/java/org/apache/cassandra/gms/FailureDetector.java
+++ b/src/java/org/apache/cassandra/gms/FailureDetector.java
@@ -192,15 +192,16 @@ public class FailureDetector implements IFailureDetector, FailureDetectorMBean
     {
         sb.append("  generation:").append(endpointState.getHeartBeatState().getGeneration()).append("\n");
         sb.append("  heartbeat:").append(endpointState.getHeartBeatState().getHeartBeatVersion()).append("\n");
-        for (Map.Entry<ApplicationState, VersionedValue> state : endpointState.applicationState.entrySet())
+        for (Map.Entry<ApplicationState, VersionedValue> state : endpointState.states())
         {
             if (state.getKey() == ApplicationState.TOKENS)
                 continue;
             sb.append("  ").append(state.getKey()).append(":").append(state.getValue().version).append(":").append(state.getValue().value).append("\n");
         }
-        if (endpointState.applicationState.containsKey(ApplicationState.TOKENS))
+        VersionedValue tokens = endpointState.getApplicationState(ApplicationState.TOKENS);
+        if (tokens != null)
         {
-            sb.append("  TOKENS:").append(endpointState.applicationState.get(ApplicationState.TOKENS).version).append(":<hidden>\n");
+            sb.append("  TOKENS:").append(tokens.version).append(":<hidden>\n");
         }
         else
         {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9a90e989/src/java/org/apache/cassandra/gms/Gossiper.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/gms/Gossiper.java b/src/java/org/apache/cassandra/gms/Gossiper.java
index 99c6755..795a22f 100644
--- a/src/java/org/apache/cassandra/gms/Gossiper.java
+++ b/src/java/org/apache/cassandra/gms/Gossiper.java
@@ -224,7 +224,8 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean
                 return true;
             try
             {
-                if (entry.getValue().getApplicationStateMap().containsKey(ApplicationState.INTERNAL_IP) && seeds.contains(InetAddress.getByName(entry.getValue().getApplicationState(ApplicationState.INTERNAL_IP).value)))
+                VersionedValue internalIp = entry.getValue().getApplicationState(ApplicationState.INTERNAL_IP);
+                if (internalIp != null && seeds.contains(InetAddress.getByName(internalIp.value)))
                     return true;
             }
             catch (UnknownHostException e)
@@ -371,8 +372,8 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean
     int getMaxEndpointStateVersion(EndpointState epState)
     {
         int maxVersion = epState.getHeartBeatState().getHeartBeatVersion();
-        for (VersionedValue value : epState.getApplicationStateMap().values())
-            maxVersion = Math.max(maxVersion, value.version);
+        for (Map.Entry<ApplicationState, VersionedValue> state : epState.states())
+            maxVersion = Math.max(maxVersion, state.getValue().version);
         return maxVersion;
     }
 
@@ -525,8 +526,10 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean
         logger.info("Advertising removal for {}", endpoint);
         epState.updateTimestamp(); // make sure we don't evict it too soon
         epState.getHeartBeatState().forceNewerGenerationUnsafe();
-        epState.addApplicationState(ApplicationState.STATUS, StorageService.instance.valueFactory.removingNonlocal(hostId));
-        epState.addApplicationState(ApplicationState.REMOVAL_COORDINATOR, StorageService.instance.valueFactory.removalCoordinator(localHostId));
+        Map<ApplicationState, VersionedValue> states = new EnumMap<>(ApplicationState.class);
+        states.put(ApplicationState.STATUS, StorageService.instance.valueFactory.removingNonlocal(hostId));
+        states.put(ApplicationState.REMOVAL_COORDINATOR, StorageService.instance.valueFactory.removalCoordinator(localHostId));
+        epState.addApplicationStates(states);
         endpointStateMap.put(endpoint, epState);
     }
 
@@ -867,7 +870,8 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean
                     logger.trace("local heartbeat version {} greater than {} for {}", localHbVersion, version, forEndpoint);
             }
             /* Accumulate all application states whose versions are greater than "version" variable */
-            for (Entry<ApplicationState, VersionedValue> entry : epState.getApplicationStateMap().entrySet())
+            Map<ApplicationState, VersionedValue> states = new EnumMap<>(ApplicationState.class);
+            for (Entry<ApplicationState, VersionedValue> entry : epState.states())
             {
                 VersionedValue value = entry.getValue();
                 if (value.version > version)
@@ -879,9 +883,11 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean
                     final ApplicationState key = entry.getKey();
                     if (logger.isTraceEnabled())
                         logger.trace("Adding state {}: {}" , key, value.value);
-                    reqdEndpointState.addApplicationState(key, value);
+
+                    states.put(key, value);
                 }
             }
+            reqdEndpointState.addApplicationStates(states);
         }
         return reqdEndpointState;
     }
@@ -1161,19 +1167,13 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean
         localState.setHeartBeatState(remoteState.getHeartBeatState());
         if (logger.isTraceEnabled())
             logger.trace("Updating heartbeat state version to {} from {} for {} ...", localState.getHeartBeatState().getHeartBeatVersion(), oldVersion, addr);
-        // we need to make two loops here, one to apply, then another to notify, this way all states in an update are present and current when the notifications are received
-        for (Entry<ApplicationState, VersionedValue> remoteEntry : remoteState.getApplicationStateMap().entrySet())
-        {
-            ApplicationState remoteKey = remoteEntry.getKey();
-            VersionedValue remoteValue = remoteEntry.getValue();
 
-            assert remoteState.getHeartBeatState().getGeneration() == localState.getHeartBeatState().getGeneration();
-            localState.addApplicationState(remoteKey, remoteValue);
-        }
-        for (Entry<ApplicationState, VersionedValue> remoteEntry : remoteState.getApplicationStateMap().entrySet())
-        {
+        Set<Entry<ApplicationState, VersionedValue>> remoteStates = remoteState.states();
+        assert remoteState.getHeartBeatState().getGeneration() == localState.getHeartBeatState().getGeneration();
+        localState.addApplicationStates(remoteStates);
+
+        for (Entry<ApplicationState, VersionedValue> remoteEntry : remoteStates)
             doOnChangeNotifications(addr, remoteEntry.getKey(), remoteEntry.getValue());
-        }
     }
     
     // notify that a local application state is going to change (doesn't get triggered for remote changes)
@@ -1287,7 +1287,7 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean
 
     public void start(int generationNumber)
     {
-        start(generationNumber, new HashMap<ApplicationState, VersionedValue>());
+        start(generationNumber, new EnumMap<ApplicationState, VersionedValue>(ApplicationState.class));
     }
 
     /**
@@ -1299,8 +1299,7 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean
         /* initialize the heartbeat state for this localEndpoint */
         maybeInitializeLocalState(generationNbr);
         EndpointState localState = endpointStateMap.get(FBUtilities.getBroadcastAddress());
-        for (Map.Entry<ApplicationState, VersionedValue> entry : preloadLocalStates.entrySet())
-            localState.addApplicationState(entry.getKey(), entry.getValue());
+        localState.addApplicationStates(preloadLocalStates);
 
         //notify snitches that Gossiper is about to start
         DatabaseDescriptor.getEndpointSnitch().gossiperStarting();
@@ -1489,8 +1488,10 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean
         EndpointState localState = oldState == null ? newState : oldState;
 
         // always add the version state
-        localState.addApplicationState(ApplicationState.NET_VERSION, StorageService.instance.valueFactory.networkVersion());
-        localState.addApplicationState(ApplicationState.HOST_ID, StorageService.instance.valueFactory.hostId(uuid));
+        Map<ApplicationState, VersionedValue> states = new EnumMap<>(ApplicationState.class);
+        states.put(ApplicationState.NET_VERSION, StorageService.instance.valueFactory.networkVersion());
+        states.put(ApplicationState.HOST_ID, StorageService.instance.valueFactory.hostId(uuid));
+        localState.addApplicationStates(states);
     }
 
     @VisibleForTesting

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9a90e989/src/java/org/apache/cassandra/gms/VersionedValue.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/gms/VersionedValue.java b/src/java/org/apache/cassandra/gms/VersionedValue.java
index 25f7706..0366320 100644
--- a/src/java/org/apache/cassandra/gms/VersionedValue.java
+++ b/src/java/org/apache/cassandra/gms/VersionedValue.java
@@ -108,6 +108,11 @@ public class VersionedValue implements Comparable<VersionedValue>
         return "Value(" + value + "," + version + ")";
     }
 
+    public byte[] toBytes()
+    {
+        return value.getBytes(ISO_8859_1);
+    }
+
     private static String versionString(String... args)
     {
         return StringUtils.join(args, VersionedValue.DELIMITER);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9a90e989/src/java/org/apache/cassandra/service/StorageService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java
index 34df507..16dd045 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -504,12 +504,10 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
         hostId = Gossiper.instance.getHostId(DatabaseDescriptor.getReplaceAddress());
         try
         {
-            if (Gossiper.instance.getEndpointStateForEndpoint(DatabaseDescriptor.getReplaceAddress()).getApplicationState(ApplicationState.TOKENS) == null)
+            VersionedValue tokensVersionedValue = Gossiper.instance.getEndpointStateForEndpoint(DatabaseDescriptor.getReplaceAddress()).getApplicationState(ApplicationState.TOKENS);
+            if (tokensVersionedValue == null)
                 throw new RuntimeException("Could not find tokens for " + DatabaseDescriptor.getReplaceAddress() + " to replace");
-            Collection<Token> tokens = TokenSerializer.deserialize(
-                    tokenMetadata.partitioner,
-                    new DataInputStream(new ByteArrayInputStream(getApplicationStateValue(DatabaseDescriptor.getReplaceAddress(),
-                                                                                        
 ApplicationState.TOKENS))));
+            Collection<Token> tokens = TokenSerializer.deserialize(tokenMetadata.partitioner, new DataInputStream(new ByteArrayInputStream(tokensVersionedValue.toBytes())));
 
             SystemKeyspace.setLocalHostId(hostId); // use the replacee's host Id as our own so we receive hints, etc
             Gossiper.instance.resetEndpointStateMap(); // clean up since we have what we need
@@ -746,7 +744,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
     {
         if (!joined)
         {
-            Map<ApplicationState, VersionedValue> appStates = new HashMap<>();
+            Map<ApplicationState, VersionedValue> appStates = new EnumMap<>(ApplicationState.class);
 
             if (SystemKeyspace.wasDecommissioned())
             {
@@ -1676,8 +1674,10 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
                     handleStateBootstrap(endpoint);
                     break;
                 case VersionedValue.STATUS_NORMAL:
+                    handleStateNormal(endpoint, VersionedValue.STATUS_NORMAL);
+                    break;
                 case VersionedValue.SHUTDOWN:
-                    handleStateNormal(endpoint);
+                    handleStateNormal(endpoint, VersionedValue.SHUTDOWN);
                     break;
                 case VersionedValue.REMOVING_TOKEN:
                 case VersionedValue.REMOVED_TOKEN:
@@ -1759,7 +1759,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
     private void updatePeerInfo(InetAddress endpoint)
     {
         EndpointState epState = Gossiper.instance.getEndpointStateForEndpoint(endpoint);
-        for (Map.Entry<ApplicationState, VersionedValue> entry : epState.getApplicationStateMap().entrySet())
+        for (Map.Entry<ApplicationState, VersionedValue> entry : epState.states())
         {
             switch (entry.getKey())
             {
@@ -1792,12 +1792,6 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
         }
     }
 
-    private byte[] getApplicationStateValue(InetAddress endpoint, ApplicationState appstate)
-    {
-        String vvalue = Gossiper.instance.getEndpointStateForEndpoint(endpoint).getApplicationState(appstate).value;
-        return vvalue.getBytes(ISO_8859_1);
-    }
-
     private void notifyRpcChange(InetAddress endpoint, boolean ready)
     {
         if (ready)
@@ -1867,9 +1861,15 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
     {
         try
         {
-            return TokenSerializer.deserialize(
-                    tokenMetadata.partitioner,
-                    new DataInputStream(new ByteArrayInputStream(getApplicationStateValue(endpoint, ApplicationState.TOKENS))));
+            EndpointState state = Gossiper.instance.getEndpointStateForEndpoint(endpoint);
+            if (state == null)
+                return Collections.emptyList();
+
+            VersionedValue versionedValue = state.getApplicationState(ApplicationState.TOKENS);
+            if (versionedValue == null)
+                return Collections.emptyList();
+
+            return TokenSerializer.deserialize(tokenMetadata.partitioner, new DataInputStream(new ByteArrayInputStream(versionedValue.toBytes())));
         }
         catch (IOException e)
         {
@@ -1918,22 +1918,23 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
      *
      * @param endpoint node
      */
-    private void handleStateNormal(final InetAddress endpoint)
+    private void handleStateNormal(final InetAddress endpoint, final String status)
     {
-        Collection<Token> tokens;
-
-        tokens = getTokensFor(endpoint);
-
+        Collection<Token> tokens = getTokensFor(endpoint);
         Set<Token> tokensToUpdateInMetadata = new HashSet<>();
         Set<Token> tokensToUpdateInSystemKeyspace = new HashSet<>();
         Set<InetAddress> endpointsToRemove = new HashSet<>();
 
-
         if (logger.isDebugEnabled())
-            logger.debug("Node {} state normal, token {}", endpoint, tokens);
+            logger.debug("Node {} state {}, token {}", endpoint, status, tokens);
 
         if (tokenMetadata.isMember(endpoint))
-            logger.info("Node {} state jump to normal", endpoint);
+            logger.info("Node {} state jump to {}", endpoint, status);
+
+        if (tokens.isEmpty() && status.equals(VersionedValue.STATUS_NORMAL))
+            logger.error("Node {} is in state normal but it has no tokens, state: {}",
+                         endpoint,
+                         Gossiper.instance.getEndpointStateForEndpoint(endpoint));
 
         updatePeerInfo(endpoint);
         // Order Matters, TM.updateHostID() should be called before TM.updateNormalToken(), (see CASSANDRA-4300).
@@ -2044,8 +2045,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
      */
     private void handleStateLeaving(InetAddress endpoint)
     {
-        Collection<Token> tokens;
-        tokens = getTokensFor(endpoint);
+        Collection<Token> tokens = getTokensFor(endpoint);
 
         if (logger.isDebugEnabled())
             logger.debug("Node {} state leaving, tokens {}", endpoint, tokens);
@@ -2079,8 +2079,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
     private void handleStateLeft(InetAddress endpoint, String[] pieces)
     {
         assert pieces.length >= 2;
-        Collection<Token> tokens;
-        tokens = getTokensFor(endpoint);
+        Collection<Token> tokens = getTokensFor(endpoint);
 
         if (logger.isDebugEnabled())
             logger.debug("Node {} state left, tokens {}", endpoint, tokens);
@@ -2172,7 +2171,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
 
         removeEndpoint(endpoint);
         tokenMetadata.removeEndpoint(endpoint);
-        if (tokens != null)
+        if (!tokens.isEmpty())
             tokenMetadata.removeBootstrapTokens(tokens);
         notifyLeft(endpoint);
         PendingRangeCalculatorService.instance.update();
@@ -2375,7 +2374,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
 
     public void onJoin(InetAddress endpoint, EndpointState epState)
     {
-        for (Map.Entry<ApplicationState, VersionedValue> entry : epState.getApplicationStateMap().entrySet())
+        for (Map.Entry<ApplicationState, VersionedValue> entry : epState.states())
         {
             onChange(endpoint, entry.getKey(), entry.getValue());
         }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9a90e989/test/unit/org/apache/cassandra/gms/EndpointStateTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/gms/EndpointStateTest.java b/test/unit/org/apache/cassandra/gms/EndpointStateTest.java
new file mode 100644
index 0000000..b06c435
--- /dev/null
+++ b/test/unit/org/apache/cassandra/gms/EndpointStateTest.java
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.gms;
+
+import java.util.Collections;
+import java.util.EnumMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.junit.Test;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.dht.Token;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class EndpointStateTest
+{
+    public volatile VersionedValue.VersionedValueFactory valueFactory =
+        new VersionedValue.VersionedValueFactory(DatabaseDescriptor.getPartitioner());
+
+    @Test
+    public void testMultiThreadedReadConsistency() throws InterruptedException
+    {
+        for (int i = 0; i < 500; i++)
+            innerTestMultiThreadedReadConsistency();
+    }
+
+    /**
+     * Test that a thread reading values whilst they are updated by another thread will
+     * not see an entry unless it sees the entry previously added as well, even though
+     * we are accessing the map via an iterator backed by the underlying map. This
+     * works because EndpointState copies the map each time values are added.
+     */
+    private void innerTestMultiThreadedReadConsistency() throws InterruptedException
+    {
+        final Token token = DatabaseDescriptor.getPartitioner().getRandomToken();
+        final List<Token> tokens = Collections.singletonList(token);
+        final HeartBeatState hb = new HeartBeatState(0);
+        final EndpointState state = new EndpointState(hb);
+        final AtomicInteger numFailures = new AtomicInteger();
+
+        Thread t1 = new Thread(new Runnable()
+        {
+            public void run()
+            {
+                state.addApplicationState(ApplicationState.TOKENS, valueFactory.tokens(tokens));
+                state.addApplicationState(ApplicationState.STATUS, valueFactory.normal(tokens));
+            }
+        });
+
+        Thread t2 = new Thread(new Runnable()
+        {
+            public void run()
+            {
+                for (int i = 0; i < 50; i++)
+                {
+                    Map<ApplicationState, VersionedValue> values = new EnumMap<>(ApplicationState.class);
+                    for (Map.Entry<ApplicationState, VersionedValue> entry : state.states())
+                        values.put(entry.getKey(), entry.getValue());
+
+                    if (values.containsKey(ApplicationState.STATUS) && !values.containsKey(ApplicationState.TOKENS))
+                    {
+                        numFailures.incrementAndGet();
+                        System.out.println(String.format("Failed: %s", values));
+                    }
+                }
+            }
+        });
+
+        t1.start();
+        t2.start();
+
+        t1.join();
+        t2.join();
+
+        assertTrue(numFailures.get() == 0);
+    }
+
+    @Test
+    public void testMultiThreadWriteConsistency() throws InterruptedException
+    {
+        for (int i = 0; i < 500; i++)
+            innerTestMultiThreadWriteConsistency();
+    }
+
+    /**
+     * Test that two threads can update the state map concurrently.
+     */
+    private void innerTestMultiThreadWriteConsistency() throws InterruptedException
+    {
+        final Token token = DatabaseDescriptor.getPartitioner().getRandomToken();
+        final List<Token> tokens = Collections.singletonList(token);
+        final String ip = "127.0.0.1";
+        final UUID hostId = UUID.randomUUID();
+        final HeartBeatState hb = new HeartBeatState(0);
+        final EndpointState state = new EndpointState(hb);
+
+        Thread t1 = new Thread(new Runnable()
+        {
+            public void run()
+            {
+                Map<ApplicationState, VersionedValue> states = new EnumMap<>(ApplicationState.class);
+                states.put(ApplicationState.TOKENS, valueFactory.tokens(tokens));
+                states.put(ApplicationState.STATUS, valueFactory.normal(tokens));
+                state.addApplicationStates(states);
+            }
+        });
+
+        Thread t2 = new Thread(new Runnable()
+        {
+            public void run()
+            {
+                Map<ApplicationState, VersionedValue> states = new EnumMap<>(ApplicationState.class);
+                states.put(ApplicationState.INTERNAL_IP, valueFactory.internalIP(ip));
+                states.put(ApplicationState.HOST_ID, valueFactory.hostId(hostId));
+                state.addApplicationStates(states);
+            }
+        });
+
+        t1.start();
+        t2.start();
+
+        t1.join();
+        t2.join();
+
+        Set<Map.Entry<ApplicationState, VersionedValue>> states = state.states();
+        assertEquals(4, states.size());
+
+        Map<ApplicationState, VersionedValue> values = new EnumMap<>(ApplicationState.class);
+        for (Map.Entry<ApplicationState, VersionedValue> entry : states)
+            values.put(entry.getKey(), entry.getValue());
+
+        assertTrue(values.containsKey(ApplicationState.STATUS));
+        assertTrue(values.containsKey(ApplicationState.TOKENS));
+        assertTrue(values.containsKey(ApplicationState.INTERNAL_IP));
+        assertTrue(values.containsKey(ApplicationState.HOST_ID));
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9a90e989/test/unit/org/apache/cassandra/locator/CloudstackSnitchTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/locator/CloudstackSnitchTest.java b/test/unit/org/apache/cassandra/locator/CloudstackSnitchTest.java
index 7881265..5ac1b31 100644
--- a/test/unit/org/apache/cassandra/locator/CloudstackSnitchTest.java
+++ b/test/unit/org/apache/cassandra/locator/CloudstackSnitchTest.java
@@ -20,6 +20,7 @@ package org.apache.cassandra.locator;
 
 import java.io.IOException;
 import java.net.InetAddress;
+import java.util.EnumMap;
 import java.util.Map;
 
 import org.junit.AfterClass;
@@ -78,9 +79,10 @@ public class CloudstackSnitchTest
         InetAddress nonlocal = InetAddress.getByName("127.0.0.7");
 
         Gossiper.instance.addSavedEndpoint(nonlocal);
-        Map<ApplicationState,VersionedValue> stateMap = Gossiper.instance.getEndpointStateForEndpoint(nonlocal).getApplicationStateMap();
+        Map<ApplicationState, VersionedValue> stateMap = new EnumMap<>(ApplicationState.class);
         stateMap.put(ApplicationState.DC, StorageService.instance.valueFactory.datacenter("ch-zrh"));
         stateMap.put(ApplicationState.RACK, StorageService.instance.valueFactory.rack("2"));
+        Gossiper.instance.getEndpointStateForEndpoint(nonlocal).addApplicationStates(stateMap);
 
         assertEquals("ch-zrh", snitch.getDatacenter(nonlocal));
         assertEquals("2", snitch.getRack(nonlocal));

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9a90e989/test/unit/org/apache/cassandra/locator/EC2SnitchTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/locator/EC2SnitchTest.java b/test/unit/org/apache/cassandra/locator/EC2SnitchTest.java
index cb30dc0..ca6f359 100644
--- a/test/unit/org/apache/cassandra/locator/EC2SnitchTest.java
+++ b/test/unit/org/apache/cassandra/locator/EC2SnitchTest.java
@@ -22,6 +22,7 @@ package org.apache.cassandra.locator;
 import java.io.IOException;
 import java.net.InetAddress;
 import java.net.UnknownHostException;
+import java.util.EnumMap;
 import java.util.Map;
 
 import org.junit.AfterClass;
@@ -77,9 +78,10 @@ public class EC2SnitchTest
         InetAddress nonlocal = InetAddress.getByName("127.0.0.7");
 
         Gossiper.instance.addSavedEndpoint(nonlocal);
-        Map<ApplicationState,VersionedValue> stateMap = Gossiper.instance.getEndpointStateForEndpoint(nonlocal).getApplicationStateMap();
+        Map<ApplicationState, VersionedValue> stateMap = new EnumMap<>(ApplicationState.class);
         stateMap.put(ApplicationState.DC, StorageService.instance.valueFactory.datacenter("us-west"));
         stateMap.put(ApplicationState.RACK, StorageService.instance.valueFactory.datacenter("1a"));
+        Gossiper.instance.getEndpointStateForEndpoint(nonlocal).addApplicationStates(stateMap);
 
         assertEquals("us-west", snitch.getDatacenter(nonlocal));
         assertEquals("1a", snitch.getRack(nonlocal));

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9a90e989/test/unit/org/apache/cassandra/locator/GoogleCloudSnitchTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/locator/GoogleCloudSnitchTest.java b/test/unit/org/apache/cassandra/locator/GoogleCloudSnitchTest.java
index fff880d..04b4361 100644
--- a/test/unit/org/apache/cassandra/locator/GoogleCloudSnitchTest.java
+++ b/test/unit/org/apache/cassandra/locator/GoogleCloudSnitchTest.java
@@ -21,6 +21,7 @@ package org.apache.cassandra.locator;
 
 import java.io.IOException;
 import java.net.InetAddress;
+import java.util.EnumMap;
 import java.util.Map;
 
 import org.junit.AfterClass;
@@ -73,9 +74,10 @@ public class GoogleCloudSnitchTest
         InetAddress nonlocal = InetAddress.getByName("127.0.0.7");
 
         Gossiper.instance.addSavedEndpoint(nonlocal);
-        Map<ApplicationState,VersionedValue> stateMap = Gossiper.instance.getEndpointStateForEndpoint(nonlocal).getApplicationStateMap();
+        Map<ApplicationState, VersionedValue> stateMap = new EnumMap<>(ApplicationState.class);
         stateMap.put(ApplicationState.DC, StorageService.instance.valueFactory.datacenter("europe-west1"));
         stateMap.put(ApplicationState.RACK, StorageService.instance.valueFactory.datacenter("a"));
+        Gossiper.instance.getEndpointStateForEndpoint(nonlocal).addApplicationStates(stateMap);
 
         assertEquals("europe-west1", snitch.getDatacenter(nonlocal));
         assertEquals("a", snitch.getRack(nonlocal));


Mime
View raw message