cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jbel...@apache.org
Subject svn commit: r891430 - in /incubator/cassandra/trunk: ./ interface/gen-java/org/apache/cassandra/service/ src/java/org/ src/java/org/apache/cassandra/locator/ src/java/org/apache/cassandra/service/ src/java/org/apache/cassandra/tools/ test/unit/org/ tes...
Date Wed, 16 Dec 2009 21:28:56 GMT
Author: jbellis
Date: Wed Dec 16 21:28:55 2009
New Revision: 891430

URL: http://svn.apache.org/viewvc?rev=891430&view=rev
Log:
merge from 0.5

Modified:
    incubator/cassandra/trunk/   (props changed)
    incubator/cassandra/trunk/CHANGES.txt
    incubator/cassandra/trunk/NEWS.txt
    incubator/cassandra/trunk/interface/gen-java/org/apache/cassandra/service/Cassandra.java
  (props changed)
    incubator/cassandra/trunk/interface/gen-java/org/apache/cassandra/service/Column.java
  (props changed)
    incubator/cassandra/trunk/interface/gen-java/org/apache/cassandra/service/InvalidRequestException.java
  (props changed)
    incubator/cassandra/trunk/interface/gen-java/org/apache/cassandra/service/NotFoundException.java
  (props changed)
    incubator/cassandra/trunk/interface/gen-java/org/apache/cassandra/service/SuperColumn.java
  (props changed)
    incubator/cassandra/trunk/src/java/org/   (props changed)
    incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/TokenMetadata.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageServiceMBean.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/tools/NodeProbe.java
    incubator/cassandra/trunk/test/unit/org/   (props changed)
    incubator/cassandra/trunk/test/unit/org/apache/cassandra/dht/BootStrapperTest.java
    incubator/cassandra/trunk/test/unit/org/apache/cassandra/locator/RackUnawareStrategyTest.java

Propchange: incubator/cassandra/trunk/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Wed Dec 16 21:28:55 2009
@@ -1,3 +1,3 @@
 /incubator/cassandra/branches/cassandra-0.3:774578-796573
 /incubator/cassandra/branches/cassandra-0.4:810145-834239,834349-834350
-/incubator/cassandra/branches/cassandra-0.5:888872-890627
+/incubator/cassandra/branches/cassandra-0.5:888872-891425

Modified: incubator/cassandra/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/CHANGES.txt?rev=891430&r1=891429&r2=891430&view=diff
==============================================================================
--- incubator/cassandra/trunk/CHANGES.txt (original)
+++ incubator/cassandra/trunk/CHANGES.txt Wed Dec 16 21:28:55 2009
@@ -2,6 +2,10 @@
  * Fix potential NPE in get_range_slice (CASSANDRA-623)
  * add CRC32 to commitlog entries (CASSANDRA-605)
  * fix data streaming on windows (CASSANDRA-630)
+ * GC compacted sstables after cleanup and compaction (CASSANDRA-621)
+ * Speed up anti-entropy validation (CASSANDRA-629)
+ * Fix pending range conflicts when bootstrapping or moving
+   multiple nodes at once (CASSANDRA-603)
 
 
 0.5.0 beta 2

Modified: incubator/cassandra/trunk/NEWS.txt
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/NEWS.txt?rev=891430&r1=891429&r2=891430&view=diff
==============================================================================
--- incubator/cassandra/trunk/NEWS.txt (original)
+++ incubator/cassandra/trunk/NEWS.txt Wed Dec 16 21:28:55 2009
@@ -8,6 +8,10 @@
    out; if that happens, just go back to 0.4 and flush again.)
    The format changed twice: from 0.4 to beta1, and from beta2 to RC1.
 
+.5 The gossip protocol has changed, meaning 0.5 nodes cannot coexist
+   in a cluster of 0.4 nodes or vice versa; you must upgrade your
+   whole cluster at the same time.
+
 1. Bootstrap, move, load balancing, and active repair have been added.
    See http://wiki.apache.org/cassandra/Operations.  When upgrading
    from 0.4, leave autobootstrap set to false for the first restart

Propchange: incubator/cassandra/trunk/interface/gen-java/org/apache/cassandra/service/Cassandra.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Wed Dec 16 21:28:55 2009
@@ -1,4 +1,4 @@
 /incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/Cassandra.java:774578-796573
 /incubator/cassandra/branches/cassandra-0.4/interface/gen-java/org/apache/cassandra/service/Cassandra.java:810145-834239,834349-834350
-/incubator/cassandra/branches/cassandra-0.5/interface/gen-java/org/apache/cassandra/service/Cassandra.java:888872-890627
+/incubator/cassandra/branches/cassandra-0.5/interface/gen-java/org/apache/cassandra/service/Cassandra.java:888872-891425
 /incubator/cassandra/trunk/interface/gen-java/org/apache/cassandra/service/Cassandra.java:749219-768588

