cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jbel...@apache.org
Subject svn commit: r833579 - in /incubator/cassandra/trunk: src/java/org/apache/cassandra/dht/ src/java/org/apache/cassandra/gms/ src/java/org/apache/cassandra/locator/ src/java/org/apache/cassandra/service/ test/unit/org/apache/cassandra/dht/ test/unit/org/a...
Date Fri, 06 Nov 2009 22:17:27 GMT
Author: jbellis
Date: Fri Nov  6 22:17:26 2009
New Revision: 833579

URL: http://svn.apache.org/viewvc?rev=833579&view=rev
Log:
Keeping isBootstrapping and BootstrapTokenAddress information in TokenMetadata is the wrong
level of abstraction.  Switch to pendingRanges instead.
patch by jbellis and Jaakko Laine for CASSANDRA-525

Modified:
    incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootStrapper.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/Gossiper.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/TokenMetadata.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageLoadBalancer.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/service/TokenUpdateVerbHandler.java
    incubator/cassandra/trunk/test/unit/org/apache/cassandra/dht/BootStrapperTest.java
    incubator/cassandra/trunk/test/unit/org/apache/cassandra/locator/RackUnawareStrategyTest.java

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootStrapper.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootStrapper.java?rev=833579&r1=833578&r2=833579&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootStrapper.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootStrapper.java Fri Nov
 6 22:17:26 2009
@@ -36,7 +36,6 @@
  import org.apache.cassandra.net.io.StreamContextManager;
  import org.apache.cassandra.net.io.IStreamComplete;
  import org.apache.cassandra.service.StorageService;
- import org.apache.cassandra.service.StorageLoadBalancer;
  import org.apache.cassandra.service.StreamManager;
  import org.apache.cassandra.utils.LogUtil;
  import org.apache.cassandra.utils.SimpleCondition;
@@ -50,7 +49,6 @@
  import org.apache.cassandra.db.ColumnFamilyStore;
  import org.apache.cassandra.db.Table;
  import com.google.common.collect.Multimap;
- import com.google.common.collect.HashMultimap;
  import com.google.common.collect.ArrayListMultimap;
 
 
