Delivered-To: apmail-incubator-cassandra-commits-archive@minotaur.apache.org
Received: (qmail 5660 invoked from network); 16 Dec 2009 18:13:59 -0000
Received: from hermes.apache.org (HELO mail.apache.org) (140.211.11.3) by minotaur.apache.org with SMTP; 16 Dec 2009 18:13:59 -0000
Received: (qmail 83462 invoked by uid 500); 16 Dec 2009 18:13:58 -0000
Delivered-To: apmail-incubator-cassandra-commits-archive@incubator.apache.org
Received: (qmail 83439 invoked by uid 500); 16 Dec 2009 18:13:58 -0000
Mailing-List: contact cassandra-commits-help@incubator.apache.org; run by ezmlm
Precedence: bulk
Reply-To: cassandra-dev@incubator.apache.org
Delivered-To: mailing list cassandra-commits@incubator.apache.org
Received: (qmail 83429 invoked by uid 99); 16 Dec 2009 18:13:58 -0000
Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 16 Dec 2009 18:13:58 +0000
X-ASF-Spam-Status: No, hits=-2.6 required=5.0 tests=AWL,BAYES_00
X-Spam-Check-By: apache.org
Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 16 Dec 2009 18:13:55 +0000
Received: by eris.apache.org (Postfix, from userid 65534) id 718CD238888E; Wed, 16 Dec 2009 18:13:35 +0000 (UTC)
Content-Type: text/plain; charset="utf-8"
MIME-Version: 1.0
Content-Transfer-Encoding: 7bit
Subject: svn commit: r891356 - in /incubator/cassandra/branches/cassandra-0.5: ./ src/java/org/apache/cassandra/locator/ src/java/org/apache/cassandra/service/ src/java/org/apache/cassandra/tools/ test/unit/org/apache/cassandra/dht/ test/unit/org/apache/cassand...
Date: Wed, 16 Dec 2009 18:13:35 -0000
To: cassandra-commits@incubator.apache.org
From: jbellis@apache.org
X-Mailer: svnmailer-1.0.8
Message-Id: <20091216181335.718CD238888E@eris.apache.org>

Author: jbellis
Date: Wed Dec 16 18:13:34 2009
New Revision: 891356

URL: http://svn.apache.org/viewvc?rev=891356&view=rev
Log:
Fix pending range conflicts when bootstrapping or moving multiple nodes at once.
patch by Jaakko Laine; reviewed by jbellis for CASSANDRA-603

Modified:
    incubator/cassandra/branches/cassandra-0.5/CHANGES.txt
    incubator/cassandra/branches/cassandra-0.5/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java
    incubator/cassandra/branches/cassandra-0.5/src/java/org/apache/cassandra/locator/TokenMetadata.java
    incubator/cassandra/branches/cassandra-0.5/src/java/org/apache/cassandra/service/StorageService.java
    incubator/cassandra/branches/cassandra-0.5/src/java/org/apache/cassandra/service/StorageServiceMBean.java
    incubator/cassandra/branches/cassandra-0.5/src/java/org/apache/cassandra/tools/NodeProbe.java
    incubator/cassandra/branches/cassandra-0.5/test/unit/org/apache/cassandra/dht/BootStrapperTest.java
    incubator/cassandra/branches/cassandra-0.5/test/unit/org/apache/cassandra/locator/RackUnawareStrategyTest.java

Modified: incubator/cassandra/branches/cassandra-0.5/CHANGES.txt
URL: http://svn.apache.org/viewvc/incubator/cassandra/branches/cassandra-0.5/CHANGES.txt?rev=891356&r1=891355&r2=891356&view=diff
==============================================================================
--- incubator/cassandra/branches/cassandra-0.5/CHANGES.txt (original)
+++ incubator/cassandra/branches/cassandra-0.5/CHANGES.txt Wed Dec 16 18:13:34 2009
@@ -4,6 +4,8 @@
  * fix data streaming on windows (CASSANDRA-630)
  * GC compacted sstables after cleanup and compaction (CASSANDRA-621)
  * Speed up anti-entropy validation (CASSANDRA-629)
+ * Fix pending range conflicts when bootstapping or moving
+   multiple nodes at once (CASSANDRA-603)
 
 
 0.5.0 beta 2

Modified: incubator/cassandra/branches/cassandra-0.5/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/branches/cassandra-0.5/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java?rev=891356&r1=891355&r2=891356&view=diff
==============================================================================
--- incubator/cassandra/branches/cassandra-0.5/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java (original)
+++ incubator/cassandra/branches/cassandra-0.5/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java Wed Dec 16 18:13:34 2009
@@ -91,11 +91,11 @@
 
         List<InetAddress> endpoints = new ArrayList<InetAddress>(naturalEndpoints);
 
-        for (Map.Entry<Range, InetAddress> entry : tokenMetadata_.getPendingRanges().entrySet())
+        for (Map.Entry<Range, Collection<InetAddress>> entry : tokenMetadata_.getPendingRanges().entrySet())
         {
             if (entry.getKey().contains(token))
             {
-                endpoints.add(entry.getValue());
+                endpoints.addAll(entry.getValue());
             }
         }
 
@@ -202,26 +202,9 @@
 
     public Collection<Range> getPendingAddressRanges(TokenMetadata metadata, Token pendingToken, InetAddress pendingAddress)
     {
-        TokenMetadata temp = metadata.cloneWithoutPending();
-        temp.update(pendingToken, pendingAddress);
+        TokenMetadata temp = metadata.cloneOnlyTokenMap();
+        temp.updateNormalToken(pendingToken, pendingAddress);
         return getAddressRanges(temp).get(pendingAddress);
     }
 
-    public void removeObsoletePendingRanges()
-    {
-        Multimap<InetAddress, Range> ranges = getAddressRanges();
-        for (Map.Entry<Range, InetAddress> entry : tokenMetadata_.getPendingRanges().entrySet())
-        {
-            for (Range currentRange : ranges.get(entry.getValue()))
-            {
-                if (currentRange.contains(entry.getKey()))
-                {
-                    if (logger_.isDebugEnabled())
-                        logger_.debug("Removing obsolete pending range " + entry.getKey() + " from " + entry.getValue());
-                    tokenMetadata_.removePendingRange(entry.getKey());
-                    break;
-                }
-            }
-        }
-    }
 }

Modified: incubator/cassandra/branches/cassandra-0.5/src/java/org/apache/cassandra/locator/TokenMetadata.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/branches/cassandra-0.5/src/java/org/apache/cassandra/locator/TokenMetadata.java?rev=891356&r1=891355&r2=891356&view=diff
==============================================================================
--- incubator/cassandra/branches/cassandra-0.5/src/java/org/apache/cassandra/locator/TokenMetadata.java (original)
+++ incubator/cassandra/branches/cassandra-0.5/src/java/org/apache/cassandra/locator/TokenMetadata.java Wed Dec 16 18:13:34 2009
@@ -29,15 +29,33 @@
 
 import org.apache.commons.lang.StringUtils;
 
-import org.cliffc.high_scale_lib.NonBlockingHashMap;
 import com.google.common.collect.BiMap;
 import com.google.common.collect.HashBiMap;
+import com.google.common.collect.Multimap;
+import com.google.common.collect.HashMultimap;
 
 public class TokenMetadata
 {
     /* Maintains token to endpoint map of every node in the cluster. */
     private BiMap<Token, InetAddress> tokenToEndPointMap;
-    private Map<Range, InetAddress> pendingRanges;
+
+    // Suppose that there is a ring of nodes A, C and E, with replication factor 3.
+    // Node D bootstraps between C and E, so its pending ranges will be E-A, A-C and C-D.
+    // Now suppose node B bootstraps between A and C at the same time. Its pending ranges would be C-E, E-A and A-B.
+    // Now both nodes have pending range E-A in their list, which will cause pending range collision
+    // even though we're only talking about replica range, not even primary range. The same thing happens
+    // for any nodes that boot simultaneously between same two nodes. For this we cannot simply make pending ranges a multimap,
+    // since that would make us unable to notice the real problem of two nodes trying to boot using the same token.
+    // In order to do this properly, we need to know what tokens are booting at any time.
+    private Map<Token, InetAddress> bootstrapTokens;
+
+    // we will need to know at all times what nodes are leaving and calculate ranges accordingly.
+    // An anonymous pending ranges list is not enough, as that does not tell which node is leaving
+    // and/or if the ranges are there because of bootstrap or leave operation.
+    // (See CASSANDRA-603 for more detail + examples).
+    private Set<InetAddress> leavingEndPoints;
+
+    private Multimap<Range, InetAddress> pendingRanges;
 
     /* Use this lock for manipulating the token map */
     private final ReadWriteLock lock = new ReentrantReadWriteLock(true);
@@ -53,7 +71,9 @@
         if (tokenToEndPointMap == null)
             tokenToEndPointMap = HashBiMap.create();
         this.tokenToEndPointMap = tokenToEndPointMap;
-        pendingRanges = new NonBlockingHashMap<Range, InetAddress>();
+        bootstrapTokens = new HashMap<Token, InetAddress>();
+        leavingEndPoints = new HashSet<InetAddress>();
+        pendingRanges = HashMultimap.create();
         sortedTokens = sortTokens();
     }
 
@@ -69,18 +89,13 @@
     {
         int n = 0;
         Range sourceRange = getPrimaryRangeFor(getToken(source));
-        for (Map.Entry<Range, InetAddress> entry : pendingRanges.entrySet())
-        {
-            if (sourceRange.contains(entry.getKey()) || entry.getValue().equals(source))
+        for (Token token : bootstrapTokens.keySet())
+            if (sourceRange.contains(token))
                 n++;
-        }
         return n;
     }
 
-    /**
-     * Update the two maps in an safe mode.
-     */
-    public void update(Token token, InetAddress endpoint)
+    public void updateNormalToken(Token token, InetAddress endpoint)
     {
         assert token != null;
         assert endpoint != null;
@@ -88,6 +103,8 @@
         lock.writeLock().lock();
         try
         {
+            bootstrapTokens.remove(token);
+
             tokenToEndPointMap.inverse().remove(endpoint);
             if (!endpoint.equals(tokenToEndPointMap.put(token, endpoint)))
             {
@@ -100,13 +117,49 @@
         }
     }
 
+    public void addBootstrapToken(Token token, InetAddress endpoint)
+    {
+        assert token != null;
+        assert endpoint != null;
+
+        lock.writeLock().lock();
+        try
+        {
+            InetAddress oldEndPoint = bootstrapTokens.get(token);
+            if (oldEndPoint != null && !oldEndPoint.equals(endpoint))
+                throw new RuntimeException("Bootstrap Token collision between " + oldEndPoint + " and " + endpoint + " (token " + token);
+            bootstrapTokens.put(token, endpoint);
+        }
+        finally
+        {
+            lock.writeLock().unlock();
+        }
+    }
+
+    public void addLeavingEndPoint(InetAddress endpoint)
+    {
+        assert endpoint != null;
+
+        lock.writeLock().lock();
+        try
+        {
+            leavingEndPoints.add(endpoint);
+        }
+        finally
+        {
+            lock.writeLock().unlock();
+        }
+    }
+
     public void removeEndpoint(InetAddress endpoint)
     {
         assert tokenToEndPointMap.containsValue(endpoint);
         lock.writeLock().lock();
         try
         {
+            bootstrapTokens.remove(getToken(endpoint));
             tokenToEndPointMap.inverse().remove(endpoint);
+            leavingEndPoints.remove(endpoint);
             sortedTokens = sortTokens();
         }
         finally
@@ -161,7 +214,11 @@
         }
     }
 
-    public TokenMetadata cloneWithoutPending()
+    /**
+     * Create a copy of TokenMetadata with only tokenToEndPointMap. That is, pending ranges,
+     * bootstrap tokens and leaving endpoints are not included in the copy.
+     */
+    public TokenMetadata cloneOnlyTokenMap()
     {
         lock.readLock().lock();
         try
@@ -174,28 +231,24 @@
         }
     }
 
-    public String toString()
+    /**
+     * Create a copy of TokenMetadata with tokenToEndPointMap reflecting situation after all
+     * current leave operations have finished.
+     */
+    public TokenMetadata cloneAfterAllLeft()
     {
-        StringBuilder sb = new StringBuilder();
         lock.readLock().lock();
         try
         {
-            Set<InetAddress> eps = tokenToEndPointMap.inverse().keySet();
-
-            for (InetAddress ep : eps)
-            {
-                sb.append(ep);
-                sb.append(":");
-                sb.append(tokenToEndPointMap.inverse().get(ep));
-                sb.append(System.getProperty("line.separator"));
-            }
+            TokenMetadata allLeftMetadata = cloneOnlyTokenMap();
+            for (InetAddress endPoint : leavingEndPoints)
+                allLeftMetadata.removeEndpoint(endPoint);
+            return allLeftMetadata;
         }
         finally
         {
             lock.readLock().unlock();
         }
-
-        return sb.toString();
     }
 
     public InetAddress getEndPoint(Token token)
@@ -211,12 +264,6 @@
         }
     }
 
-    public void clearUnsafe()
-    {
-        tokenToEndPointMap.clear();
-        pendingRanges.clear();
-    }
-
     public Range getPrimaryRangeFor(Token right)
     {
         return new Range(getPredecessor(right), right);
@@ -235,29 +282,16 @@
         }
     }
 
-    public void addPendingRange(Range range, InetAddress endpoint)
-    {
-        InetAddress oldEndpoint = pendingRanges.get(range);
-        if (oldEndpoint != null && !oldEndpoint.equals(endpoint))
-            throw new RuntimeException("pending range collision between " + oldEndpoint + " and " + endpoint);
-        pendingRanges.put(range, endpoint);
-    }
-
-    public void removePendingRange(Range range)
-    {
-        pendingRanges.remove(range);
-    }
-
     /** a mutable map may be returned but caller should not modify it */
-    public Map<Range, InetAddress> getPendingRanges()
+    public Map<Range, Collection<InetAddress>> getPendingRanges()
     {
-        return pendingRanges;
+        return pendingRanges.asMap();
     }
 
     public List<Range> getPendingRanges(InetAddress endpoint)
     {
         List<Range> ranges = new ArrayList<Range>();
-        for (Map.Entry<Range, InetAddress> entry : pendingRanges.entrySet())
+        for (Map.Entry<Range, InetAddress> entry : pendingRanges.entries())
         {
             if (entry.getValue().equals(endpoint))
             {
@@ -267,6 +301,11 @@
         return ranges;
     }
 
+    public void setPendingRanges(Multimap<Range, InetAddress> pendingRanges)
+    {
+        this.pendingRanges = pendingRanges;
+    }
+
     public Token getPredecessor(Token token)
     {
         List<Token> tokens = sortedTokens();
@@ -288,8 +327,96 @@
         return getEndPoint(getSuccessor(getToken(endPoint)));
     }
 
-    public void clearPendingRanges()
+    /** caller should not modify bootstrapTokens */
+    public Map<Token, InetAddress> getBootstrapTokens()
+    {
+        return bootstrapTokens;
+    }
+
+    /** caller should not modify leavigEndPoints */
+    public Set<InetAddress> getLeavingEndPoints()
     {
+        return leavingEndPoints;
+    }
+
+    /** used by tests */
+    public void clearUnsafe()
+    {
+        bootstrapTokens.clear();
+        tokenToEndPointMap.clear();
+        leavingEndPoints.clear();
         pendingRanges.clear();
     }
+
+    public String toString()
+    {
+        StringBuilder sb = new StringBuilder();
+        lock.readLock().lock();
+        try
+        {
+            Set<InetAddress> eps = tokenToEndPointMap.inverse().keySet();
+
+            if (!eps.isEmpty())
+            {
+                sb.append("Normal Tokens:");
+                sb.append(System.getProperty("line.separator"));
+                for (InetAddress ep : eps)
+                {
+                    sb.append(ep);
+                    sb.append(":");
+                    sb.append(tokenToEndPointMap.inverse().get(ep));
+                    sb.append(System.getProperty("line.separator"));
+                }
+            }
+
+            if (!bootstrapTokens.isEmpty())
+            {
+                sb.append("Bootstrapping Tokens:" );
+                sb.append(System.getProperty("line.separator"));
+                for (Map.Entry<Token, InetAddress> entry : bootstrapTokens.entrySet())
+                {
+                    sb.append(entry.getValue() + ":" + entry.getKey());
+                    sb.append(System.getProperty("line.separator"));
+                }
+            }
+
+            if (!leavingEndPoints.isEmpty())
+            {
+                sb.append("Leaving EndPoints:");
+                sb.append(System.getProperty("line.separator"));
+                for (InetAddress ep : leavingEndPoints)
+                {
+                    sb.append(ep);
+                    sb.append(System.getProperty("line.separator"));
+                }
+            }
+
+            if (!pendingRanges.isEmpty())
+            {
+                sb.append("Pending Ranges:");
+                sb.append(System.getProperty("line.separator"));
+                sb.append(printPendingRanges());
+            }
+        }
+        finally
+        {
+            lock.readLock().unlock();
+        }
+
+        return sb.toString();
+    }
+
+    public String printPendingRanges()
+    {
+        StringBuilder sb = new StringBuilder();
+
+        for (Map.Entry<Range, InetAddress> entry : pendingRanges.entries())
+        {
+            sb.append(entry.getValue() + ":" + entry.getKey());
+            sb.append(System.getProperty("line.separator"));
+        }
+
+        return sb.toString();
+    }
+
 }

Modified: incubator/cassandra/branches/cassandra-0.5/src/java/org/apache/cassandra/service/StorageService.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/branches/cassandra-0.5/src/java/org/apache/cassandra/service/StorageService.java?rev=891356&r1=891355&r2=891356&view=diff
==============================================================================
--- incubator/cassandra/branches/cassandra-0.5/src/java/org/apache/cassandra/service/StorageService.java (original)
+++ incubator/cassandra/branches/cassandra-0.5/src/java/org/apache/cassandra/service/StorageService.java Wed Dec 16 18:13:34 2009
@@ -193,7 +193,7 @@
         if (logger_.isDebugEnabled())
             logger_.debug("Setting token to " + token);
         SystemTable.updateToken(token);
-        tokenMetadata_.update(token, FBUtilities.getLocalAddress());
+        tokenMetadata_.updateNormalToken(token, FBUtilities.getLocalAddress());
     }
 
     public StorageService()
@@ -316,7 +316,7 @@
         {
             SystemTable.setBootstrapped(true);
             Token token = storageMetadata_.getToken();
-            tokenMetadata_.update(token, FBUtilities.getLocalAddress());
+            tokenMetadata_.updateNormalToken(token, FBUtilities.getLocalAddress());
             Gossiper.instance().addApplicationState(StorageService.STATE_NORMAL, new ApplicationState(partitioner_.getTokenFactory().toString(token)));
         }
 
@@ -417,23 +417,25 @@
             Token token = getPartitioner().getTokenFactory().fromString(state.getValue());
             if (logger_.isDebugEnabled())
                 logger_.debug(endpoint + " state bootstrapping, token " + token);
-            updateBootstrapRanges(token, endpoint);
+            tokenMetadata_.addBootstrapToken(token, endpoint);
+            calculatePendingRanges();
         }
         else if (STATE_NORMAL.equals(stateName))
         {
             Token token = getPartitioner().getTokenFactory().fromString(state.getValue());
             if (logger_.isDebugEnabled())
                 logger_.debug(endpoint + " state normal, token " + token);
-            tokenMetadata_.update(token, endpoint);
+            tokenMetadata_.updateNormalToken(token, endpoint);
+            calculatePendingRanges();
             if (!isClientMode)
                 SystemTable.updateToken(endpoint, token);
-            replicationStrategy_.removeObsoletePendingRanges();
         }
         else if (STATE_LEAVING.equals(stateName))
         {
             Token token = getPartitioner().getTokenFactory().fromString(state.getValue());
             assert tokenMetadata_.getToken(endpoint).equals(token);
-            updateLeavingRanges(endpoint);
+            tokenMetadata_.addLeavingEndPoint(endpoint);
+            calculatePendingRanges();
         }
         else if (STATE_LEFT.equals(stateName))
         {
@@ -452,6 +454,7 @@
                         logger_.debug(endpoint + " state left, token " + token);
                     assert tokenMetadata_.getToken(endpoint).equals(token);
                     tokenMetadata_.removeEndpoint(endpoint);
+                    calculatePendingRanges();
                 }
             }
             else
@@ -464,11 +467,94 @@
                 {
                     restoreReplicaCount(endPointThatLeft);
                     tokenMetadata_.removeEndpoint(endPointThatLeft);
+                    calculatePendingRanges();
                 }
             }
+        }
+    }
+
+    /**
+     * Calculate pending ranges according to bootsrapping and leaving nodes. Reasoning is:
+     *
+     * (1) When in doubt, it is better to write too much to a node than too little. That is, if
+     * there are multiple nodes moving, calculate the biggest ranges a node could have. Cleaning
+     * up unneeded data afterwards is better than missing writes during movement.
+     * (2) When a node leaves, ranges for other nodes can only grow (a node might get additional
+     * ranges, but it will not lose any of its current ranges as a result of a leave). Therefore
+     * we will first remove _all_ leaving tokens for the sake of calculation and then check what
+     * ranges would go where if all nodes are to leave. This way we get the biggest possible
+     * ranges with regard current leave operations, covering all subsets of possible final range
+     * values.
+     * (3) When a node bootstraps, ranges of other nodes can only get smaller. Without doing
+     * complex calculations to see if multiple bootstraps overlap, we simply base calculations
+     * on the same token ring used before (reflecting situation after all leave operations have
+     * completed). Bootstrapping nodes will be added and removed one by one to that metadata and
+     * checked what their ranges would be. This will give us the biggest possible ranges the
+     * node could have. It might be that other bootstraps make our actual final ranges smaller,
+     * but it does not matter as we can clean up the data afterwards.
+     *
+     * NOTE: This is heavy and ineffective operation. This will be done only once when a node
+     * changes state in the cluster, so it should be manageable.
+     */
+    private void calculatePendingRanges()
+    {
+        calculatePendingRanges(tokenMetadata_, replicationStrategy_);
+    }
+
+    // public & static for testing purposes
+    public static void calculatePendingRanges(TokenMetadata tm, AbstractReplicationStrategy strategy)
+    {
+        Multimap<Range, InetAddress> pendingRanges = HashMultimap.create();
+        Map<Token, InetAddress> bootstrapTokens = tm.getBootstrapTokens();
+        Set<InetAddress> leavingEndPoints = tm.getLeavingEndPoints();
+
+        if (bootstrapTokens.isEmpty() && leavingEndPoints.isEmpty())
+        {
+            if (logger_.isDebugEnabled())
+                logger_.debug("No bootstrapping or leaving nodes -> empty pending ranges");
+            tm.setPendingRanges(pendingRanges);
+            return;
+        }
+
+        Multimap<InetAddress, Range> addressRanges = strategy.getAddressRanges();
+
+        // Copy of metadata reflecting the situation after all leave operations are finished.
+        TokenMetadata allLeftMetadata = tm.cloneAfterAllLeft();
+
+        // get all ranges that will be affected by leaving nodes
+        Set<Range> affectedRanges = new HashSet<Range>();
+        for (InetAddress endPoint : leavingEndPoints)
+            affectedRanges.addAll(addressRanges.get(endPoint));
+
+        // for each of those ranges, find what new nodes will be responsible for the range when
+        // all leaving nodes are gone.
+        for (Range range : affectedRanges)
+        {
+            List<InetAddress> currentEndPoints = strategy.getNaturalEndpoints(range.right(), tm);
+            List<InetAddress> newEndPoints = strategy.getNaturalEndpoints(range.right(), allLeftMetadata);
+            newEndPoints.removeAll(currentEndPoints);
+            pendingRanges.putAll(range, newEndPoints);
+        }
+
+        // At this stage pendingRanges has been updated according to leave operations. We can
+        // now finish the calculation by checking bootstrapping nodes.
+
+        // For each of the bootstrapping nodes, simply add and remove them one by one to
+        // allLeftMetadata and check in between what their ranges would be.
+        for (Map.Entry<Token, InetAddress> entry : bootstrapTokens.entrySet())
+        {
+            InetAddress endPoint = entry.getValue();
 
-            replicationStrategy_.removeObsoletePendingRanges();
+            allLeftMetadata.updateNormalToken(entry.getKey(), endPoint);
+            for (Range range : strategy.getAddressRanges(allLeftMetadata).get(endPoint))
+                pendingRanges.put(range, endPoint);
+            allLeftMetadata.removeEndpoint(endPoint);
         }
+
+        tm.setPendingRanges(pendingRanges);
+
+        if (logger_.isDebugEnabled())
+            logger_.debug("Pending ranges:\n" + tm.printPendingRanges());
     }
 
     /**
@@ -544,7 +630,7 @@
         Collection<Range> ranges = getRangesForEndPoint(endpoint);
 
         if (logger_.isDebugEnabled())
-            logger_.debug("leaving node ranges are [" + StringUtils.join(ranges, ", ") + "]");
+            logger_.debug("Node " + endpoint + " ranges [" + StringUtils.join(ranges, ", ") + "]");
 
         Map<Range, ArrayList<InetAddress>> currentReplicaEndpoints = new HashMap<Range, ArrayList<InetAddress>>();
 
@@ -552,7 +638,7 @@
         for (Range range : ranges)
             currentReplicaEndpoints.put(range, replicationStrategy_.getNaturalEndpoints(range.right(), tokenMetadata_));
 
-        TokenMetadata temp = tokenMetadata_.cloneWithoutPending();
+        TokenMetadata temp = tokenMetadata_.cloneAfterAllLeft();
         temp.removeEndpoint(endpoint);
 
         Multimap<Range, InetAddress> changedRanges = HashMultimap.create();
@@ -567,43 +653,13 @@
             ArrayList<InetAddress> newReplicaEndpoints = replicationStrategy_.getNaturalEndpoints(range.right(), temp);
             newReplicaEndpoints.removeAll(currentReplicaEndpoints.get(range));
             if (logger_.isDebugEnabled())
-                logger_.debug("adding pending range " + range + " to endpoints " + StringUtils.join(newReplicaEndpoints, ", "));
+                logger_.debug("Range " + range + " will be responsibility of " + StringUtils.join(newReplicaEndpoints, ", "));
             changedRanges.putAll(range, newReplicaEndpoints);
         }
 
         return changedRanges;
     }
 
-    private void updateLeavingRanges(final InetAddress endpoint)
-    {
-        if (logger_.isDebugEnabled())
-            logger_.debug(endpoint + " is leaving; calculating pendingranges");
-        Multimap<Range, InetAddress> ranges = getChangedRangesForLeaving(endpoint);
-        for (Range range : ranges.keySet())
-        {
-            for (InetAddress newEndpoint : ranges.get(range))
-            {
-                tokenMetadata_.addPendingRange(range, newEndpoint);
-            }
-        }
-    }
-
-    private void updateBootstrapRanges(Token token, InetAddress endpoint)
-    {
-        for (Range range : replicationStrategy_.getPendingAddressRanges(tokenMetadata_, token, endpoint))
-        {
-            tokenMetadata_.addPendingRange(range, endpoint);
-        }
-    }
-
-    public static void updateBootstrapRanges(AbstractReplicationStrategy strategy, TokenMetadata metadata, Token token, InetAddress endpoint)
-    {
-        for (Range range : strategy.getPendingAddressRanges(metadata, token, endpoint))
-        {
-            metadata.addPendingRange(range, endpoint);
-        }
-    }
-
     public void onJoin(InetAddress endpoint, EndPointState epState)
     {
         for (Map.Entry<String, ApplicationState> entry : epState.getSortedApplicationStates())
@@ -1127,7 +1183,6 @@
         {
             SystemTable.setBootstrapped(false);
             tokenMetadata_.removeEndpoint(FBUtilities.getLocalAddress());
-            replicationStrategy_.removeObsoletePendingRanges();
 
             if (logger_.isDebugEnabled())
                 logger_.debug("");
@@ -1248,7 +1303,6 @@
 
             restoreReplicaCount(endPoint);
             tokenMetadata_.removeEndpoint(endPoint);
-            replicationStrategy_.removeObsoletePendingRanges();
         }
 
         // This is not the cleanest way as we're adding STATE_LEFT for
@@ -1271,11 +1325,6 @@
         return replicationStrategy_;
     }
 
-    public void cancelPendingRanges()
-    {
-        tokenMetadata_.clearPendingRanges();
-    }
-
     public boolean isClientMode()
     {
         return isClientMode;

Modified: incubator/cassandra/branches/cassandra-0.5/src/java/org/apache/cassandra/service/StorageServiceMBean.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/branches/cassandra-0.5/src/java/org/apache/cassandra/service/StorageServiceMBean.java?rev=891356&r1=891355&r2=891356&view=diff
==============================================================================
--- incubator/cassandra/branches/cassandra-0.5/src/java/org/apache/cassandra/service/StorageServiceMBean.java (original)
+++ incubator/cassandra/branches/cassandra-0.5/src/java/org/apache/cassandra/service/StorageServiceMBean.java Wed Dec 16 18:13:34 2009
@@ -141,13 +141,6 @@
     public void loadBalance() throws IOException, InterruptedException;
 
     /**
-     * cancel writes to nodes that are set to be changing ranges.
-     * Only do this if the reason for the range changes no longer exists
-     * (e.g., a bootstrapping node was killed or crashed.)
-     */
-    public void cancelPendingRanges();
-
-    /**
      * removeToken removes token (and all data associated with
      * enpoint that had it) from the ring
      */

Modified: incubator/cassandra/branches/cassandra-0.5/src/java/org/apache/cassandra/tools/NodeProbe.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/branches/cassandra-0.5/src/java/org/apache/cassandra/tools/NodeProbe.java?rev=891356&r1=891355&r2=891356&view=diff
==============================================================================
--- incubator/cassandra/branches/cassandra-0.5/src/java/org/apache/cassandra/tools/NodeProbe.java (original)
+++ incubator/cassandra/branches/cassandra-0.5/src/java/org/apache/cassandra/tools/NodeProbe.java Wed Dec 16 18:13:34 2009
@@ -398,11 +398,6 @@
         ssProxy.move(newToken);
     }
 
-    public void cancelPendingRanges()
-    {
-        ssProxy.cancelPendingRanges();
-    }
-
     public void removeToken(String token)
     {
         ssProxy.removeToken(token);
@@ -503,7 +498,7 @@
         HelpFormatter hf = new HelpFormatter();
         String header = String.format(
                 "%nAvailable commands: ring, info, cleanup, compact, cfstats, snapshot [name], clearsnapshot, " +
-                "tpstats, flush, repair, decommission, move, loadbalance, cancelpending, removetoken, " +
+                "tpstats, flush, repair, decommission, move, loadbalance, removetoken, " +
                 " getcompactionthreshold, setcompactionthreshold [minthreshold] ([maxthreshold])");
         String usage = String.format("java %s -host %n", NodeProbe.class.getName());
         hf.printHelp(usage, "", options, header);
@@ -578,10 +573,6 @@
             }
             probe.move(arguments[1]);
         }
-        else if (cmdName.equals("cancelpending"))
-        {
-            probe.cancelPendingRanges();
-        }
         else if (cmdName.equals("removetoken"))
         {
             if (arguments.length <= 1)

Modified: incubator/cassandra/branches/cassandra-0.5/test/unit/org/apache/cassandra/dht/BootStrapperTest.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/branches/cassandra-0.5/test/unit/org/apache/cassandra/dht/BootStrapperTest.java?rev=891356&r1=891355&r2=891356&view=diff
==============================================================================
--- incubator/cassandra/branches/cassandra-0.5/test/unit/org/apache/cassandra/dht/BootStrapperTest.java (original)
+++ incubator/cassandra/branches/cassandra-0.5/test/unit/org/apache/cassandra/dht/BootStrapperTest.java Wed Dec 16 18:13:34 2009
@@ -32,6 +32,7 @@
 import com.google.common.collect.Multimap;
 import org.apache.cassandra.gms.IFailureDetectionEventListener;
 import org.apache.cassandra.gms.IFailureDetector;
+import org.apache.cassandra.gms.ApplicationState;
 import org.apache.cassandra.locator.TokenMetadata;
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.utils.FBUtilities;
@@ -61,7 +62,8 @@
         Range range3 = ss.getPrimaryRangeForEndPoint(three);
         Token fakeToken = ((IPartitioner)StorageService.getPartitioner()).midpoint(range3.left(), range3.right());
         assert range3.contains(fakeToken);
-        StorageService.updateBootstrapRanges(StorageService.instance().getReplicationStrategy(), tmd, fakeToken, myEndpoint);
+        ss.onChange(myEndpoint, StorageService.STATE_BOOTSTRAPPING, new ApplicationState(ss.getPartitioner().getTokenFactory().toString(fakeToken)));
+        tmd = ss.getTokenMetadata();
 
         InetAddress source2 = BootStrapper.getBootstrapSource(tmd, load);
         assert two.equals(source2) : source2;
@@ -124,7 +126,7 @@
         for (int i = 1; i <= numOldNodes; i++)
         {
             // leave .1 for myEndpoint
-            tmd.update(p.getRandomToken(), InetAddress.getByName("127.0.0." + (i + 1)));
+            tmd.updateNormalToken(p.getRandomToken(), InetAddress.getByName("127.0.0." + (i + 1)));
         }
     }
 }
\ No newline at end of file

Modified: incubator/cassandra/branches/cassandra-0.5/test/unit/org/apache/cassandra/locator/RackUnawareStrategyTest.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/branches/cassandra-0.5/test/unit/org/apache/cassandra/locator/RackUnawareStrategyTest.java?rev=891356&r1=891355&r2=891356&view=diff
==============================================================================
--- incubator/cassandra/branches/cassandra-0.5/test/unit/org/apache/cassandra/locator/RackUnawareStrategyTest.java (original)
+++ incubator/cassandra/branches/cassandra-0.5/test/unit/org/apache/cassandra/locator/RackUnawareStrategyTest.java Wed Dec 16 18:13:34 2009
@@ -79,7 +79,7 @@
         for (int i = 0; i < endPointTokens.length; i++)
         {
             InetAddress ep = InetAddress.getByName("127.0.0." + String.valueOf(i + 1));
-            tmd.update(endPointTokens[i], ep);
+            tmd.updateNormalToken(endPointTokens[i], ep);
             hosts.add(ep);
         }
 
@@ -114,15 +114,16 @@
         for (int i = 0; i < endPointTokens.length; i++)
         {
             InetAddress ep = InetAddress.getByName("127.0.0." + String.valueOf(i + 1));
-            tmd.update(endPointTokens[i], ep);
+            tmd.updateNormalToken(endPointTokens[i], ep);
             hosts.add(ep);
         }
 
         //Add bootstrap node id=6
         Token bsToken = new BigIntegerToken(String.valueOf(25));
         InetAddress bootstrapEndPoint = InetAddress.getByName("127.0.0.6");
-        StorageService.updateBootstrapRanges(strategy, tmd, bsToken, bootstrapEndPoint);
-
+        tmd.addBootstrapToken(bsToken, bootstrapEndPoint);
+        StorageService.calculatePendingRanges(tmd, strategy);
+
         for (int i = 0; i < keyTokens.length; i++)
         {
             Collection<InetAddress> endPoints = strategy.getWriteEndpoints(keyTokens[i], strategy.getNaturalEndpoints(keyTokens[i]));
@@ -136,6 +137,8 @@
             // for 5, 15, 25 this should include bootstrap node
             if (i < 3)
                 assertTrue(endPoints.contains(bootstrapEndPoint));
+            else
+                assertFalse(endPoints.contains(bootstrapEndPoint));
         }
     }
 }
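The rules in the calculatePendingRanges javadoc (union over all leaves first, then each bootstrapping node checked on its own against the all-left ring) can be tried out in isolation with a toy model. What follows is a minimal sketch, not Cassandra code. It assumes integer tokens, rack-unaware placement (a range's replicas are the node owning its right token plus the next rf-1 nodes clockwise), String node names in place of InetAddress, and a Java 16+ compiler for the record type. PendingRangesSketch, Range, naturalEndpoints, addressRanges and calculatePendingRanges are hypothetical names that only mirror the roles of the methods touched by this patch.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;

// Toy model of the CASSANDRA-603 pending-range rules; NOT Cassandra's implementation.
// Assumptions: int tokens, rack-unaware placement, String node names instead of InetAddress.
public class PendingRangesSketch {

    /** Ring range (left, right]; left == right stands for the whole ring. */
    public record Range(int left, int right) {
        @Override
        public String toString() { return "(" + left + "," + right + "]"; }
    }

    /** Replicas for a token: the first node at or after it (wrapping), then the next rf-1 nodes. */
    public static List<String> naturalEndpoints(int token, TreeMap<Integer, String> ring, int rf) {
        List<Integer> tokens = new ArrayList<>(ring.keySet());
        Integer first = ring.ceilingKey(token);
        int start = (first == null) ? 0 : tokens.indexOf(first);
        List<String> endpoints = new ArrayList<>();
        for (int i = 0; i < Math.min(rf, tokens.size()); i++)
            endpoints.add(ring.get(tokens.get((start + i) % tokens.size())));
        return endpoints;
    }

    /** Endpoint -> every range it replicates (plays the role of getAddressRanges). */
    public static Map<String, Set<Range>> addressRanges(TreeMap<Integer, String> ring, int rf) {
        Map<String, Set<Range>> result = new HashMap<>();
        for (int right : ring.keySet()) {
            Integer left = ring.lowerKey(right);
            if (left == null)
                left = ring.lastKey(); // the lowest token's range wraps around the ring
            Range range = new Range(left, right);
            for (String ep : naturalEndpoints(right, ring, rf))
                result.computeIfAbsent(ep, k -> new HashSet<>()).add(range);
        }
        return result;
    }

    /** Range -> extra write targets; a multimap, so several nodes may share one range. */
    public static Map<Range, Set<String>> calculatePendingRanges(TreeMap<Integer, String> ring,
            Map<Integer, String> bootstrapTokens, Set<String> leaving, int rf) {
        Map<Range, Set<String>> pending = new HashMap<>();

        // Leaves: compare today's replicas with the replicas once *all* leaving nodes are gone.
        TreeMap<Integer, String> allLeft = new TreeMap<>(ring);
        allLeft.values().removeAll(leaving);
        Map<String, Set<Range>> current = addressRanges(ring, rf);
        Set<Range> affected = new HashSet<>();
        for (String ep : leaving)
            affected.addAll(current.getOrDefault(ep, Set.of()));
        for (Range range : affected) {
            List<String> gained = new ArrayList<>(naturalEndpoints(range.right(), allLeft, rf));
            gained.removeAll(naturalEndpoints(range.right(), ring, rf));
            for (String ep : gained)
                pending.computeIfAbsent(range, k -> new HashSet<>()).add(ep);
        }

        // Bootstraps: add each booting node on its own to the all-left ring and keep its ranges.
        for (Map.Entry<Integer, String> entry : bootstrapTokens.entrySet()) {
            TreeMap<Integer, String> withNode = new TreeMap<>(allLeft);
            withNode.put(entry.getKey(), entry.getValue());
            for (Range range : addressRanges(withNode, rf).getOrDefault(entry.getValue(), Set.of()))
                pending.computeIfAbsent(range, k -> new HashSet<>()).add(entry.getValue());
        }
        return pending;
    }

    public static void main(String[] args) {
        // Ring A, C, E with RF 3 while B and D bootstrap at once (the scenario in the patch comment).
        TreeMap<Integer, String> ring = new TreeMap<>(Map.of(0, "A", 20, "C", 40, "E"));
        Map<Range, Set<String>> pending =
                calculatePendingRanges(ring, Map.of(10, "B", 30, "D"), Set.of(), 3);
        Range ea = new Range(40, 0); // E-A in the comment's notation
        System.out.println(ea + " -> " + new TreeSet<>(pending.get(ea)));
    }
}
```

Running main prints `(40,0] -> [B, D]`: the E-A range is pending for both bootstrapping nodes, which is exactly the case the old single-valued Map<Range, InetAddress> rejected as a collision. For leaves, a ring A=0, C=20, E=40, G=50 with rf 2 and E leaving yields (0,20] pending for G and (20,40] pending for A. Unlike the patch, which adds and then removes each bootstrapping token on allLeftMetadata, the sketch copies the all-left ring per bootstrapping node, which avoids mutating shared state. It also does not reproduce the token-collision check in addBootstrapToken: a Map keyed by token simply keeps one entry per token.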