Propchange: incubator/cassandra/trunk/interface/gen-java/org/apache/cassandra/service/Column.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Wed Dec 16 21:28:55 2009
@@ -1,5 +1,5 @@
 /incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/column_t.java:774578-792198
 /incubator/cassandra/branches/cassandra-0.4/interface/gen-java/org/apache/cassandra/service/Column.java:810145-834239,834349-834350
-/incubator/cassandra/branches/cassandra-0.5/interface/gen-java/org/apache/cassandra/service/Column.java:888872-890627
+/incubator/cassandra/branches/cassandra-0.5/interface/gen-java/org/apache/cassandra/service/Column.java:888872-891425
 /incubator/cassandra/trunk/interface/gen-java/org/apache/cassandra/service/Column.java:749219-794428
 /incubator/cassandra/trunk/interface/gen-java/org/apache/cassandra/service/column_t.java:749219-768588

Propchange: incubator/cassandra/trunk/interface/gen-java/org/apache/cassandra/service/InvalidRequestException.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Wed Dec 16 21:28:55 2009
@@ -1,4 +1,4 @@
 /incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/InvalidRequestException.java:774578-796573
 /incubator/cassandra/branches/cassandra-0.4/interface/gen-java/org/apache/cassandra/service/InvalidRequestException.java:810145-834239,834349-834350
-/incubator/cassandra/branches/cassandra-0.5/interface/gen-java/org/apache/cassandra/service/InvalidRequestException.java:888872-890627
+/incubator/cassandra/branches/cassandra-0.5/interface/gen-java/org/apache/cassandra/service/InvalidRequestException.java:888872-891425
 /incubator/cassandra/trunk/interface/gen-java/org/apache/cassandra/service/InvalidRequestException.java:749219-768588

Propchange: incubator/cassandra/trunk/interface/gen-java/org/apache/cassandra/service/NotFoundException.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Wed Dec 16 21:28:55 2009
@@ -1,4 +1,4 @@
 /incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/NotFoundException.java:774578-796573
 /incubator/cassandra/branches/cassandra-0.4/interface/gen-java/org/apache/cassandra/service/NotFoundException.java:810145-834239,834349-834350
-/incubator/cassandra/branches/cassandra-0.5/interface/gen-java/org/apache/cassandra/service/NotFoundException.java:888872-890627
+/incubator/cassandra/branches/cassandra-0.5/interface/gen-java/org/apache/cassandra/service/NotFoundException.java:888872-891425
 /incubator/cassandra/trunk/interface/gen-java/org/apache/cassandra/service/NotFoundException.java:749219-768588

Propchange: incubator/cassandra/trunk/interface/gen-java/org/apache/cassandra/service/SuperColumn.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Wed Dec 16 21:28:55 2009
@@ -1,5 +1,5 @@
 /incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/superColumn_t.java:774578-792198
 /incubator/cassandra/branches/cassandra-0.4/interface/gen-java/org/apache/cassandra/service/SuperColumn.java:810145-834239,834349-834350
-/incubator/cassandra/branches/cassandra-0.5/interface/gen-java/org/apache/cassandra/service/SuperColumn.java:888872-890627
+/incubator/cassandra/branches/cassandra-0.5/interface/gen-java/org/apache/cassandra/service/SuperColumn.java:888872-891425
 /incubator/cassandra/trunk/interface/gen-java/org/apache/cassandra/service/SuperColumn.java:749219-794428
 /incubator/cassandra/trunk/interface/gen-java/org/apache/cassandra/service/superColumn_t.java:749219-768588

Propchange: incubator/cassandra/trunk/src/java/org/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Wed Dec 16 21:28:55 2009
@@ -1,4 +1,4 @@
 /incubator/cassandra/branches/cassandra-0.3/src/java/org:774578-796573
 /incubator/cassandra/branches/cassandra-0.4/src/java/org:810145-834239,834349-834350
-/incubator/cassandra/branches/cassandra-0.5/src/java/org:888872-890627
+/incubator/cassandra/branches/cassandra-0.5/src/java/org:888872-891425
 /incubator/cassandra/trunk/src/java/org:749219-769885

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java?rev=891430&r1=891429&r2=891430&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java
(original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java
Wed Dec 16 21:28:55 2009
@@ -91,11 +91,11 @@
 
         List<InetAddress> endpoints = new ArrayList<InetAddress>(naturalEndpoints);
 
-        for (Map.Entry<Range, InetAddress> entry : tokenMetadata_.getPendingRanges().entrySet())
+        for (Map.Entry<Range, Collection<InetAddress>> entry : tokenMetadata_.getPendingRanges().entrySet())
         {
             if (entry.getKey().contains(token))
             {
-                endpoints.add(entry.getValue());
+                endpoints.addAll(entry.getValue());
             }
         }
 
@@ -202,26 +202,9 @@
 
     public Collection<Range> getPendingAddressRanges(TokenMetadata metadata, Token
pendingToken, InetAddress pendingAddress)
     {
-        TokenMetadata temp = metadata.cloneWithoutPending();
-        temp.update(pendingToken, pendingAddress);
+        TokenMetadata temp = metadata.cloneOnlyTokenMap();
+        temp.updateNormalToken(pendingToken, pendingAddress);
         return getAddressRanges(temp).get(pendingAddress);
     }
 
-    public void removeObsoletePendingRanges()
-    {
-        Multimap<InetAddress, Range> ranges = getAddressRanges();
-        for (Map.Entry<Range, InetAddress> entry : tokenMetadata_.getPendingRanges().entrySet())
-        {
-            for (Range currentRange : ranges.get(entry.getValue()))
-            {
-                if (currentRange.contains(entry.getKey()))
-                {
-                    if (logger_.isDebugEnabled())
-                        logger_.debug("Removing obsolete pending range " + entry.getKey()
+ " from " + entry.getValue());
-                    tokenMetadata_.removePendingRange(entry.getKey());
-                    break;
-                }
-            }
-        }
-    }
 }

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/TokenMetadata.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/TokenMetadata.java?rev=891430&r1=891429&r2=891430&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/TokenMetadata.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/TokenMetadata.java Wed
Dec 16 21:28:55 2009
@@ -29,15 +29,33 @@
 
 import org.apache.commons.lang.StringUtils;
 
