cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jbel...@apache.org
Subject svn commit: r830212 - in /incubator/cassandra/trunk: src/java/org/apache/cassandra/client/ src/java/org/apache/cassandra/db/ src/java/org/apache/cassandra/dht/ src/java/org/apache/cassandra/gms/ src/java/org/apache/cassandra/locator/ src/java/org/apach...
Date Tue, 27 Oct 2009 14:40:19 GMT
Author: jbellis
Date: Tue Oct 27 14:40:18 2009
New Revision: 830212

URL: http://svn.apache.org/viewvc?rev=830212&view=rev
Log:
fix the bootstrap interaction with gossip; there were two main problems:
1) token and bootstrap state are not guaranteed to be gossiped together; since we only updated
TMD.bootstrapNodes on an update of the token, this means we could actually miss the bootstrap
notice
2) deletions of state are not actually supported by Gossiper; there is no concept of that
at the protocol level. so if we delete state locally it will never get gossiped. Instead,
we have a MODE that is either MOVING or NORMAL, corresponding to bootstrap & normal operation.

patch by jbellis; reviewed by goffinet for CASSANDRA-483

Modified:
    incubator/cassandra/trunk/src/java/org/apache/cassandra/client/RingCache.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/db/SystemTable.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootStrapper.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/EndPointState.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/Gossiper.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/TokenMetadata.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/service/TokenUpdateVerbHandler.java
    incubator/cassandra/trunk/test/unit/org/apache/cassandra/locator/RackUnawareStrategyTest.java

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/client/RingCache.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/client/RingCache.java?rev=830212&r1=830211&r2=830212&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/client/RingCache.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/client/RingCache.java Tue Oct
27 14:40:18 2009
@@ -36,6 +36,8 @@
 import org.apache.thrift.transport.TSocket;
 
 import flexjson.JSONTokener;
+import com.google.common.collect.BiMap;
+import com.google.common.collect.HashBiMap;
 
 /**
  *  A class for caching the ring map at the client. For usage example, see
@@ -72,8 +74,7 @@
 
                 Map<String,String> tokenToHostMap = (Map<String,String>) new
JSONTokener(client.get_string_property(CassandraServer.TOKEN_MAP)).nextValue();
                 
-                HashMap<Token, InetAddress> tokenEndpointMap = new HashMap<Token,
InetAddress>();
-                Map<InetAddress, Token> endpointTokenMap = new HashMap<InetAddress,
Token>();
+                BiMap<Token, InetAddress> tokenEndpointMap = HashBiMap.create();
                 for (Map.Entry<String,String> entry : tokenToHostMap.entrySet())
                 {
                     Token token = StorageService.getPartitioner().getTokenFactory().fromString(entry.getKey());
@@ -81,7 +82,6 @@
                     try
                     {
                         tokenEndpointMap.put(token, InetAddress.getByName(host));
-                        endpointTokenMap.put(InetAddress.getByName(host), token);
                     }
                     catch (UnknownHostException e)
                     {
@@ -89,7 +89,7 @@
                     }
                 }
 
-                TokenMetadata tokenMetadata = new TokenMetadata(tokenEndpointMap, endpointTokenMap,
null);
+                TokenMetadata tokenMetadata = new TokenMetadata(tokenEndpointMap);
                 Class cls = DatabaseDescriptor.getReplicaPlacementStrategyClass();
                 Class [] parameterTypes = new Class[] { TokenMetadata.class, IPartitioner.class,
int.class, int.class};
                 try

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/SystemTable.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/SystemTable.java?rev=830212&r1=830211&r2=830212&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/SystemTable.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/SystemTable.java Tue Oct 27
14:40:18 2009
@@ -20,6 +20,7 @@
 
 import java.io.IOException;
 import java.io.UnsupportedEncodingException;
+import java.io.IOError;
 
 import org.apache.log4j.Logger;
 
@@ -60,20 +61,27 @@
     /**
      * Record token being used by another node
      */
-    public static synchronized void updateToken(InetAddress ep, Token token) throws IOException
+    public static synchronized void updateToken(InetAddress ep, Token token)
     {
         IPartitioner p = StorageService.getPartitioner();
         ColumnFamily cf = ColumnFamily.create(Table.SYSTEM_TABLE, STATUS_CF);
         cf.addColumn(new Column(ep.getAddress(), p.getTokenFactory().toByteArray(token),
System.currentTimeMillis()));
         RowMutation rm = new RowMutation(Table.SYSTEM_TABLE, LOCATION_KEY);
         rm.add(cf);
-        rm.apply();
+        try
+        {
+            rm.apply();
+        }
+        catch (IOException e)
+        {
+            throw new IOError(e);
+        }
     }
 
     /**
      * This method is used to update the System Table with the new token for this node
     */
