cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jbel...@apache.org
Subject svn commit: r832285 - in /incubator/cassandra/trunk: src/java/org/apache/cassandra/dht/ src/java/org/apache/cassandra/locator/ src/java/org/apache/cassandra/service/ test/unit/org/apache/cassandra/dht/
Date Tue, 03 Nov 2009 03:12:16 GMT
Author: jbellis
Date: Tue Nov  3 03:12:16 2009
New Revision: 832285

URL: http://svn.apache.org/viewvc?rev=832285&view=rev
Log:
prefer bootstrapping into ring sections that are not already being bootstrapped into
patch by jbellis; reviewed by Vijay Parthasarathy for CASSANDRA-517

Modified:
    incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootStrapper.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/TokenMetadata.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
    incubator/cassandra/trunk/test/unit/org/apache/cassandra/dht/BootStrapperTest.java

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootStrapper.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootStrapper.java?rev=832285&r1=832284&r2=832285&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootStrapper.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootStrapper.java Tue Nov
 3 03:12:16 2009
@@ -111,41 +111,57 @@
         }).start();
     }
 
-    public static void guessTokenIfNotSpecified(TokenMetadata metadata) throws IOException
+    /**
+     * if initialtoken was specified, use that.
+     * otherwise, pick a token to assume half the load of the most-loaded node.
+     */
+    public static Token getBootstrapToken(final TokenMetadata metadata, final Map<InetAddress,
Double> load) throws IOException
     {
-        StorageService ss = StorageService.instance();
-        StorageLoadBalancer slb = StorageLoadBalancer.instance();
+        if (DatabaseDescriptor.getInitialToken() != null)
+        {
+            logger.debug("token manually specified as " + DatabaseDescriptor.getInitialToken());
+            return StorageService.getPartitioner().getTokenFactory().fromString(DatabaseDescriptor.getInitialToken());
+        }
 
-        slb.waitForLoadInfo();
-        logger.debug("... got load info");
+        InetAddress maxEndpoint = getBootstrapSource(metadata, load);
+        Token<?> t = getBootstrapTokenFrom(maxEndpoint);
+        logger.info("New token will be " + t + " to assume load from " + maxEndpoint);
+        return t;
+    }
 
-        // if initialtoken was specified, use that.  otherwise, pick a token to assume half
the load of the most-loaded node.
-        if (DatabaseDescriptor.getInitialToken() == null)
+    static InetAddress getBootstrapSource(final TokenMetadata metadata, final Map<InetAddress,
Double> load)
+    {
+        // sort first by number of nodes already bootstrapping into a source node's range,
then by load.
+        List<InetAddress> endpoints = new ArrayList<InetAddress>(load.size());
+        for (InetAddress endpoint : load.keySet())
         {
-            double maxLoad = 0;
-            InetAddress maxEndpoint = null;
-            for (Map.Entry<InetAddress, Double> entry : slb.getLoadInfo().entrySet())
-            {
-                logger.debug("considering " + entry.getKey() + " with load of " + entry.getValue());
-                if (!metadata.isMember(entry.getKey()))
-                    continue;
-                if (maxEndpoint == null || entry.getValue() > maxLoad)
-                {
-                    maxEndpoint = entry.getKey();
-                    maxLoad = entry.getValue();
-                }
-            }
-            if (maxEndpoint == null)
+            if (!metadata.isMember(endpoint))
+                continue;
+            endpoints.add(endpoint);
+        }
+
+        if (endpoints.isEmpty())
+            throw new RuntimeException("No other nodes seen!  Unable to bootstrap");
+        Collections.sort(endpoints, new Comparator<InetAddress>()
+        {
+            public int compare(InetAddress ia1, InetAddress ia2)
             {
-                throw new RuntimeException("No bootstrap sources found");
+                int n1 = metadata.bootstrapTargets(ia1);
+                int n2 = metadata.bootstrapTargets(ia2);
+                if (n1 != n2)
+                    return -(n1 - n2); // more targets = _less_ priority!
+
+                double load1 = load.get(ia1);
+                double load2 = load.get(ia2);
+                if (load1 == load2)
+                    return 0;
+                return load1 < load2 ? -1 : 1;
             }
+        });
 
-            assert !maxEndpoint.equals(FBUtilities.getLocalAddress());
-            logger.debug("asking " + maxEndpoint + " for token");
-            Token<?> t = getBootstrapTokenFrom(maxEndpoint);
-            logger.info("New token will be " + t + " to assume load from " + maxEndpoint);
-            ss.setToken(t);
-        }
+        InetAddress maxEndpoint = endpoints.get(endpoints.size() - 1);
+        assert !maxEndpoint.equals(FBUtilities.getLocalAddress());
+        return maxEndpoint;
     }
 
     /** get potential sources for each range, ordered by proximity (as determined by EndPointSnitch)
*/

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/TokenMetadata.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/TokenMetadata.java?rev=832285&r1=832284&r2=832285&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/TokenMetadata.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/TokenMetadata.java Tue
Nov  3 03:12:16 2009
@@ -26,6 +26,9 @@
 import org.apache.cassandra.dht.Range;
 
 import java.net.InetAddress;