-import org.cliffc.high_scale_lib.NonBlockingHashMap;
 import com.google.common.collect.BiMap;
 import com.google.common.collect.HashBiMap;
+import com.google.common.collect.Multimap;
+import com.google.common.collect.HashMultimap;
 
 public class TokenMetadata
 {
     /* Maintains token to endpoint map of every node in the cluster. */
     private BiMap<Token, InetAddress> tokenToEndPointMap;
-    private Map<Range, InetAddress> pendingRanges;
+
+    // Suppose that there is a ring of nodes A, C and E, with replication factor 3.
+    // Node D bootstraps between C and E, so its pending ranges will be E-A, A-C and C-D.
+    // Now suppose node B bootstraps between A and C at the same time. Its pending ranges
would be C-E, E-A and A-B.
+    // Now both nodes have pending range E-A in their list, which will cause pending range
collision
+    // even though we're only talking about replica range, not even primary range. The same
thing happens
+    // for any nodes that boot simultaneously between same two nodes. For this we cannot
simply make pending ranges a multimap,
+    // since that would make us unable to notice the real problem of two nodes trying to
boot using the same token.
+    // In order to do this properly, we need to know what tokens are booting at any time.
+    private Map<Token, InetAddress> bootstrapTokens;
+
+    // we will need to know at all times what nodes are leaving and calculate ranges accordingly.
+    // An anonymous pending ranges list is not enough, as that does not tell which node is
leaving
+    // and/or if the ranges are there because of bootstrap or leave operation.
+    // (See CASSANDRA-603 for more detail + examples).
+    private Set<InetAddress> leavingEndPoints;
+
+    private Multimap<Range, InetAddress> pendingRanges;
 
     /* Use this lock for manipulating the token map */
     private final ReadWriteLock lock = new ReentrantReadWriteLock(true);
@@ -53,7 +71,9 @@
         if (tokenToEndPointMap == null)
             tokenToEndPointMap = HashBiMap.create();
         this.tokenToEndPointMap = tokenToEndPointMap;
-        pendingRanges = new NonBlockingHashMap<Range, InetAddress>();
+        bootstrapTokens = new HashMap<Token, InetAddress>();
+        leavingEndPoints = new HashSet<InetAddress>();
+        pendingRanges = HashMultimap.create();
         sortedTokens = sortTokens();
     }
 
@@ -69,18 +89,13 @@
     {
         int n = 0;
         Range sourceRange = getPrimaryRangeFor(getToken(source));
-        for (Map.Entry<Range, InetAddress> entry : pendingRanges.entrySet())
-        {
-            if (sourceRange.contains(entry.getKey()) || entry.getValue().equals(source))
+        for (Token token : bootstrapTokens.keySet())
+            if (sourceRange.contains(token))
                 n++;
-        }
         return n;
     }
 
-    /**
-     * Update the two maps in an safe mode. 
-    */
-    public void update(Token token, InetAddress endpoint)
+    public void updateNormalToken(Token token, InetAddress endpoint)
     {
         assert token != null;
         assert endpoint != null;
@@ -88,6 +103,8 @@
         lock.writeLock().lock();
         try
         {
+            bootstrapTokens.remove(token);
+
             tokenToEndPointMap.inverse().remove(endpoint);
             if (!endpoint.equals(tokenToEndPointMap.put(token, endpoint)))
             {
@@ -100,13 +117,49 @@
         }
     }
 
+    public void addBootstrapToken(Token token, InetAddress endpoint)
+    {
+        assert token != null;
+        assert endpoint != null;
+
+        lock.writeLock().lock();
+        try
+        {
+            InetAddress oldEndPoint = bootstrapTokens.get(token);
+            if (oldEndPoint != null && !oldEndPoint.equals(endpoint))
+                throw new RuntimeException("Bootstrap Token collision between " + oldEndPoint
+ " and " + endpoint + " (token " + token);
+            bootstrapTokens.put(token, endpoint);
+        }
+        finally
+        {
+            lock.writeLock().unlock();
+        }
+    }
+
+    public void addLeavingEndPoint(InetAddress endpoint)
+    {
+        assert endpoint != null;
+
+        lock.writeLock().lock();
+        try
+        {
+            leavingEndPoints.add(endpoint);
+        }
+        finally
+        {
+            lock.writeLock().unlock();
+        }
+    }
+
     public void removeEndpoint(InetAddress endpoint)
     {
         assert tokenToEndPointMap.containsValue(endpoint);
         lock.writeLock().lock();
         try
         {
+            bootstrapTokens.remove(getToken(endpoint));
             tokenToEndPointMap.inverse().remove(endpoint);
+            leavingEndPoints.remove(endpoint);
             sortedTokens = sortTokens();
         }
         finally
@@ -161,7 +214,11 @@
         }
     }
 