-    public static synchronized void updateToken(Token token) throws IOException
+    public static synchronized void updateToken(Token token)
     {
         assert metadata != null;
         IPartitioner p = StorageService.getPartitioner();
@@ -81,7 +89,14 @@
         cf.addColumn(new Column(SystemTable.TOKEN, p.getTokenFactory().toByteArray(token),
System.currentTimeMillis()));
         RowMutation rm = new RowMutation(Table.SYSTEM_TABLE, LOCATION_KEY);
         rm.add(cf);
-        rm.apply();
+        try
+        {
+            rm.apply();
+        }
+        catch (IOException e)
+        {
+            throw new IOError(e);
+        }
         metadata.setToken(token);
     }
     

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootStrapper.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootStrapper.java?rev=830212&r1=830211&r2=830212&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootStrapper.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootStrapper.java Tue Oct
27 14:40:18 2009
@@ -182,7 +182,7 @@
             {
                 Token<?> t = getBootstrapTokenFrom(maxEndpoint);
                 logger.info("Setting token to " + t + " to assume load from " + maxEndpoint);
-                ss.updateToken(t);
+                ss.setToken(t);
             }
         }
 
@@ -193,7 +193,7 @@
                 // Mark as not bootstrapping to calculate ranges correctly
                 for (int i=0; i< targets.size(); i++)
                 {
-                    tokenMetadata.update(tokens[i], targets.get(i), false);
+                    tokenMetadata.setBootstrapping(targets.get(i), false);
                 }
 
                 Map<Range, List<BootstrapSourceTarget>> rangesWithSourceTarget
= getRangesWithSourceTarget();
@@ -210,7 +210,7 @@
                 }
             }
         }).start();
-        Gossiper.instance().addApplicationState(StorageService.BOOTSTRAP_MODE, new ApplicationState(""));
+        Gossiper.instance().addApplicationState(StorageService.MODE, new ApplicationState(StorageService.MODE_MOVING));
     }
 
     public static class BootstrapTokenVerbHandler implements IVerbHandler

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/EndPointState.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/EndPointState.java?rev=830212&r1=830211&r2=830212&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/EndPointState.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/EndPointState.java Tue Oct
27 14:40:18 2009
@@ -86,11 +86,6 @@
         applicationState_.put(key, appState);        
     }
     
-    void deleteApplicationState(String key)
-    {
-        applicationState_.remove(key);
-    }
-
     /* getters and setters */
     long getUpdateTimestamp()
     {

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/Gossiper.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/Gossiper.java?rev=830212&r1=830211&r2=830212&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/Gossiper.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/Gossiper.java Tue Oct 27 14:40:18
2009
@@ -492,6 +492,8 @@
 
     synchronized EndPointState getStateForVersionBiggerThan(InetAddress forEndpoint, int
version)
     {
+        if (logger_.isTraceEnabled())
+            logger_.trace("Scanning for state greater than " + version + " for " + forEndpoint);
         EndPointState epState = endPointStateMap_.get(forEndpoint);
         EndPointState reqdEndPointState = null;
 
@@ -923,12 +925,6 @@
             epState.addApplicationState(key, appState);
         }
     }