@@ -146,8 +144,8 @@
         {
             public int compare(InetAddress ia1, InetAddress ia2)
             {
-                int n1 = metadata.bootstrapTargets(ia1);
-                int n2 = metadata.bootstrapTargets(ia2);
+                int n1 = metadata.pendingRangeChanges(ia1);
+                int n2 = metadata.pendingRangeChanges(ia2);
                 if (n1 != n2)
                     return -(n1 - n2); // more targets = _less_ priority!
 
@@ -167,10 +165,8 @@
     /** get potential sources for each range, ordered by proximity (as determined by EndPointSnitch)
*/
     Multimap<Range, InetAddress> getRangesWithSources()
     {
-        TokenMetadata temp = tokenMetadata.cloneMe();
-        assert temp.sortedTokens().size() > 0;
-        temp.update(token, address);
-        Collection<Range> myRanges = replicationStrategy.getAddressRanges(temp).get(address);
+        assert tokenMetadata.sortedTokens().size() > 0;
+        Collection<Range> myRanges = replicationStrategy.getPendingAddressRanges(tokenMetadata,
token, address);
 
         Multimap<Range, InetAddress> myRangeAddresses = ArrayListMultimap.create();
         Multimap<Range, InetAddress> rangeAddresses = replicationStrategy.getRangeAddresses(tokenMetadata);

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/Gossiper.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/Gossiper.java?rev=833579&r1=833578&r2=833579&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/Gossiper.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/Gossiper.java Fri Nov  6 22:17:26
2009
@@ -792,6 +792,12 @@
         }
     }
 
+    public ApplicationState getApplicationState(InetAddress endpoint, String stateName)
+    {
+        assert endPointStateMap_.containsKey(endpoint);
+        return endPointStateMap_.get(endpoint).getApplicationState(stateName);
+    }
+
     /**
      * Start the gossiper with the generation # retrieved from the System
      * table

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java?rev=833579&r1=833578&r2=833579&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java
(original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java
Fri Nov  6 22:17:26 2009
@@ -30,7 +30,6 @@
 import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.gms.FailureDetector;
 import org.apache.cassandra.service.IResponseResolver;
-import org.apache.cassandra.service.InvalidRequestException;
 import org.apache.cassandra.service.QuorumResponseHandler;
 import org.apache.cassandra.utils.FBUtilities;
 
@@ -86,22 +85,18 @@
      *
      * Only ReplicationStrategy should care about this method (higher level users should
only ask for Hinted).
      */
-    public ArrayList<InetAddress> getWriteEndpoints(Token token, Collection<InetAddress>
naturalEndpoints)
+    public Collection<InetAddress> getWriteEndpoints(Token token, Collection<InetAddress>
naturalEndpoints)
     {
-        ArrayList<InetAddress> endpoints = new ArrayList<InetAddress>(naturalEndpoints);
+        if (tokenMetadata_.getPendingRanges().isEmpty())
+            return naturalEndpoints;
 
-        for (Token t : tokenMetadata_.bootstrapTokens())
+        List<InetAddress> endpoints = new ArrayList<InetAddress>(naturalEndpoints);
+
+        for (Map.Entry<Range, InetAddress> entry : tokenMetadata_.getPendingRanges().entrySet())
         {
-            TokenMetadata temp = tokenMetadata_.cloneMe();
-            InetAddress ep = tokenMetadata_.getBootstrapEndpoint(t);
-            temp.update(t, ep);
-            for (Range r : getAddressRanges(temp).get(ep))
+            if (entry.getKey().contains(token))
             {
-                if (r.contains(token))
-                {
-                    endpoints.add(ep);
-                    break;
-                }
+                endpoints.add(entry.getValue());
             }
         }
 
@@ -164,8 +159,11 @@
         return map;
     }
 
-    // TODO this is pretty inefficient. also the inverse (getRangeAddresses) below.
-    // fixing this probably requires merging tokenmetadata into replicationstrategy, so we
can cache/invalidate cleanly
+    /*
+     NOTE: this is pretty inefficient. also the inverse (getRangeAddresses) below.
+     this is fine as long as we don't use this on any critical path.
+     (fixing this would probably require merging tokenmetadata into replicationstrategy, so we could cache/invalidate cleanly.)
+     */
     public Multimap<InetAddress, Range> getAddressRanges(TokenMetadata metadata)
     {
         Multimap<InetAddress, Range> map = HashMultimap.create();
@@ -202,4 +200,11 @@
     {
         return getAddressRanges(tokenMetadata_);
     }
+
+    public Collection<Range> getPendingAddressRanges(TokenMetadata metadata, Token
pendingToken, InetAddress pendingAddress)
+    {
+        TokenMetadata temp = metadata.cloneWithoutPending();
+        temp.update(pendingToken, pendingAddress);
+        return getAddressRanges(temp).get(pendingAddress);
+    }
 }

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/TokenMetadata.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/TokenMetadata.java?rev=833579&r1=833578&r2=833579&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/TokenMetadata.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/TokenMetadata.java Fri
Nov  6 22:17:26 2009
@@ -31,8 +31,10 @@
 
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.gms.FailureDetector;
+import org.apache.cassandra.gms.ApplicationState;
 import org.apache.cassandra.service.UnavailableException;
 import org.cliffc.high_scale_lib.NonBlockingHashSet;
+import org.cliffc.high_scale_lib.NonBlockingHashMap;
 import com.google.common.collect.BiMap;
 import com.google.common.collect.HashBiMap;
 
@@ -40,28 +42,23 @@
 {
     /* Maintains token to endpoint map of every node in the cluster. */
     private BiMap<Token, InetAddress> tokenToEndPointMap;
-    /* Bootstrapping nodes and their tokens */
-    private Set<InetAddress> bootstrapping;
-    private BiMap<Token, InetAddress> bootstrapTokenMap;
-    
+    private Map<Range, InetAddress> pendingRanges;
+
     /* Use this lock for manipulating the token map */
     private final ReadWriteLock lock = new ReentrantReadWriteLock(true);
     private List<Token> sortedTokens;
 
     public TokenMetadata()
     {
-        this(null, null);
+        this(null);
     }
 
-    public TokenMetadata(BiMap<Token, InetAddress> tokenToEndPointMap, BiMap<Token,
InetAddress> bootstrapTokenMap)
+    public TokenMetadata(BiMap<Token, InetAddress> tokenToEndPointMap)
     {
-        bootstrapping = new NonBlockingHashSet<InetAddress>();
         if (tokenToEndPointMap == null)
             tokenToEndPointMap = HashBiMap.create();
-        if (bootstrapTokenMap == null)
-            bootstrapTokenMap = HashBiMap.create();
         this.tokenToEndPointMap = tokenToEndPointMap;
-        this.bootstrapTokenMap = bootstrapTokenMap;
+        pendingRanges = new NonBlockingHashMap<Range, InetAddress>();
         sortedTokens = sortTokens();
     }
 
@@ -72,47 +69,15 @@
         return Collections.unmodifiableList(tokens);
     }
 
-    public TokenMetadata(BiMap<Token, InetAddress> tokenEndpointMap)
-    {
-        this(tokenEndpointMap, null);
-    }
-
-    public void setBootstrapping(InetAddress endpoint, boolean isBootstrapping)
-    {
-        if (isBootstrapping)
-            bootstrapping.add(endpoint);
-        else
-            bootstrapping.remove(endpoint);
-
-        lock.writeLock().lock();
-        try
-        {
-            BiMap<Token, InetAddress> otherMap = bootstrapping.contains(endpoint) ?
tokenToEndPointMap : bootstrapTokenMap;
-            Token t = otherMap.inverse().get(endpoint);
-            if (t != null)
-            {
-                Map<Token, InetAddress> map = bootstrapping.contains(endpoint) ? bootstrapTokenMap
: tokenToEndPointMap;
-                map.put(t, endpoint);
-                sortedTokens = sortTokens();
-            }
-        }
-        finally
-        {
-            lock.writeLock().unlock();
-        }
-    }
-
     /** @return the number of nodes bootstrapping into source's primary range */
-    public int bootstrapTargets(InetAddress source)
+    public int pendingRangeChanges(InetAddress source)
     {
         int n = 0;
         Range sourceRange = getPrimaryRangeFor(getToken(source));
-        for (Token token : bootstrapTokenMap.keySet())
+        for (Map.Entry<Range, InetAddress> entry : pendingRanges.entrySet())
         {
-            if (sourceRange.contains(token))
-            {
+            if (sourceRange.contains(entry.getKey().right()) || entry.getValue().equals(source))
                 n++;
-            }
         }
         return n;
     }
@@ -128,11 +93,10 @@
         lock.writeLock().lock();
         try
         {
-            Map<Token, InetAddress> map = bootstrapping.contains(endpoint) ? bootstrapTokenMap
: tokenToEndPointMap;
-            Map<Token, InetAddress> otherMap = bootstrapping.contains(endpoint) ? tokenToEndPointMap
: bootstrapTokenMap;
-            map.put(token, endpoint);
-            otherMap.remove(token);
-            sortedTokens = sortTokens();
+            if (!endpoint.equals(tokenToEndPointMap.put(token, endpoint)))
+            {
+                sortedTokens = sortTokens();
+            }
         }
         finally
         {
@@ -188,12 +152,12 @@
         }
     }
 