-    public TokenMetadata cloneWithoutPending()
+    /**
+     * Create a copy of TokenMetadata with only tokenToEndPointMap. That is, pending ranges,
+     * bootstrap tokens and leaving endpoints are not included in the copy.
+     */
+    public TokenMetadata cloneOnlyTokenMap()
     {
         lock.readLock().lock();
         try
@@ -174,28 +231,24 @@
         }
     }
 
-    public String toString()
+    /**
+     * Create a copy of TokenMetadata with tokenToEndPointMap reflecting situation after
all
+     * current leave operations have finished.
+     */
+    public TokenMetadata cloneAfterAllLeft()
     {
-        StringBuilder sb = new StringBuilder();
         lock.readLock().lock();
         try
         {
-            Set<InetAddress> eps = tokenToEndPointMap.inverse().keySet();
-
-            for (InetAddress ep : eps)
-            {
-                sb.append(ep);
-                sb.append(":");
-                sb.append(tokenToEndPointMap.inverse().get(ep));
-                sb.append(System.getProperty("line.separator"));
-            }
+            TokenMetadata allLeftMetadata = cloneOnlyTokenMap();
+            for (InetAddress endPoint : leavingEndPoints)
+                allLeftMetadata.removeEndpoint(endPoint);
+            return allLeftMetadata;
         }
         finally
         {
             lock.readLock().unlock();
         }
-
-        return sb.toString();
     }
 
     public InetAddress getEndPoint(Token token)
@@ -211,12 +264,6 @@
         }
     }
 
-    public void clearUnsafe()
-    {
-        tokenToEndPointMap.clear();
-        pendingRanges.clear();
-    }
-
     public Range getPrimaryRangeFor(Token right)
     {
         return new Range(getPredecessor(right), right);
@@ -235,29 +282,16 @@
         }
     }
 
-    public void addPendingRange(Range range, InetAddress endpoint)
-    {
-        InetAddress oldEndpoint = pendingRanges.get(range);
-        if (oldEndpoint != null && !oldEndpoint.equals(endpoint))
-            throw new RuntimeException("pending range collision between " + oldEndpoint +
" and " + endpoint);
-        pendingRanges.put(range, endpoint);
-    }
-
-    public void removePendingRange(Range range)
-    {
-        pendingRanges.remove(range);
-    }
-
     /** a mutable map may be returned but caller should not modify it */