+
+import org.apache.commons.lang.StringUtils;
+
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.gms.FailureDetector;
 import org.apache.cassandra.service.UnavailableException;
@@ -89,6 +92,21 @@
         }
     }
 
+    /** @return the number of nodes bootstrapping into source's primary range */
+    public int bootstrapTargets(InetAddress source)
+    {
+        int n = 0;
+        Range sourceRange = getPrimaryRangeFor(getToken(source));
+        for (Token token : bootstrapTokenMap.keySet())
+        {
+            if (sourceRange.contains(token))
+            {
+                n++;
+            }
+        }
+        return n;
+    }
+
     /**
      * Update the two maps in an safe mode. 
     */
@@ -270,14 +288,16 @@
     {
         List tokens = sortedTokens();
         int index = Collections.binarySearch(tokens, token);
-        return (Token) (index == 0 ? tokens.get(tokens.size() - 1) : tokens.get(--index));
+        assert index >= 0 : token + " not found in " + StringUtils.join(tokenToEndPointMap.keySet(),
", ");
+        return (Token) (index == 0 ? tokens.get(tokens.size() - 1) : tokens.get(index - 1));
     }
 
     public Token getSuccessor(Token token)
     {
         List tokens = sortedTokens();
         int index = Collections.binarySearch(tokens, token);
-        return (Token) ((index == (tokens.size() - 1)) ? tokens.get(0) : tokens.get(++index));
+        assert index >= 0 : token + " not found in " + StringUtils.join(tokenToEndPointMap.keySet(),
", ");
+        return (Token) ((index == (tokens.size() - 1)) ? tokens.get(0) : tokens.get(index
+ 1));
     }
 
     public Iterable<? extends Token> bootstrapTokens()

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java?rev=832285&r1=832284&r2=832285&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java Tue
Nov  3 03:12:16 2009
@@ -270,9 +270,11 @@
 
         if (isBootstrapMode)
         {
-            logger_.info("Starting in bootstrap mode (first, sleeping to get load information)");
             Gossiper.instance().addApplicationState(MODE, new ApplicationState(MODE_MOVING));
-            BootStrapper.guessTokenIfNotSpecified(tokenMetadata_);
+            logger_.info("Starting in bootstrap mode (first, sleeping to get load information)");
+            StorageLoadBalancer.instance().waitForLoadInfo();
+            logger_.info("... got load info");
+            setToken(BootStrapper.getBootstrapToken(tokenMetadata_, StorageLoadBalancer.instance().getLoadInfo()));
             new BootStrapper(replicationStrategy_, FBUtilities.getLocalAddress(), getLocalToken(),
tokenMetadata_).startBootstrap(); // handles token update
         }
         else