-    public TokenMetadata cloneMe()
+    public TokenMetadata cloneWithoutPending()
     {
         lock.readLock().lock();
         try
         {
-            return new TokenMetadata(HashBiMap.create(tokenToEndPointMap), HashBiMap.create(bootstrapTokenMap));
+            return new TokenMetadata(HashBiMap.create(tokenToEndPointMap));
         }
         finally
         {
@@ -241,7 +205,7 @@
     public void clearUnsafe()
     {
         tokenToEndPointMap.clear();
-        bootstrapTokenMap.clear();
+        pendingRanges.clear();
     }
 
     public Range getPrimaryRangeFor(Token right)
@@ -262,6 +226,33 @@
         }
     }
 
+    public void addPendingRange(Range range, InetAddress endpoint)
+    {
+        InetAddress oldEndpoint = pendingRanges.get(range);
+        if (oldEndpoint != null && !oldEndpoint.equals(endpoint))
+            throw new RuntimeException("pending range collision between " + oldEndpoint +
" and " + endpoint);
+        pendingRanges.put(range, endpoint);
+    }
+
+    public void removePendingRanges(InetAddress endpoint)
+    {
+        Iterator<Map.Entry<Range, InetAddress>> iter = pendingRanges.entrySet().iterator();
+        while (iter.hasNext())
+        {
+            Map.Entry<Range, InetAddress> entry = iter.next();
+            if (entry.getValue().equals(endpoint))
+            {
+                iter.remove();
+            }
+        }
+    }
+
+    /** a mutable map may be returned but caller should not modify it */
+    public Map<Range, InetAddress> getPendingRanges()
+    {
+        return pendingRanges;
+    }
+
     public Token getPredecessor(Token token)
     {
         List tokens = sortedTokens();
@@ -282,14 +273,4 @@
     {
         return getEndPoint(getSuccessor(getToken(endPoint)));
     }
-
-    public Iterable<? extends Token> bootstrapTokens()
-    {
-        return bootstrapTokenMap.keySet();
-    }
-
-    public InetAddress getBootstrapEndpoint(Token token)
-    {
-        return bootstrapTokenMap.get(token);
-    }
 }

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageLoadBalancer.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageLoadBalancer.java?rev=833579&r1=833578&r2=833579&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageLoadBalancer.java
(original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageLoadBalancer.java
Fri Nov  6 22:17:26 2009
@@ -224,7 +224,14 @@
         */
     }
 