-    public Map<Range, InetAddress> getPendingRanges()
+    public Map<Range, Collection<InetAddress>> getPendingRanges()
     {
-        return pendingRanges;
+        return pendingRanges.asMap();
     }
 
     public List<Range> getPendingRanges(InetAddress endpoint)
     {
         List<Range> ranges = new ArrayList<Range>();
-        for (Map.Entry<Range, InetAddress> entry : pendingRanges.entrySet())
+        for (Map.Entry<Range, InetAddress> entry : pendingRanges.entries())
         {
             if (entry.getValue().equals(endpoint))
             {
@@ -267,6 +301,11 @@
         return ranges;
     }
 
+    public void setPendingRanges(Multimap<Range, InetAddress> pendingRanges)
+    {
+        this.pendingRanges = pendingRanges;
+    }
+
     public Token getPredecessor(Token token)
     {
         List tokens = sortedTokens();
@@ -288,8 +327,96 @@
         return getEndPoint(getSuccessor(getToken(endPoint)));
     }
 
-    public void clearPendingRanges()
+    /** caller should not modify bootstrapTokens */
+    public Map<Token, InetAddress> getBootstrapTokens()
+    {
+        return bootstrapTokens;
+    }
+
+    /** caller should not modify leavingEndPoints */
+    public Set<InetAddress> getLeavingEndPoints()
     {
+        return leavingEndPoints;
+    }
+
+    /** used by tests */
+    public void clearUnsafe()
+    {
+        bootstrapTokens.clear();
+        tokenToEndPointMap.clear();
+        leavingEndPoints.clear();
         pendingRanges.clear();
     }
+
+    public String toString()
+    {
+        StringBuilder sb = new StringBuilder();
+        lock.readLock().lock();
+        try
+        {
+            Set<InetAddress> eps = tokenToEndPointMap.inverse().keySet();
+
+            if (!eps.isEmpty())
+            {
+                sb.append("Normal Tokens:");
+                sb.append(System.getProperty("line.separator"));
+                for (InetAddress ep : eps)
+                {
+                    sb.append(ep);
+                    sb.append(":");
+                    sb.append(tokenToEndPointMap.inverse().get(ep));
+                    sb.append(System.getProperty("line.separator"));
+                }
+            }
+
+            if (!bootstrapTokens.isEmpty())
+            {
+                sb.append("Bootstrapping Tokens:" );
+                sb.append(System.getProperty("line.separator"));
+                for (Map.Entry<Token, InetAddress> entry : bootstrapTokens.entrySet())
+                {
+                    sb.append(entry.getValue() + ":" + entry.getKey());
+                    sb.append(System.getProperty("line.separator"));
+                }
+            }
+
+            if (!leavingEndPoints.isEmpty())
+            {
+                sb.append("Leaving EndPoints:");
+                sb.append(System.getProperty("line.separator"));
+                for (InetAddress ep : leavingEndPoints)
+                {
+                    sb.append(ep);
+                    sb.append(System.getProperty("line.separator"));
+                }
+            }
+
+            if (!pendingRanges.isEmpty())
+            {
+                sb.append("Pending Ranges:");
+                sb.append(System.getProperty("line.separator"));
+                sb.append(printPendingRanges());
+            }
+        }
+        finally
+        {
+            lock.readLock().unlock();
+        }
+
+        return sb.toString();
+    }
+
+    public String printPendingRanges()
+    {
+        StringBuilder sb = new StringBuilder();
+
+        for (Map.Entry<Range, InetAddress> entry : pendingRanges.entries())
+        {
+            sb.append(entry.getValue() + ":" + entry.getKey());
+            sb.append(System.getProperty("line.separator"));
+        }
+
+        return sb.toString();
+    }
+
 }

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java?rev=891430&r1=891429&r2=891430&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java Wed
Dec 16 21:28:55 2009
@@ -183,7 +183,7 @@
         if (logger_.isDebugEnabled())
             logger_.debug("Setting token to " + token);
         SystemTable.updateToken(token);
-        tokenMetadata_.update(token, FBUtilities.getLocalAddress());
+        tokenMetadata_.updateNormalToken(token, FBUtilities.getLocalAddress());
     }
 
     public StorageService()
@@ -306,7 +306,7 @@
         {
             SystemTable.setBootstrapped(true);
             Token token = storageMetadata_.getToken();
-            tokenMetadata_.update(token, FBUtilities.getLocalAddress());
+            tokenMetadata_.updateNormalToken(token, FBUtilities.getLocalAddress());
             Gossiper.instance().addApplicationState(StorageService.STATE_NORMAL, new ApplicationState(partitioner_.getTokenFactory().toString(token)));
         }
 
@@ -407,23 +407,25 @@
             Token token = getPartitioner().getTokenFactory().fromString(state.getValue());
             if (logger_.isDebugEnabled())
                 logger_.debug(endpoint + " state bootstrapping, token " + token);
-            updateBootstrapRanges(token, endpoint);
+            tokenMetadata_.addBootstrapToken(token, endpoint);
+            calculatePendingRanges();
         }
         else if (STATE_NORMAL.equals(stateName))
         {
             Token token = getPartitioner().getTokenFactory().fromString(state.getValue());
             if (logger_.isDebugEnabled())
                 logger_.debug(endpoint + " state normal, token " + token);
-            tokenMetadata_.update(token, endpoint);
+            tokenMetadata_.updateNormalToken(token, endpoint);
+            calculatePendingRanges();
             if (!isClientMode)
                 SystemTable.updateToken(endpoint, token);
-            replicationStrategy_.removeObsoletePendingRanges();
         }
         else if (STATE_LEAVING.equals(stateName))
         {
             Token token = getPartitioner().getTokenFactory().fromString(state.getValue());
             assert tokenMetadata_.getToken(endpoint).equals(token);
-            updateLeavingRanges(endpoint);
+            tokenMetadata_.addLeavingEndPoint(endpoint);
+            calculatePendingRanges();
         }
         else if (STATE_LEFT.equals(stateName))
         {
@@ -442,6 +444,7 @@
                         logger_.debug(endpoint + " state left, token " + token);
                     assert tokenMetadata_.getToken(endpoint).equals(token);
                     tokenMetadata_.removeEndpoint(endpoint);
+                    calculatePendingRanges();
                 }
             }
             else
@@ -454,11 +457,94 @@
                 {
                     restoreReplicaCount(endPointThatLeft);
                     tokenMetadata_.removeEndpoint(endPointThatLeft);
+                    calculatePendingRanges();
                 }
             }