Modified: incubator/cassandra/trunk/test/unit/org/apache/cassandra/dht/BootStrapperTest.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/test/unit/org/apache/cassandra/dht/BootStrapperTest.java?rev=832285&r1=832284&r2=832285&view=diff
==============================================================================
--- incubator/cassandra/trunk/test/unit/org/apache/cassandra/dht/BootStrapperTest.java (original)
+++ incubator/cassandra/trunk/test/unit/org/apache/cassandra/dht/BootStrapperTest.java Tue
Nov  3 03:12:16 2009
@@ -18,24 +18,56 @@
 */
 package org.apache.cassandra.dht;
 
+import java.io.IOException;
 import java.net.InetAddress;
 import java.net.UnknownHostException;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
 import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
 
 import org.apache.commons.lang.StringUtils;
 import static org.junit.Assert.assertEquals;
 import org.junit.Test;
 
+import com.google.common.collect.Multimap;
+import org.apache.cassandra.gms.IFailureDetectionEventListener;
+import org.apache.cassandra.gms.IFailureDetector;
 import org.apache.cassandra.locator.TokenMetadata;
 import org.apache.cassandra.service.StorageService;
-import org.apache.cassandra.gms.IFailureDetector;
-import org.apache.cassandra.gms.IFailureDetectionEventListener;
-import com.google.common.collect.Multimap;
+import org.apache.cassandra.utils.FBUtilities;
+
+public class BootStrapperTest
+{
+    @Test
+    public void testGuessToken() throws IOException
+    {
+        StorageService ss = StorageService.instance();
+
+        generateFakeEndpoints(3);
+
+        InetAddress one = InetAddress.getByName("127.0.0.2");
+        InetAddress two = InetAddress.getByName("127.0.0.3");
+        InetAddress three = InetAddress.getByName("127.0.0.4");
+        Map<InetAddress, Double> load = new HashMap<InetAddress, Double>();
+        load.put(one, 1.0);
+        load.put(two, 2.0);
+        load.put(three, 3.0);
+
+        TokenMetadata tmd = ss.getTokenMetadata();
+        InetAddress source = BootStrapper.getBootstrapSource(tmd, load);
+        assert three.equals(source);
+
+        InetAddress myEndpoint = InetAddress.getByName("127.0.0.1");
+        tmd.setBootstrapping(myEndpoint, true);
+        Range range3 = ss.getPrimaryRangeForEndPoint(three);
+        Token fakeToken = ((IPartitioner)StorageService.getPartitioner()).midpoint(range3.left(),
range3.right());
+        assert range3.contains(fakeToken);
+        tmd.update(fakeToken, myEndpoint);
+
+        InetAddress source2 = BootStrapper.getBootstrapSource(tmd, load);
+        assert two.equals(source2) : source2;
+    }
 
-public class BootStrapperTest {
     @Test
     public void testSourceTargetComputation() throws UnknownHostException
     {
@@ -49,7 +81,6 @@
         StorageService ss = StorageService.instance();
 
         generateFakeEndpoints(numOldNodes);
-        
         Token myToken = StorageService.getPartitioner().getRandomToken();
         InetAddress myEndpoint = InetAddress.getByName("127.0.0.1");
 
@@ -94,7 +125,9 @@
         for (int i = 1; i <= numOldNodes; i++)
         {
             // leave .1 for myEndpoint
-            tmd.update(p.getRandomToken(), InetAddress.getByName("127.0.0." + (i + 1)));
+            // TODO use this when #519 is fixed
+            // tmd.update(p.getRandomToken(), InetAddress.getByName("127.0.0." + (i + 1)));
+            tmd.update(p.getToken(FBUtilities.bytesToHex(FBUtilities.toByteArray(i * 13))),
InetAddress.getByName("127.0.0." + (i + 1)));
         }
     }
 }
\ No newline at end of file



Mime
View raw message