-    public void onJoin(InetAddress endpoint, EndPointState epState) {}
+    public void onJoin(InetAddress endpoint, EndPointState epState)
+    {
+        ApplicationState loadState = epState.getApplicationState(LoadDisseminator.loadInfo_);
+        if (loadState != null)
+        {
+            onChange(endpoint, LoadDisseminator.loadInfo_, loadState);
+        }
+    }
 
     public void onAlive(InetAddress endpoint, EndPointState state) {}
 

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java?rev=833579&r1=833578&r2=833579&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java Fri
Nov  6 22:17:26 2009
@@ -53,10 +53,9 @@
 {
     private static Logger logger_ = Logger.getLogger(StorageService.class);     
 
-    private final static String NODE_ID = "NODE-ID";
-    public final static String MODE = "MODE";
-    public final static String MODE_MOVING = "move";
-    public final static String MODE_NORMAL = "run";
+    // these aren't in an enum since other gossip users can create states ad-hoc too (e.g. load broadcasting)
+    public final static String STATE_NORMAL = "NORMAL";
+    public final static String STATE_BOOTSTRAPPING = "BOOTSTRAPPING";
 
     /* All stage identifiers */
     public final static String mutationStage_ = "ROW-MUTATION-STAGE";
@@ -162,8 +161,9 @@
 
         if (bootstrapSet.isEmpty())
         {
+            isBootstrapMode = false;
             SystemTable.setBootstrapped();
-            Gossiper.instance().addApplicationState(MODE, new ApplicationState(MODE_NORMAL));
+            Gossiper.instance().addApplicationState(StorageService.STATE_NORMAL, new ApplicationState(partitioner_.getTokenFactory().toString(getLocalToken())));
             logger_.info("Bootstrap completed! Now serving reads.");
         }
     }
@@ -183,22 +183,15 @@
         tokenMetadata_.update(token, endpoint);
     }
 
-    /** This method updates the local token on disk and starts broacasting it to others.
*/
+    /** This method updates the local token on disk  */
     public void setToken(Token token)
     {
+        if (logger_.isDebugEnabled())
+            logger_.debug("Setting token to " + token);
         SystemTable.updateToken(token);
         tokenMetadata_.update(token, FBUtilities.getLocalAddress());
     }
 
-    public void setAndBroadcastToken(Token token)
-    {
-        if (logger_.isDebugEnabled())
-            logger_.debug("Setting token to " + token + " and gossiping it");
-        setToken(token);
-        ApplicationState state = new ApplicationState(partitioner_.getTokenFactory().toString(token));
-        Gossiper.instance().addApplicationState(StorageService.NODE_ID, state);
-    }
-
     public StorageService()
     {
         MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
@@ -270,18 +263,20 @@
 
         if (isBootstrapMode)
         {
-            Gossiper.instance().addApplicationState(MODE, new ApplicationState(MODE_MOVING));
             logger_.info("Starting in bootstrap mode (first, sleeping to get load information)");
             StorageLoadBalancer.instance().waitForLoadInfo();
             logger_.info("... got load info");
             setToken(BootStrapper.getBootstrapToken(tokenMetadata_, StorageLoadBalancer.instance().getLoadInfo()));
+            Gossiper.instance().addApplicationState(StorageService.STATE_BOOTSTRAPPING, new
ApplicationState(partitioner_.getTokenFactory().toString(getLocalToken())));
             new BootStrapper(replicationStrategy_, FBUtilities.getLocalAddress(), getLocalToken(),
tokenMetadata_).startBootstrap(); // handles token update
         }
         else
         {
             SystemTable.setBootstrapped();
+            Token token = storageMetadata_.getToken();
+            setToken(token);
+            Gossiper.instance().addApplicationState(StorageService.STATE_NORMAL, new ApplicationState(partitioner_.getTokenFactory().toString(token)));
         }
-        setAndBroadcastToken(storageMetadata_.getToken());
 
         assert tokenMetadata_.sortedTokens().size() > 0;
     }