+        }
+    }
+
+    /**
+     * Calculate pending ranges according to bootstrapping and leaving nodes. Reasoning is:
+     *
+     * (1) When in doubt, it is better to write too much to a node than too little. That
is, if
+     * there are multiple nodes moving, calculate the biggest ranges a node could have. Cleaning
+     * up unneeded data afterwards is better than missing writes during movement.
+     * (2) When a node leaves, ranges for other nodes can only grow (a node might get additional
+     * ranges, but it will not lose any of its current ranges as a result of a leave). Therefore
+     * we will first remove _all_ leaving tokens for the sake of calculation and then check
what
+     * ranges would go where if all nodes are to leave. This way we get the biggest possible
+     * ranges with regard to current leave operations, covering all subsets of possible final
range
+     * values.
+     * (3) When a node bootstraps, ranges of other nodes can only get smaller. Without doing
+     * complex calculations to see if multiple bootstraps overlap, we simply base calculations
+     * on the same token ring used before (reflecting situation after all leave operations
have
+     * completed). Bootstrapping nodes will be added and removed one by one to that metadata
and
+     * checked what their ranges would be. This will give us the biggest possible ranges
the
+     * node could have. It might be that other bootstraps make our actual final ranges smaller,
+     * but it does not matter as we can clean up the data afterwards.
+     *
+     * NOTE: This is a heavy and inefficient operation. This will be done only once when a
node
+     * changes state in the cluster, so it should be manageable.
+     */
+    private void calculatePendingRanges()
+    {
+        calculatePendingRanges(tokenMetadata_, replicationStrategy_);
+    }
+
+    // public & static for testing purposes
+    public static void calculatePendingRanges(TokenMetadata tm, AbstractReplicationStrategy
strategy)
+    {
+        Multimap<Range, InetAddress> pendingRanges = HashMultimap.create();
+        Map<Token, InetAddress> bootstrapTokens = tm.getBootstrapTokens();
+        Set<InetAddress> leavingEndPoints = tm.getLeavingEndPoints();
+
+        if (bootstrapTokens.isEmpty() && leavingEndPoints.isEmpty())
+        {
+            if (logger_.isDebugEnabled())
+                logger_.debug("No bootstrapping or leaving nodes -> empty pending ranges");
+            tm.setPendingRanges(pendingRanges);
+            return;
+        }
+
+        Multimap<InetAddress, Range> addressRanges = strategy.getAddressRanges();
+
+        // Copy of metadata reflecting the situation after all leave operations are finished.
+        TokenMetadata allLeftMetadata = tm.cloneAfterAllLeft();
+
+        // get all ranges that will be affected by leaving nodes
+        Set<Range> affectedRanges = new HashSet<Range>();
+        for (InetAddress endPoint : leavingEndPoints)
+            affectedRanges.addAll(addressRanges.get(endPoint));
+
+        // for each of those ranges, find what new nodes will be responsible for the range
when
+        // all leaving nodes are gone.
+        for (Range range : affectedRanges)
+        {
+            List<InetAddress> currentEndPoints = strategy.getNaturalEndpoints(range.right(),
tm);
+            List<InetAddress> newEndPoints = strategy.getNaturalEndpoints(range.right(),
allLeftMetadata);
+            newEndPoints.removeAll(currentEndPoints);
+            pendingRanges.putAll(range, newEndPoints);
+        }
+
+        // At this stage pendingRanges has been updated according to leave operations. We
can
+        // now finish the calculation by checking bootstrapping nodes.
+
+        // For each of the bootstrapping nodes, simply add and remove them one by one to
+        // allLeftMetadata and check in between what their ranges would be.
+        for (Map.Entry<Token, InetAddress> entry : bootstrapTokens.entrySet())
+        {
+            InetAddress endPoint = entry.getValue();
 
-            replicationStrategy_.removeObsoletePendingRanges();
+            allLeftMetadata.updateNormalToken(entry.getKey(), endPoint);
+            for (Range range : strategy.getAddressRanges(allLeftMetadata).get(endPoint))
+                pendingRanges.put(range, endPoint);
+            allLeftMetadata.removeEndpoint(endPoint);
         }
+
+        tm.setPendingRanges(pendingRanges);
+
+        if (logger_.isDebugEnabled())
+            logger_.debug("Pending ranges:\n" + tm.printPendingRanges());
     }
 
     /**
@@ -534,7 +620,7 @@
         Collection<Range> ranges = getRangesForEndPoint(endpoint);
 
         if (logger_.isDebugEnabled())
-            logger_.debug("leaving node ranges are [" + StringUtils.join(ranges, ", ") +
"]");
+            logger_.debug("Node " + endpoint + " ranges [" + StringUtils.join(ranges, ",
") + "]");
 
         Map<Range, ArrayList<InetAddress>> currentReplicaEndpoints = new HashMap<Range,
ArrayList<InetAddress>>();
 
@@ -542,7 +628,7 @@
         for (Range range : ranges)
             currentReplicaEndpoints.put(range, replicationStrategy_.getNaturalEndpoints(range.right(),
tokenMetadata_));
 
-        TokenMetadata temp = tokenMetadata_.cloneWithoutPending();
+        TokenMetadata temp = tokenMetadata_.cloneAfterAllLeft();
         temp.removeEndpoint(endpoint);
 
         Multimap<Range, InetAddress> changedRanges = HashMultimap.create();
@@ -557,43 +643,13 @@
             ArrayList<InetAddress> newReplicaEndpoints = replicationStrategy_.getNaturalEndpoints(range.right(),
temp);
             newReplicaEndpoints.removeAll(currentReplicaEndpoints.get(range));
             if (logger_.isDebugEnabled())
-                logger_.debug("adding pending range " + range + " to endpoints " + StringUtils.join(newReplicaEndpoints,
", "));
+                logger_.debug("Range " + range + " will be responsibility of " + StringUtils.join(newReplicaEndpoints,
", "));
             changedRanges.putAll(range, newReplicaEndpoints);
         }
 
         return changedRanges;
     }
 
-    private void updateLeavingRanges(final InetAddress endpoint)
-    {
-        if (logger_.isDebugEnabled())
-            logger_.debug(endpoint + " is leaving; calculating pendingranges");
-        Multimap<Range, InetAddress> ranges = getChangedRangesForLeaving(endpoint);
-        for (Range range : ranges.keySet())
-        {
-            for (InetAddress newEndpoint : ranges.get(range))
-            {
-                tokenMetadata_.addPendingRange(range, newEndpoint);
-            }
-        }
-    }
-
-    private void updateBootstrapRanges(Token token, InetAddress endpoint)
-    {
-        for (Range range : replicationStrategy_.getPendingAddressRanges(tokenMetadata_, token,
endpoint))
-        {
-            tokenMetadata_.addPendingRange(range, endpoint);
-        }
-    }
-
-    public static void updateBootstrapRanges(AbstractReplicationStrategy strategy, TokenMetadata
metadata, Token token, InetAddress endpoint)
-    {
-        for (Range range : strategy.getPendingAddressRanges(metadata, token, endpoint))
-        {
-            metadata.addPendingRange(range, endpoint);
-        }
-    }
-
     public void onJoin(InetAddress endpoint, EndPointState epState)
     {
         for (Map.Entry<String,ApplicationState> entry : epState.getSortedApplicationStates())
@@ -1117,7 +1173,6 @@
     {
         SystemTable.setBootstrapped(false);
         tokenMetadata_.removeEndpoint(FBUtilities.getLocalAddress());
-        replicationStrategy_.removeObsoletePendingRanges();
 
         if (logger_.isDebugEnabled())
             logger_.debug("");
@@ -1238,7 +1293,6 @@
 
             restoreReplicaCount(endPoint);
             tokenMetadata_.removeEndpoint(endPoint);
-            replicationStrategy_.removeObsoletePendingRanges();
         }
 
         // This is not the cleanest way as we're adding STATE_LEFT for
@@ -1261,11 +1315,6 @@
         return replicationStrategy_;
     }
 
-    public void cancelPendingRanges()
-    {
-        tokenMetadata_.clearPendingRanges();
-    }
-
     public boolean isClientMode()
     {
         return isClientMode;

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageServiceMBean.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageServiceMBean.java?rev=891430&r1=891429&r2=891430&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageServiceMBean.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageServiceMBean.java Wed Dec 16 21:28:55 2009
@@ -141,13 +141,6 @@
     public void loadBalance() throws IOException, InterruptedException;
 
     /**
-     * cancel writes to nodes that are set to be changing ranges.
-     * Only do this if the reason for the range changes no longer exists
-     * (e.g., a bootstrapping node was killed or crashed.)
-     */
-    public void cancelPendingRanges();
-
-    /**
      * removeToken removes token (and all data associated with
      * enpoint that had it) from the ring
      */

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/tools/NodeProbe.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/tools/NodeProbe.java?rev=891430&r1=891429&r2=891430&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/tools/NodeProbe.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/tools/NodeProbe.java Wed Dec 16 21:28:55 2009
@@ -398,11 +398,6 @@
         ssProxy.move(newToken);
     }
 