-    
-    public synchronized void deleteApplicationState(String key)
-    {
-        EndPointState epState = endPointStateMap_.get(localEndPoint_);
-        epState.deleteApplicationState(key);
-    }
 
     public void stop()
     {

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/TokenMetadata.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/TokenMetadata.java?rev=830212&r1=830211&r2=830212&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/TokenMetadata.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/TokenMetadata.java Tue
Oct 27 14:40:18 2009
@@ -27,63 +27,58 @@
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.gms.FailureDetector;
 import org.apache.cassandra.service.UnavailableException;
+import org.cliffc.high_scale_lib.NonBlockingHashSet;
+import com.google.common.collect.BiMap;
+import com.google.common.collect.HashBiMap;
 
 public class TokenMetadata
 {
     /* Maintains token to endpoint map of every node in the cluster. */
-    private Map<Token, InetAddress> tokenToEndPointMap;
-    /* Maintains a reverse index of endpoint to token in the cluster. */
-    private Map<InetAddress, Token> endPointToTokenMap;
+    private BiMap<Token, InetAddress> tokenToEndPointMap;
     /* Bootstrapping nodes and their tokens */
-    private Map<Token, InetAddress> bootstrapNodes;
+    private Set<InetAddress> bootstrapping;
+    private BiMap<Token, InetAddress> bootstrapTokenMap;
     
     /* Use this lock for manipulating the token map */
     private final ReadWriteLock lock = new ReentrantReadWriteLock(true);
 
     public TokenMetadata()
     {
-        tokenToEndPointMap = new HashMap<Token, InetAddress>();
-        endPointToTokenMap = new HashMap<InetAddress, Token>();
-        this.bootstrapNodes = Collections.synchronizedMap(new HashMap<Token, InetAddress>());
+        this(null, null);
     }
 
-    public TokenMetadata(Map<Token, InetAddress> tokenToEndPointMap, Map<InetAddress,
Token> endPointToTokenMap, Map<Token, InetAddress> bootstrapNodes)
+    public TokenMetadata(BiMap<Token, InetAddress> tokenToEndPointMap, BiMap<Token,
InetAddress> bootstrapTokenMap)
     {
+        bootstrapping = new NonBlockingHashSet<InetAddress>();
+        if (tokenToEndPointMap == null)
+            tokenToEndPointMap = HashBiMap.create();
+        if (bootstrapTokenMap == null)
+            bootstrapTokenMap = HashBiMap.create();
         this.tokenToEndPointMap = tokenToEndPointMap;
-        this.endPointToTokenMap = endPointToTokenMap;
-        this.bootstrapNodes = bootstrapNodes;
+        this.bootstrapTokenMap = bootstrapTokenMap;
     }
-    
-    public TokenMetadata cloneMe()
-    {
-        return new TokenMetadata(cloneTokenEndPointMap(), cloneEndPointTokenMap(), cloneBootstrapNodes());
-    }
-        
-    public void update(Token token, InetAddress endpoint)
+
+    public TokenMetadata(BiMap<Token, InetAddress> tokenEndpointMap)
     {
-        this.update(token, endpoint, false);
+        this(tokenEndpointMap, null);
     }
-    /**
-     * Update the two maps in an safe mode. 
-    */
-    public void update(Token token, InetAddress endpoint, boolean bootstrapState)
+
+    public void setBootstrapping(InetAddress endpoint, boolean isBootstrapping)
     {
+        if (isBootstrapping)
+            bootstrapping.add(endpoint);
+        else
+            bootstrapping.remove(endpoint);
+
         lock.writeLock().lock();
         try
         {
-            if (bootstrapState)
-            {
-                bootstrapNodes.put(token, endpoint);
-                this.remove(endpoint);
-            }
-            else
+            BiMap<Token, InetAddress> otherMap = bootstrapping.contains(endpoint) ?
tokenToEndPointMap : bootstrapTokenMap;
+            Token t = otherMap.inverse().get(endpoint);
+            if (t != null)
             {
-                bootstrapNodes.remove(token); // If this happened to be there 
-                Token oldToken = endPointToTokenMap.get(endpoint);
-                if ( oldToken != null )
-                    tokenToEndPointMap.remove(oldToken);
-                tokenToEndPointMap.put(token, endpoint);
-                endPointToTokenMap.put(endpoint, token);
+                Map<Token, InetAddress> map = bootstrapping.contains(endpoint) ? bootstrapTokenMap
: tokenToEndPointMap;
+                map.put(t, endpoint);
             }
         }
         finally
@@ -91,33 +86,37 @@
             lock.writeLock().unlock();
         }
     }
-    
+
     /**
-     * Remove the entries in the two maps.
-     * @param endpoint
-     */
-    public void remove(InetAddress endpoint)
+     * Update the two maps in an safe mode. 
+    */
+    public void update(Token token, InetAddress endpoint)
     {
+        assert token != null;
+        assert endpoint != null;
+
         lock.writeLock().lock();
         try
-        {            
-            Token oldToken = endPointToTokenMap.get(endpoint);
-            if ( oldToken != null )
-                tokenToEndPointMap.remove(oldToken);
-            endPointToTokenMap.remove(endpoint);
+        {
+            Map<Token, InetAddress> map = bootstrapping.contains(endpoint) ? bootstrapTokenMap
: tokenToEndPointMap;
+            Map<Token, InetAddress> otherMap = bootstrapping.contains(endpoint) ? tokenToEndPointMap
: bootstrapTokenMap;
+            map.put(token, endpoint);
+            otherMap.remove(token);
         }
         finally
         {
             lock.writeLock().unlock();
         }
     }
-    
+
     public Token getToken(InetAddress endpoint)
     {
+        assert endpoint != null;
+
         lock.readLock().lock();
         try
         {
-            return endPointToTokenMap.get(endpoint);
+            return tokenToEndPointMap.inverse().get(endpoint);
         }
         finally
         {
@@ -125,12 +124,14 @@
         }
     }
     
-    public boolean isKnownEndPoint(InetAddress ep)
+    public boolean isKnownEndPoint(InetAddress endpoint)
     {
+        assert endpoint != null;
+
         lock.readLock().lock();
         try
         {
-            return endPointToTokenMap.containsKey(ep);
+            return tokenToEndPointMap.inverse().containsKey(endpoint);
         }
         finally
         {
@@ -156,8 +157,10 @@
     }
     
 
-    public InetAddress getNextEndpoint(InetAddress endPoint) throws UnavailableException
+    public InetAddress getNextEndpoint(InetAddress endpoint) throws UnavailableException
     {
+        assert endpoint != null;
+
         lock.readLock().lock();
         try
         {
@@ -165,7 +168,7 @@
             if (tokens.isEmpty())
                 return null;
             Collections.sort(tokens);
-            int i = tokens.indexOf(endPointToTokenMap.get(endPoint)); // TODO binary search
+            int i = tokens.indexOf(tokenToEndPointMap.inverse().get(endpoint)); // TODO binary
search
             int j = 1;
             InetAddress ep;
             while (!FailureDetector.instance().isAlive((ep = tokenToEndPointMap.get(tokens.get((i
+ j) % tokens.size())))))
@@ -188,7 +191,7 @@
         lock.readLock().lock();
         try
         {            
-            return new HashMap<Token, InetAddress>( bootstrapNodes );
+            return new HashMap<Token, InetAddress>(bootstrapTokenMap);
         }
         finally
         {
@@ -221,7 +224,7 @@
         lock.readLock().lock();
         try
         {            
-            return new HashMap<InetAddress, Token>(endPointToTokenMap);
+            return new HashMap<InetAddress, Token>(tokenToEndPointMap.inverse());
         }
         finally
         {
@@ -232,13 +235,13 @@
     public String toString()
     {
         StringBuilder sb = new StringBuilder();
-        Set<InetAddress> eps = endPointToTokenMap.keySet();
-        
+        Set<InetAddress> eps = tokenToEndPointMap.inverse().keySet();
+
         for ( InetAddress ep : eps )
         {
             sb.append(ep);
             sb.append(":");
-            sb.append(endPointToTokenMap.get(ep));
+            sb.append(tokenToEndPointMap.inverse().get(ep));
             sb.append(System.getProperty("line.separator"));
         }
         
@@ -257,4 +260,10 @@
             lock.readLock().unlock();
         }
     }
+
+    public void clearUnsafe()
+    {
+        tokenToEndPointMap.clear();
+        bootstrapTokenMap.clear();
+    }
 }

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java?rev=830212&r1=830211&r2=830212&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java Tue
Oct 27 14:40:18 2009
@@ -41,6 +41,7 @@
 
 import org.apache.log4j.Logger;
 import org.apache.log4j.Level;
+import org.apache.commons.lang.StringUtils;
 
 /*
  * This abstraction contains the token/identifier of this node
@@ -51,9 +52,12 @@
 public final class StorageService implements IEndPointStateChangeSubscriber, StorageServiceMBean
 {
     private static Logger logger_ = Logger.getLogger(StorageService.class);     
-    private final static String NODE_ID = "NODE-IDENTIFIER";
-    public final static String BOOTSTRAP_MODE = "BOOTSTRAP-MODE";
-    
+
+    private final static String NODE_ID = "NODE-ID";
+    public final static String MODE = "MODE";
+    public final static String MODE_MOVING = "move";
+    public final static String MODE_NORMAL = "run";
+
     /* All stage identifiers */
     public final static String mutationStage_ = "ROW-MUTATION-STAGE";
     public final static String readStage_ = "ROW-READ-STAGE";
@@ -150,39 +154,40 @@
         bootstrapSet.add(s);
     }
     
-    public synchronized boolean removeBootstrapSource(InetAddress s)
+    public synchronized void removeBootstrapSource(InetAddress s)
     {
         bootstrapSet.remove(s);
-
         if (logger_.isDebugEnabled())
             logger_.debug("Removed " + s + " as a bootstrap source");
+
         if (bootstrapSet.isEmpty())
         {
             SystemTable.setBootstrapped();
-            isBootstrapMode = false;
-            updateTokenMetadata(storageMetadata_.getToken(), FBUtilities.getLocalAddress(),
false);
-
+            Gossiper.instance().addApplicationState(MODE, new ApplicationState(MODE_NORMAL));
             logger_.info("Bootstrap completed! Now serving reads.");
-            /* Tell others you're not bootstrapping anymore */
-            Gossiper.instance().deleteApplicationState(BOOTSTRAP_MODE);
         }
-        return isBootstrapMode;
     }
 
-    private void updateTokenMetadata(Token token, InetAddress endpoint, boolean isBootstraping)
+    private void updateForeignToken(Token token, InetAddress endpoint)
     {
-        tokenMetadata_.update(token, endpoint, isBootstraping);
-        if (!isBootstraping)
-        {
-            try
-            {
-                SystemTable.updateToken(endpoint, token);
-            }
-            catch (IOException e)
-            {
-                throw new RuntimeException(e);
-            }
-        }
+        tokenMetadata_.update(token, endpoint);
+        SystemTable.updateToken(endpoint, token);
+    }
+
+    /** This method updates the local token on disk and starts broacasting it to others.
*/
+    public void setToken(Token token)
+    {
+        SystemTable.updateToken(token);
+        tokenMetadata_.update(token, FBUtilities.getLocalAddress());
+    }
+
+    public void setAndBroadcastToken(Token token)
+    {
+        if (logger_.isDebugEnabled())
+            logger_.debug("Setting token to " + token + " and gossiping it");
+        setToken(token);
+        ApplicationState state = new ApplicationState(partitioner_.getTokenFactory().toString(token));
+        Gossiper.instance().addApplicationState(StorageService.NODE_ID, state);
     }
 
     public StorageService()
@@ -256,19 +261,17 @@
 
         if (isBootstrapMode)
         {
+            logger_.info("Starting in bootstrap mode (first, sleeping to get load information)");
+            Gossiper.instance().addApplicationState(MODE, new ApplicationState(MODE_MOVING));
             new BootStrapper(Arrays.asList(FBUtilities.getLocalAddress()), getLocalToken()).startBootstrap();
// handles token update
         }
         else
         {
             SystemTable.setBootstrapped();
-            tokenMetadata_.update(storageMetadata_.getToken(), FBUtilities.getLocalAddress(),
isBootstrapMode);
         }
+        setAndBroadcastToken(storageMetadata_.getToken());
 
-        // Gossip my token.
-        // note that before we do this we've (a) finalized what the token is actually going
to be, and
-        // (b) added a bootstrap state (done by startBootstrap)
-        ApplicationState state = new ApplicationState(StorageService.getPartitioner().getTokenFactory().toString(storageMetadata_.getToken()));
-        Gossiper.instance().addApplicationState(StorageService.NODE_ID, state);
+        assert tokenMetadata_.cloneTokenEndPointMap().size() > 0;
     }
 
     public boolean isBootstrapMode()
@@ -278,7 +281,7 @@
 
     public TokenMetadata getTokenMetadata()
     {
-        return tokenMetadata_.cloneMe();
+        return tokenMetadata_;
     }
 
     /* TODO: used for testing */
@@ -374,12 +377,16 @@
         /* node identifier for this endpoint on the identifier space */
         ApplicationState nodeIdState = epState.getApplicationState(StorageService.NODE_ID);
         /* Check if this has a bootstrapping state message */
-        boolean bootstrapState = epState.getApplicationState(StorageService.BOOTSTRAP_MODE)
!= null;
-        if (bootstrapState)
+        ApplicationState modeState = epState.getApplicationState(StorageService.MODE);
+        if (modeState != null)
         {
+            String mode = modeState.getState();
             if (logger_.isDebugEnabled())
-                logger_.debug(endpoint + " is in bootstrap state.");
+                logger_.debug(endpoint + " is in " + mode + " mode");
+            boolean bootstrapState = mode.equals(MODE_MOVING);
+            tokenMetadata_.setBootstrapping(endpoint,  bootstrapState);
         }
+
         if (nodeIdState != null)
         {
             Token newToken = getPartitioner().getTokenFactory().fromString(nodeIdState.getState());
@@ -399,7 +406,7 @@
                 {
                     if (logger_.isDebugEnabled())
                       logger_.debug("Relocation for endpoint " + endpoint);
-                    updateTokenMetadata(newToken, endpoint, bootstrapState);
+                    updateForeignToken(newToken, endpoint);
                 }
                 else
                 {
@@ -417,7 +424,7 @@
                 /*
                  * This is a new node and we just update the token map.
                 */
-                updateTokenMetadata(newToken, endpoint, bootstrapState);
+                updateForeignToken(newToken, endpoint);
             }
         }
         else
@@ -462,37 +469,6 @@
         return map;
     }
 
-    /*
-     * This method updates the token on disk and modifies the cached
-     * StorageMetadata instance. This is only for the local endpoint.
-    */
-    public void updateToken(Token token) throws IOException
-    {
-        if (logger_.isDebugEnabled())
-          logger_.debug("Setting token to " + token);
-        /* update the token on disk */
-        SystemTable.updateToken(token);
-        /* Update the token maps */
-        tokenMetadata_.update(token, FBUtilities.getLocalAddress());
-        /* Gossip this new token for the local storage instance */
-        ApplicationState state = new ApplicationState(StorageService.getPartitioner().getTokenFactory().toString(token));
-        Gossiper.instance().addApplicationState(StorageService.NODE_ID, state);
-    }
-    
-    /*
-     * This method removes the state associated with this endpoint
-     * from the TokenMetadata instance.
-     * 
-     *  @param endpoint remove the token state associated with this 
-     *         endpoint.
-     */
-    public void removeTokenState(InetAddress endpoint)
-    {
-        tokenMetadata_.remove(endpoint);
-        /* Remove the state from the Gossiper */
-        Gossiper.instance().removeFromMembership(endpoint);
-    }
-
     /**
      * Deliver hints to the specified node when it has crashed
      * and come back up/ marked as alive after a network partition
@@ -705,6 +681,8 @@
     */
     public Range[] getAllRanges(Set<Token> tokens)
     {
+        if (logger_.isDebugEnabled())
+            logger_.debug("computing ranges for " + StringUtils.join(tokens, ", "));
         List<Range> ranges = new ArrayList<Range>();
         List<Token> allTokens = new ArrayList<Token>(tokens);
         Collections.sort(allTokens);
@@ -895,4 +873,9 @@
     {
         return replicationStrategy_.getResponseHandler(responseResolver, blockFor, consistency_level);
     }
+
+    public AbstractReplicationStrategy getReplicationStrategy()
+    {
+        return replicationStrategy_;
+    }
 }

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/service/TokenUpdateVerbHandler.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/TokenUpdateVerbHandler.java?rev=830212&r1=830211&r2=830212&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/service/TokenUpdateVerbHandler.java
(original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/service/TokenUpdateVerbHandler.java
Tue Oct 27 14:40:18 2009
@@ -38,10 +38,8 @@
         bufIn.reset(body, body.length);
         try
         {
-            /* Deserialize to get the token for this endpoint. */
             Token token = Token.serializer().deserialize(bufIn);
-            logger_.info("Updating the token to [" + token + "]");
-            StorageService.instance().updateToken(token);
+            StorageService.instance().setAndBroadcastToken(token);
         }
         catch (IOException ex)
         {

Modified: incubator/cassandra/trunk/test/unit/org/apache/cassandra/locator/RackUnawareStrategyTest.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/test/unit/org/apache/cassandra/locator/RackUnawareStrategyTest.java?rev=830212&r1=830211&r2=830212&view=diff
==============================================================================
--- incubator/cassandra/trunk/test/unit/org/apache/cassandra/locator/RackUnawareStrategyTest.java
(original)
+++ incubator/cassandra/trunk/test/unit/org/apache/cassandra/locator/RackUnawareStrategyTest.java
Tue Oct 27 14:40:18 2009
@@ -118,7 +118,8 @@
         //Add bootstrap node id=6
         Token bsToken = new BigIntegerToken(String.valueOf(25));
         InetAddress bootstrapEndPoint = InetAddress.getByName("127.0.0.6");
-        tmd.update(bsToken, bootstrapEndPoint, true);
+        tmd.setBootstrapping(bootstrapEndPoint, true);
+        tmd.update(bsToken, bootstrapEndPoint);
         
         for (int i = 0; i < keyTokens.length; i++)
         {



Mime
View raw message