@@ -350,55 +345,59 @@
         return rangeToEndPointMap;
     }
 
-    /**
-     *  Called when there is a change in application state. In particular
-     *  we are interested in new tokens as a result of a new node or an
-     *  existing node moving to a new location on the ring.
-    */
+    /*
+     * onChange only ever sees one ApplicationState piece change at a time, so we perform a kind of state machine here.
+     * We are concerned with two events: knowing the token associated with an endpoint, and knowing its operation mode.
+     * Nodes can start in either bootstrap or normal mode, and from bootstrap mode can change mode to normal.
+     * A node in bootstrap mode needs to have pendingranges set in TokenMetadata; a node in normal mode
+     * should instead be part of the token ring.
+     */
     public void onChange(InetAddress endpoint, String stateName, ApplicationState state)
     {
-        if (StorageService.MODE.equals(stateName))
+        Token token = getPartitioner().getTokenFactory().fromString(state.getValue());
+
+        if (STATE_BOOTSTRAPPING.equals(stateName))
         {
-            String mode = state.getValue();
             if (logger_.isDebugEnabled())
-                logger_.debug(endpoint + " is in " + mode + " mode");
-            boolean bootstrapState = mode.equals(MODE_MOVING);
-            tokenMetadata_.setBootstrapping(endpoint,  bootstrapState);
+                logger_.debug(endpoint + " state bootstrapping, token " + token);
+            updateBootstrapRanges(token, endpoint);
         }
-
-        if (StorageService.NODE_ID.equals(stateName))
+        else if (STATE_NORMAL.equals(stateName))
         {
-            Token newToken = getPartitioner().getTokenFactory().fromString(state.getValue());
             if (logger_.isDebugEnabled())
-              logger_.debug("CHANGE IN STATE FOR " + endpoint + " - has token " + newToken);
+                logger_.debug(endpoint + " state normal, token " + token);
+            tokenMetadata_.removePendingRanges(endpoint);
+            updateForeignToken(token, endpoint);
+        }
+    }
 
-            if (tokenMetadata_.isMember(endpoint))
-            {
-                Token oldToken = tokenMetadata_.getToken(endpoint);
+    private void updateBootstrapRanges(Token token, InetAddress endpoint)
+    {
+        for (Range range : replicationStrategy_.getPendingAddressRanges(tokenMetadata_, token,
endpoint))
+        {
+            tokenMetadata_.addPendingRange(range, endpoint);
+        }
+    }
 
-                /*
-                 * If oldToken is not equal to
-                 * the newToken this means that the node is being relocated
-                 * to another position in the ring.
-                */
-                if (!oldToken.equals(newToken))
-                {
-                    if (logger_.isDebugEnabled())
-                        logger_.debug("Relocation for endpoint " + endpoint);
-                    updateForeignToken(newToken, endpoint);
-                }
-            }
-            else
-            {
-                /*
-                 * This is a new node and we just update the token map.
-                */
-                updateForeignToken(newToken, endpoint);
-            }
+    public static void updateBootstrapRanges(AbstractReplicationStrategy strategy, TokenMetadata
metadata, Token token, InetAddress endpoint)
+    {
+        for (Range range : strategy.getPendingAddressRanges(metadata, token, endpoint))
+        {
+            metadata.addPendingRange(range, endpoint);
         }
     }
 