-    public void cancelPendingRanges()
-    {
-        ssProxy.cancelPendingRanges();
-    }
-
     public void removeToken(String token)
     {
         ssProxy.removeToken(token);
@@ -503,7 +498,7 @@
         HelpFormatter hf = new HelpFormatter();
         String header = String.format(
                 "%nAvailable commands: ring, info, cleanup, compact, cfstats, snapshot [name],
clearsnapshot, " +
-                "tpstats, flush, repair, decommission, move, loadbalance, cancelpending,
removetoken, " +
+                "tpstats, flush, repair, decommission, move, loadbalance, removetoken, "
+
                 " getcompactionthreshold, setcompactionthreshold [minthreshold] ([maxthreshold])");
         String usage = String.format("java %s -host <arg> <command>%n", NodeProbe.class.getName());
         hf.printHelp(usage, "", options, header);
@@ -578,10 +573,6 @@
             }
             probe.move(arguments[1]);
         }
-        else if (cmdName.equals("cancelpending"))
-        {
-            probe.cancelPendingRanges();
-        }
         else if (cmdName.equals("removetoken"))
         {
             if (arguments.length <= 1)

Propchange: incubator/cassandra/trunk/test/unit/org/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Wed Dec 16 21:28:55 2009
@@ -1,4 +1,4 @@
 /incubator/cassandra/branches/cassandra-0.3/test/unit/org:774578-796573
 /incubator/cassandra/branches/cassandra-0.4/test/unit/org:810145-834239,834349-834350
-/incubator/cassandra/branches/cassandra-0.5/test/unit/org:888872-890627
+/incubator/cassandra/branches/cassandra-0.5/test/unit/org:888872-891425
 /incubator/cassandra/trunk/test/unit/org:749219-768583

Modified: incubator/cassandra/trunk/test/unit/org/apache/cassandra/dht/BootStrapperTest.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/test/unit/org/apache/cassandra/dht/BootStrapperTest.java?rev=891430&r1=891429&r2=891430&view=diff
==============================================================================
--- incubator/cassandra/trunk/test/unit/org/apache/cassandra/dht/BootStrapperTest.java (original)
+++ incubator/cassandra/trunk/test/unit/org/apache/cassandra/dht/BootStrapperTest.java Wed Dec 16 21:28:55 2009
@@ -32,6 +32,7 @@
 import com.google.common.collect.Multimap;
 import org.apache.cassandra.gms.IFailureDetectionEventListener;
 import org.apache.cassandra.gms.IFailureDetector;
+import org.apache.cassandra.gms.ApplicationState;
 import org.apache.cassandra.locator.TokenMetadata;
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.utils.FBUtilities;
@@ -61,7 +62,8 @@
         Range range3 = ss.getPrimaryRangeForEndPoint(three);
         Token fakeToken = ((IPartitioner)StorageService.getPartitioner()).midpoint(range3.left(),
range3.right());
         assert range3.contains(fakeToken);
-        StorageService.updateBootstrapRanges(StorageService.instance().getReplicationStrategy(),
tmd, fakeToken, myEndpoint);
+        ss.onChange(myEndpoint, StorageService.STATE_BOOTSTRAPPING, new ApplicationState(ss.getPartitioner().getTokenFactory().toString(fakeToken)));
+        tmd = ss.getTokenMetadata();
 
         InetAddress source2 = BootStrapper.getBootstrapSource(tmd, load);
         assert two.equals(source2) : source2;
@@ -124,7 +126,7 @@
         for (int i = 1; i <= numOldNodes; i++)
         {
             // leave .1 for myEndpoint
-            tmd.update(p.getRandomToken(), InetAddress.getByName("127.0.0." + (i + 1)));
+            tmd.updateNormalToken(p.getRandomToken(), InetAddress.getByName("127.0.0." +
(i + 1)));
         }
     }
 }
\ No newline at end of file

Modified: incubator/cassandra/trunk/test/unit/org/apache/cassandra/locator/RackUnawareStrategyTest.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/test/unit/org/apache/cassandra/locator/RackUnawareStrategyTest.java?rev=891430&r1=891429&r2=891430&view=diff
==============================================================================
--- incubator/cassandra/trunk/test/unit/org/apache/cassandra/locator/RackUnawareStrategyTest.java (original)
+++ incubator/cassandra/trunk/test/unit/org/apache/cassandra/locator/RackUnawareStrategyTest.java Wed Dec 16 21:28:55 2009
@@ -79,7 +79,7 @@
         for (int i = 0; i < endPointTokens.length; i++)
         {
             InetAddress ep = InetAddress.getByName("127.0.0." + String.valueOf(i + 1));
-            tmd.update(endPointTokens[i], ep);
+            tmd.updateNormalToken(endPointTokens[i], ep);
             hosts.add(ep);
         }
 
@@ -114,15 +114,16 @@
         for (int i = 0; i < endPointTokens.length; i++)
         {
             InetAddress ep = InetAddress.getByName("127.0.0." + String.valueOf(i + 1));
-            tmd.update(endPointTokens[i], ep);
+            tmd.updateNormalToken(endPointTokens[i], ep);
             hosts.add(ep);
         }
         
         //Add bootstrap node id=6
         Token bsToken = new BigIntegerToken(String.valueOf(25));
         InetAddress bootstrapEndPoint = InetAddress.getByName("127.0.0.6");
-        StorageService.updateBootstrapRanges(strategy, tmd, bsToken, bootstrapEndPoint);
-        
+        tmd.addBootstrapToken(bsToken, bootstrapEndPoint);
+        StorageService.calculatePendingRanges(tmd, strategy);
+
         for (int i = 0; i < keyTokens.length; i++)
         {
             Collection<InetAddress> endPoints = strategy.getWriteEndpoints(keyTokens[i],
strategy.getNaturalEndpoints(keyTokens[i]));
@@ -136,6 +137,8 @@
             // for 5, 15, 25 this should include bootstrap node
             if (i < 3)
                 assertTrue(endPoints.contains(bootstrapEndPoint));
+            else
+                assertFalse(endPoints.contains(bootstrapEndPoint));
         }
     }
 }



Mime
View raw message