-    public void onJoin(InetAddress endpoint, EndPointState epState) {}
+    public void onJoin(InetAddress endpoint, EndPointState epState)
+    {
+        ApplicationState stateNormal = epState.getApplicationState(StorageService.STATE_NORMAL);
+        ApplicationState stateBootstrapping = epState.getApplicationState(StorageService.STATE_BOOTSTRAPPING);
+
+        if (stateNormal != null)
+            onChange(endpoint, StorageService.STATE_NORMAL, stateNormal);
+
+        if (stateBootstrapping != null)
+            onChange(endpoint, StorageService.STATE_BOOTSTRAPPING, stateBootstrapping);
+    }
 
     public void onAlive(InetAddress endpoint, EndPointState state)
     {

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/service/TokenUpdateVerbHandler.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/TokenUpdateVerbHandler.java?rev=833579&r1=833578&r2=833579&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/service/TokenUpdateVerbHandler.java
(original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/service/TokenUpdateVerbHandler.java
Fri Nov  6 22:17:26 2009
@@ -26,6 +26,8 @@
 import org.apache.cassandra.io.DataInputBuffer;
 import org.apache.cassandra.net.IVerbHandler;
 import org.apache.cassandra.net.Message;
+import org.apache.cassandra.gms.Gossiper;
+import org.apache.cassandra.gms.ApplicationState;
 
 public class TokenUpdateVerbHandler implements IVerbHandler
 {
@@ -33,13 +35,17 @@
 
     public void doVerb(Message message)
     {
+        if (StorageService.instance().isBootstrapMode())
+            throw new UnsupportedOperationException("Cannot set token during bootstrap");
+
         byte[] body = message.getMessageBody();
         DataInputBuffer bufIn = new DataInputBuffer();
         bufIn.reset(body, body.length);
         try
         {
             Token token = Token.serializer().deserialize(bufIn);
-            StorageService.instance().setAndBroadcastToken(token);
+            StorageService.instance().setToken(token);
+            Gossiper.instance().addApplicationState(StorageService.STATE_NORMAL, new ApplicationState(StorageService.getPartitioner().getTokenFactory().toString(token)));
         }
         catch (IOException ex)
         {

Modified: incubator/cassandra/trunk/test/unit/org/apache/cassandra/dht/BootStrapperTest.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/test/unit/org/apache/cassandra/dht/BootStrapperTest.java?rev=833579&r1=833578&r2=833579&view=diff
==============================================================================
--- incubator/cassandra/trunk/test/unit/org/apache/cassandra/dht/BootStrapperTest.java (original)
+++ incubator/cassandra/trunk/test/unit/org/apache/cassandra/dht/BootStrapperTest.java Fri
Nov  6 22:17:26 2009
@@ -58,11 +58,10 @@
         assert three.equals(source);
 
         InetAddress myEndpoint = InetAddress.getByName("127.0.0.1");
-        tmd.setBootstrapping(myEndpoint, true);
         Range range3 = ss.getPrimaryRangeForEndPoint(three);
         Token fakeToken = ((IPartitioner)StorageService.getPartitioner()).midpoint(range3.left(),
range3.right());
         assert range3.contains(fakeToken);
-        tmd.update(fakeToken, myEndpoint);
+        StorageService.updateBootstrapRanges(StorageService.instance().getReplicationStrategy(),
tmd, fakeToken, myEndpoint);
 
         InetAddress source2 = BootStrapper.getBootstrapSource(tmd, load);
         assert two.equals(source2) : source2;

Modified: incubator/cassandra/trunk/test/unit/org/apache/cassandra/locator/RackUnawareStrategyTest.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/test/unit/org/apache/cassandra/locator/RackUnawareStrategyTest.java?rev=833579&r1=833578&r2=833579&view=diff
==============================================================================
--- incubator/cassandra/trunk/test/unit/org/apache/cassandra/locator/RackUnawareStrategyTest.java
(original)
+++ incubator/cassandra/trunk/test/unit/org/apache/cassandra/locator/RackUnawareStrategyTest.java
Fri Nov  6 22:17:26 2009
@@ -23,6 +23,7 @@
 import java.util.Arrays;
 import java.util.List;
 import java.util.ArrayList;
+import java.util.Collection;
 
 import org.junit.Test;
 import org.apache.cassandra.dht.IPartitioner;
@@ -31,6 +32,8 @@
 import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.dht.OrderPreservingPartitioner;
 import org.apache.cassandra.dht.StringToken;
+import org.apache.cassandra.service.StorageService;
+
 import java.net.InetAddress;
 import java.net.UnknownHostException;
 
@@ -118,12 +121,11 @@
         //Add bootstrap node id=6
         Token bsToken = new BigIntegerToken(String.valueOf(25));
         InetAddress bootstrapEndPoint = InetAddress.getByName("127.0.0.6");
-        tmd.setBootstrapping(bootstrapEndPoint, true);
-        tmd.update(bsToken, bootstrapEndPoint);
+        StorageService.updateBootstrapRanges(strategy, tmd, bsToken, bootstrapEndPoint);
         
         for (int i = 0; i < keyTokens.length; i++)
         {
-            List<InetAddress> endPoints = strategy.getWriteEndpoints(keyTokens[i],
strategy.getNaturalEndpoints(keyTokens[i]));
+            Collection<InetAddress> endPoints = strategy.getWriteEndpoints(keyTokens[i],
strategy.getNaturalEndpoints(keyTokens[i]));
             assertTrue(endPoints.size() >= 3);
 
             for (int j = 0; j < 3; j++)



Mime
View raw message