cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From brandonwilli...@apache.org
Subject [03/15] git commit: Remove shuffle/taketoken
Date Fri, 01 Aug 2014 18:57:11 GMT
Remove shuffle/taketoken

Patch by brandonwilliams, reviewed by thobbs for CASSANDRA-7601


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/ee2ed3c8
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/ee2ed3c8
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/ee2ed3c8

Branch: refs/heads/cassandra-2.1
Commit: ee2ed3c811e344ca9171a96bcfb1144e43b69878
Parents: eb92a9f
Author: Brandon Williams <brandonwilliams@apache.org>
Authored: Fri Aug 1 13:56:09 2014 -0500
Committer: Brandon Williams <brandonwilliams@apache.org>
Committed: Fri Aug 1 13:56:09 2014 -0500

----------------------------------------------------------------------
 NEWS.txt                                        |   9 +
 bin/cassandra-shuffle                           |  57 --
 debian/cassandra.install                        |   1 -
 .../apache/cassandra/gms/VersionedValue.java    |   7 -
 .../apache/cassandra/locator/TokenMetadata.java | 102 +--
 .../service/PendingRangeCalculatorService.java  |  20 +-
 .../ScheduledRangeTransferExecutorService.java  | 138 ----
 .../cassandra/service/StorageService.java       | 156 ----
 .../cassandra/service/StorageServiceMBean.java  |  10 -
 .../org/apache/cassandra/tools/Shuffle.java     | 767 -------------------
 .../apache/cassandra/service/RelocateTest.java  | 210 -----
 11 files changed, 13 insertions(+), 1464 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/ee2ed3c8/NEWS.txt
----------------------------------------------------------------------
diff --git a/NEWS.txt b/NEWS.txt
index d2d1fa5..1a9d6d2 100644
--- a/NEWS.txt
+++ b/NEWS.txt
@@ -13,6 +13,15 @@ restore snapshots created with the previous major version using the
 'sstableloader' tool. You can upgrade the file format of your snapshots
 using the provided 'sstableupgrade' tool.
 
+1.2.19
+======
+
+Upgrading
+---------
+    - Shuffle and taketoken have been removed.  For clusters that choose to
+      upgrade to vnodes, creating a new datacenter with vnodes and migrating is
+      recommended. See http://goo.gl/Sna2S1 for further information.
+
 1.2.18
 ======
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/ee2ed3c8/bin/cassandra-shuffle
----------------------------------------------------------------------
diff --git a/bin/cassandra-shuffle b/bin/cassandra-shuffle
deleted file mode 100755
index 53636f7..0000000
--- a/bin/cassandra-shuffle
+++ /dev/null
@@ -1,57 +0,0 @@
-#!/bin/sh
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-if [ "x$CASSANDRA_INCLUDE" = "x" ]; then
-    for include in /usr/share/cassandra/cassandra.in.sh \
-                   /usr/local/share/cassandra/cassandra.in.sh \
-                   /opt/cassandra/cassandra.in.sh \
-                   "`dirname "$0"`/cassandra.in.sh"; do
-        if [ -r "$include" ]; then
-            . "$include"
-            break
-        fi
-    done
-elif [ -r "$CASSANDRA_INCLUDE" ]; then
-    . "$CASSANDRA_INCLUDE"
-fi
-
-# Use JAVA_HOME if set, otherwise look for java in PATH
-if [ -x "$JAVA_HOME/bin/java" ]; then
-    JAVA="$JAVA_HOME/bin/java"
-else
-    JAVA="`which java`"
-fi
-
-if [ -z "$CASSANDRA_CONF" -o -z "$CLASSPATH" ]; then
-    echo "You must set the CASSANDRA_CONF and CLASSPATH vars" >&2
-    exit 1
-fi
-
-# Special-case path variables.
-case "`uname`" in
-    CYGWIN*) 
-        CLASSPATH="`cygpath -p -w "$CLASSPATH"`"
-        CASSANDRA_CONF="`cygpath -p -w "$CASSANDRA_CONF"`"
-    ;;
-esac
-
-"$JAVA" -cp "$CLASSPATH" \
-      -Xmx32m \
-      -Dlog4j.configuration=log4j-tools.properties \
-      org.apache.cassandra.tools.Shuffle $@
-
-# vi:ai sw=4 ts=4 tw=0 et

http://git-wip-us.apache.org/repos/asf/cassandra/blob/ee2ed3c8/debian/cassandra.install
----------------------------------------------------------------------
diff --git a/debian/cassandra.install b/debian/cassandra.install
index 70d4b97..7af1bee 100644
--- a/debian/cassandra.install
+++ b/debian/cassandra.install
@@ -19,7 +19,6 @@ bin/cqlsh usr/bin
 bin/sstablescrub usr/bin
 bin/sstableupgrade usr/bin
 bin/sstablesplit usr/bin
-bin/cassandra-shuffle usr/bin
 tools/bin/cassandra-stress usr/bin
 tools/bin/token-generator usr/bin
 lib/*.jar usr/share/cassandra/lib

http://git-wip-us.apache.org/repos/asf/cassandra/blob/ee2ed3c8/src/java/org/apache/cassandra/gms/VersionedValue.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/gms/VersionedValue.java b/src/java/org/apache/cassandra/gms/VersionedValue.java
index 60459c8..73220f3 100644
--- a/src/java/org/apache/cassandra/gms/VersionedValue.java
+++ b/src/java/org/apache/cassandra/gms/VersionedValue.java
@@ -62,7 +62,6 @@ public class VersionedValue implements Comparable<VersionedValue>
     public final static String STATUS_LEAVING = "LEAVING";
     public final static String STATUS_LEFT = "LEFT";
     public final static String STATUS_MOVING = "MOVING";
-    public final static String STATUS_RELOCATING = "RELOCATING";
 
     public final static String REMOVING_TOKEN = "removing";
     public final static String REMOVED_TOKEN = "removed";
@@ -157,12 +156,6 @@ public class VersionedValue implements Comparable<VersionedValue>
             return new VersionedValue(VersionedValue.STATUS_MOVING + VersionedValue.DELIMITER + partitioner.getTokenFactory().toString(token));
         }
 
-        public VersionedValue relocating(Collection<Token> srcTokens)
-        {
-            return new VersionedValue(
-                    versionString(VersionedValue.STATUS_RELOCATING, StringUtils.join(srcTokens, VersionedValue.DELIMITER)));
-        }
-
         public VersionedValue hostId(UUID hostId)
         {
             return new VersionedValue(hostId.toString());

http://git-wip-us.apache.org/repos/asf/cassandra/blob/ee2ed3c8/src/java/org/apache/cassandra/locator/TokenMetadata.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/locator/TokenMetadata.java b/src/java/org/apache/cassandra/locator/TokenMetadata.java
index 22a9042..f412335 100644
--- a/src/java/org/apache/cassandra/locator/TokenMetadata.java
+++ b/src/java/org/apache/cassandra/locator/TokenMetadata.java
@@ -87,9 +87,6 @@ public class TokenMetadata
     // nodes which are migrating to the new tokens in the ring
     private final Set<Pair<Token, InetAddress>> movingEndpoints = new HashSet<Pair<Token, InetAddress>>();
 
-    // tokens which are migrating to new endpoints
-    private final ConcurrentMap<Token, InetAddress> relocatingTokens = new ConcurrentHashMap<Token, InetAddress>();
-
     /* Use this lock for manipulating the token map */
     private final ReadWriteLock lock = new ReentrantReadWriteLock(true);
     private volatile ArrayList<Token> sortedTokens;
@@ -388,33 +385,6 @@ public class TokenMetadata
         }
     }
 
-    /**
-     * Add new relocating ranges (tokens moving from their respective endpoints, to another).
-     * @param tokens tokens being moved
-     * @param endpoint destination of moves
-     */
-    public void addRelocatingTokens(Collection<Token> tokens, InetAddress endpoint)
-    {
-        assert endpoint != null;
-        assert tokens != null && tokens.size() > 0;
-
-        lock.writeLock().lock();
-
-        try
-        {
-            for (Token token : tokens)
-            {
-                InetAddress prev = relocatingTokens.put(token, endpoint);
-                if (prev != null && !prev.equals(endpoint))
-                    logger.warn("Relocation of {} to {} overwrites previous to {}", new Object[]{token, endpoint, prev});
-            }
-        }
-        finally
-        {
-            lock.writeLock().unlock();
-        }
-    }
-
     public void removeEndpoint(InetAddress endpoint)
     {
         assert endpoint != null;
@@ -464,38 +434,6 @@ public class TokenMetadata
         }
     }
 
-    /**
-     * Remove pair of token/address from relocating ranges.
-     * @param endpoint
-     */
-    public void removeFromRelocating(Token token, InetAddress endpoint)
-    {
-        assert endpoint != null;
-        assert token != null;
-
-        lock.writeLock().lock();
-
-        try
-        {
-            InetAddress previous = relocatingTokens.remove(token);
-
-            if (previous == null)
-            {
-                logger.debug("Cannot remove {}, not found among the relocating (previously removed?)", token);
-            }
-            else if (!previous.equals(endpoint))
-            {
-                logger.warn(
-                        "Removal of relocating token {} with mismatched endpoint ({} != {})",
-                        new Object[]{token, endpoint, previous});
-            }
-        }
-        finally
-        {
-            lock.writeLock().unlock();
-        }
-    }
-
     public Collection<Token> getTokens(InetAddress endpoint)
     {
         assert endpoint != null;
@@ -570,22 +508,6 @@ public class TokenMetadata
         }
     }
 
-    public boolean isRelocating(Token token)
-    {
-        assert token != null;
-
-        lock.readLock().lock();
-
-        try
-        {
-            return relocatingTokens.containsKey(token);
-        }
-        finally
-        {
-            lock.readLock().unlock();
-        }
-    }
-
     private final AtomicReference<TokenMetadata> cachedTokenMap = new AtomicReference<TokenMetadata>();
 
     /**
@@ -658,7 +580,7 @@ public class TokenMetadata
 
     /**
      * Create a copy of TokenMetadata with tokenToEndpointMap reflecting situation after all
-     * current leave, move, and relocate operations have finished.
+     * current leave, and move operations have finished.
      *
      * @return new token metadata
      */
@@ -677,9 +599,6 @@ public class TokenMetadata
             for (Pair<Token, InetAddress> pair : movingEndpoints)
                 metadata.updateNormalToken(pair.left, pair.right);
 
-            for (Map.Entry<Token, InetAddress> relocating: relocatingTokens.entrySet())
-                metadata.updateNormalToken(relocating.getKey(), relocating.getValue());
-
             return metadata;
         }
         finally
@@ -831,15 +750,6 @@ public class TokenMetadata
         }
     }
 
-    /**
-     * Ranges which are migrating to new endpoints.
-     * @return set of token-address pairs of relocating ranges
-     */
-    public Map<Token, InetAddress> getRelocatingRanges()
-    {
-        return relocatingTokens;
-    }
-
     public static int firstTokenIndex(final ArrayList ring, Token start, boolean insertMin)
     {
         assert ring.size() > 0;
@@ -994,16 +904,6 @@ public class TokenMetadata
         return sb.toString();
     }
 
-    public String printRelocatingRanges()
-    {
-        StringBuilder sb = new StringBuilder();
-
-        for (Map.Entry<Token, InetAddress> entry : relocatingTokens.entrySet())
-            sb.append(String.format("%s:%s%n", entry.getKey(), entry.getValue()));
-
-        return sb.toString();
-    }
-
     public Collection<InetAddress> pendingEndpointsFor(Token token, String table)
     {
         Map<Range<Token>, Collection<InetAddress>> ranges = getPendingRanges(table);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/ee2ed3c8/src/java/org/apache/cassandra/service/PendingRangeCalculatorService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/PendingRangeCalculatorService.java b/src/java/org/apache/cassandra/service/PendingRangeCalculatorService.java
index 6f77ace..1c0d977 100644
--- a/src/java/org/apache/cassandra/service/PendingRangeCalculatorService.java
+++ b/src/java/org/apache/cassandra/service/PendingRangeCalculatorService.java
@@ -124,10 +124,10 @@ public class PendingRangeCalculatorService extends PendingRangeCalculatorService
         BiMultiValMap<Token, InetAddress> bootstrapTokens = tm.getBootstrapTokens();
         Set<InetAddress> leavingEndpoints = tm.getLeavingEndpoints();
 
-        if (bootstrapTokens.isEmpty() && leavingEndpoints.isEmpty() && tm.getMovingEndpoints().isEmpty() && tm.getRelocatingRanges().isEmpty())
+        if (bootstrapTokens.isEmpty() && leavingEndpoints.isEmpty() && tm.getMovingEndpoints().isEmpty())
         {
             if (logger.isDebugEnabled())
-                logger.debug("No bootstrapping, leaving or moving nodes, and no relocating tokens -> empty pending ranges for {}", table);
+                logger.debug("No bootstrapping, leaving or moving nodes -> empty pending ranges for {}", table);
             tm.setPendingRanges(table, pendingRanges);
             return;
         }
@@ -168,7 +168,7 @@ public class PendingRangeCalculatorService extends PendingRangeCalculatorService
         }
 
         // At this stage pendingRanges has been updated according to leaving and bootstrapping nodes.
-        // We can now finish the calculation by checking moving and relocating nodes.
+        // We can now finish the calculation by checking moving nodes.
 
         // For each of the moving nodes, we do the same thing we did for bootstrapping:
         // simply add and remove them one by one to allLeftMetadata and check in between what their ranges would be.
@@ -187,20 +187,6 @@ public class PendingRangeCalculatorService extends PendingRangeCalculatorService
             allLeftMetadata.removeEndpoint(endpoint);
         }
 
-        // Ranges being relocated.
-        for (Map.Entry<Token, InetAddress> relocating : tm.getRelocatingRanges().entrySet())
-        {
-            InetAddress endpoint = relocating.getValue(); // address of the moving node
-            Token token = relocating.getKey();
-
-            allLeftMetadata.updateNormalToken(token, endpoint);
-
-            for (Range<Token> range : strategy.getAddressRanges(allLeftMetadata).get(endpoint))
-                pendingRanges.put(range, endpoint);
-
-            allLeftMetadata.removeEndpoint(endpoint);
-        }
-
         tm.setPendingRanges(table, pendingRanges);
 
         if (logger.isDebugEnabled())

http://git-wip-us.apache.org/repos/asf/cassandra/blob/ee2ed3c8/src/java/org/apache/cassandra/service/ScheduledRangeTransferExecutorService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/ScheduledRangeTransferExecutorService.java b/src/java/org/apache/cassandra/service/ScheduledRangeTransferExecutorService.java
deleted file mode 100644
index ca8ea02..0000000
--- a/src/java/org/apache/cassandra/service/ScheduledRangeTransferExecutorService.java
+++ /dev/null
@@ -1,138 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.service;
-
-import static org.apache.cassandra.cql3.QueryProcessor.processInternal;
-
-import java.nio.ByteBuffer;
-import java.util.Collections;
-import java.util.Date;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.cql3.UntypedResultSet;
-import org.apache.cassandra.db.SystemTable;
-import org.apache.cassandra.dht.Token;
-import org.apache.cassandra.utils.ByteBufferUtil;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class ScheduledRangeTransferExecutorService
-{
-    private static final Logger LOG = LoggerFactory.getLogger(ScheduledRangeTransferExecutorService.class);
-    private static final int INTERVAL = 10;
-    private ScheduledExecutorService scheduler;
-
-    public void setup()
-    {
-        if (DatabaseDescriptor.getNumTokens() == 1)
-        {
-            LOG.warn("Cannot start range transfer scheduler: endpoint is not virtual nodes-enabled");
-            return;
-        }
-
-        scheduler = Executors.newSingleThreadScheduledExecutor(new RangeTransferThreadFactory());
-        scheduler.scheduleWithFixedDelay(new RangeTransfer(), 0, INTERVAL, TimeUnit.SECONDS);
-        LOG.info("Enabling scheduled transfers of token ranges");
-    }
-
-    public void tearDown()
-    {
-        if (scheduler == null)
-        {
-            LOG.warn("Unabled to shutdown; Scheduler never enabled");
-            return;
-        }
-
-        LOG.info("Shutting down range transfer scheduler");
-        scheduler.shutdownNow();
-    }
-}
-
-class RangeTransfer implements Runnable
-{
-    private static final Logger LOG = LoggerFactory.getLogger(RangeTransfer.class);
-
-    public void run()
-    {
-        UntypedResultSet res = processInternal("SELECT * FROM system." + SystemTable.RANGE_XFERS_CF);
-
-        if (res.size() < 1)
-        {
-            LOG.debug("No queued ranges to transfer");
-            return;
-        }
-
-        if (!isReady())
-            return;
-
-        UntypedResultSet.Row row = res.iterator().next();
-
-        Date requestedAt = row.getTimestamp("requested_at");
-        ByteBuffer tokenBytes = row.getBytes("token_bytes");
-        Token token = StorageService.getPartitioner().getTokenFactory().fromByteArray(tokenBytes);
-
-        LOG.info("Initiating transfer of {} (scheduled at {})", token, requestedAt.toString());
-        try
-        {
-            StorageService.instance.relocateTokens(Collections.singleton(token));
-        }
-        catch (Exception e)
-        {
-            LOG.error("Error removing {}: {}", token, e);
-        }
-        finally
-        {
-            LOG.debug("Removing queued entry for transfer of {}", token);
-            processInternal(String.format("DELETE FROM system.%s WHERE token_bytes = '%s'",
-                                          SystemTable.RANGE_XFERS_CF,
-                                          ByteBufferUtil.bytesToHex(tokenBytes)));
-        }
-    }
-
-    private boolean isReady()
-    {
-        int targetTokens = DatabaseDescriptor.getNumTokens();
-        int highMark = (int)Math.ceil(targetTokens + (targetTokens * .10));
-        int actualTokens = StorageService.instance.getTokens().size();
-
-        if (actualTokens >= highMark)
-        {
-            LOG.warn("Pausing until token count stabilizes (target={}, actual={})", targetTokens, actualTokens);
-            return false;
-        }
-
-        return true;
-    }
-}
-
-class RangeTransferThreadFactory implements ThreadFactory
-{
-    private AtomicInteger count = new AtomicInteger(0);
-
-    public Thread newThread(Runnable r)
-    {
-        Thread rangeXferThread = new Thread(r);
-        rangeXferThread.setName(String.format("ScheduledRangeXfers:%d", count.getAndIncrement()));
-        return rangeXferThread;
-    }
-}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/ee2ed3c8/src/java/org/apache/cassandra/service/StorageService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java
index 1f3d1e1..047e8d6 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -190,8 +190,6 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
 
     private static final AtomicInteger nextRepairCommand = new AtomicInteger();
 
-    private static ScheduledRangeTransferExecutorService rangeXferExecutor = new ScheduledRangeTransferExecutorService();
-
     private final List<IEndpointLifecycleSubscriber> lifecycleSubscribers = new CopyOnWriteArrayList<IEndpointLifecycleSubscriber>();
 
     private final ObjectName jmxObjectName;
@@ -1318,8 +1316,6 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
      * Other STATUS values that may be seen (possibly anywhere in the normal progression):
      * STATUS_MOVING,newtoken
      *   set if node is currently moving to a new token in the ring
-     * STATUS_RELOCATING,srcToken,srcToken,srcToken,...
-     *   set if the endpoint is in the process of relocating a token to itself
      * REMOVING_TOKEN,deadtoken
      *   set if the node is dead and is being removed by its REMOVAL_COORDINATOR
      * REMOVED_TOKEN,deadtoken
@@ -1350,8 +1346,6 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
                 handleStateLeft(endpoint, pieces);
             else if (moveName.equals(VersionedValue.STATUS_MOVING))
                 handleStateMoving(endpoint, pieces);
-            else if (moveName.equals(VersionedValue.STATUS_RELOCATING))
-                handleStateRelocating(endpoint, pieces);
         }
         else
         {
@@ -1573,33 +1567,6 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
                 if (!isClientMode)
                     tokensToUpdateInSystemTable.add(token);
             }
-            else if (tokenMetadata.isRelocating(token) && tokenMetadata.getRelocatingRanges().get(token).equals(endpoint))
-            {
-                // Token was relocating, this is the bookkeeping that makes it official.
-                tokensToUpdateInMetadata.add(token);
-                if (!isClientMode)
-                    tokensToUpdateInSystemTable.add(token);
-
-                optionalTasks.schedule(new Runnable()
-                {
-                    public void run()
-                    {
-                        logger.info("Removing RELOCATION state for {} {}", endpoint, token);
-                        getTokenMetadata().removeFromRelocating(token, endpoint);
-                    }
-                }, RING_DELAY, TimeUnit.MILLISECONDS);
-
-                // We used to own this token; This token will need to be removed from system.local
-                if (currentOwner.equals(FBUtilities.getBroadcastAddress()))
-                    localTokensToRemove.add(token);
-
-                logger.info("Token {} relocated to {}", token, endpoint);
-            }
-            else if (tokenMetadata.isRelocating(token))
-            {
-                logger.info("Token {} is relocating to {}, ignoring update from {}",
-                        new Object[]{token, tokenMetadata.getRelocatingRanges().get(token), endpoint});
-            }
             else if (Gossiper.instance.compareEndpointStartup(endpoint, currentOwner) > 0)
             {
                 tokensToUpdateInMetadata.add(token);
@@ -1618,8 +1585,6 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
                                           currentOwner,
                                           token,
                                           endpoint));
-                if (logger.isDebugEnabled())
-                    logger.debug("Relocating ranges: {}", tokenMetadata.printRelocatingRanges());
             }
             else
             {
@@ -1628,8 +1593,6 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
                                            currentOwner,
                                            token,
                                            endpoint));
-                if (logger.isDebugEnabled())
-                    logger.debug("Relocating ranges: {}", tokenMetadata.printRelocatingRanges());
             }
         }
 
@@ -1729,26 +1692,6 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
     }
 
     /**
-     * Handle one or more ranges (tokens) moving from their respective endpoints, to another.
-     *
-     * @param endpoint the destination of the move
-     * @param pieces STATE_RELOCATING,token,token,...
-     */
-    private void handleStateRelocating(InetAddress endpoint, String[] pieces)
-    {
-        assert pieces.length >= 2;
-
-        List<Token> tokens = new ArrayList<Token>(pieces.length - 1);
-        for (String tStr : Arrays.copyOfRange(pieces, 1, pieces.length))
-            tokens.add(getPartitioner().getTokenFactory().fromString(tStr));
-
-        logger.debug("Tokens {} are relocating to {}", tokens, endpoint);
-        tokenMetadata.addRelocatingTokens(tokens, endpoint);
-
-        PendingRangeCalculatorService.instance.update();
-    }
-
-    /**
      * Handle notification that a node being actively removed from the ring via 'removenode'
      *
      * @param endpoint node
@@ -3205,95 +3148,6 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
         }
     }
 
-    public void relocate(Collection<String> srcTokens) throws IOException
-    {
-        List<Token> tokens = new ArrayList<Token>(srcTokens.size());
-        try
-        {
-            for (String srcT : srcTokens)
-            {
-                getPartitioner().getTokenFactory().validate(srcT);
-                tokens.add(getPartitioner().getTokenFactory().fromString(srcT));
-            }
-        }
-        catch (ConfigurationException e)
-        {
-            throw new IOException(e.getMessage());
-        }
-        relocateTokens(tokens);
-    }
-
-    void relocateTokens(Collection<Token> srcTokens)
-    {
-        assert srcTokens != null;
-        InetAddress localAddress = FBUtilities.getBroadcastAddress();
-        Collection<Token> localTokens = getTokenMetadata().getTokens(localAddress);
-        Set<Token> tokens = new HashSet<Token>(srcTokens);
-
-        Iterator<Token> it = tokens.iterator();
-        while (it.hasNext())
-        {
-            Token srcT = it.next();
-            if (localTokens.contains(srcT))
-            {
-                it.remove();
-                logger.warn("cannot move {}; source and destination match", srcT);
-            }
-        }
-
-        if (tokens.size() < 1)
-        {
-            logger.warn("no valid token arguments specified; nothing to relocate");
-            return;
-        }
-
-        Gossiper.instance.addLocalApplicationState(ApplicationState.STATUS, valueFactory.relocating(tokens));
-        setMode(Mode.RELOCATING, String.format("relocating %s to %s", tokens, localAddress.getHostAddress()), true);
-
-        List<String> tables = Schema.instance.getNonSystemTables();
-
-        setMode(Mode.RELOCATING, String.format("Sleeping %s ms before start streaming/fetching ranges", RING_DELAY), true);
-        try
-        {
-            Thread.sleep(RING_DELAY);
-        }
-        catch (InterruptedException e)
-        {
-            throw new RuntimeException("Sleep interrupted " + e.getMessage());
-        }
-
-        RangeRelocator relocator = new RangeRelocator(tokens, tables);
-
-        if (relocator.streamsNeeded())
-        {
-            setMode(Mode.RELOCATING, "fetching new ranges and streaming old ranges", true);
-
-            relocator.logStreamsMap("[Relocate->STREAMING]");
-            CountDownLatch streamLatch = relocator.streams();
-
-            relocator.logRequestsMap("[Relocate->FETCHING]");
-            CountDownLatch fetchLatch = relocator.requests();
-
-            try
-            {
-                streamLatch.await();
-                fetchLatch.await();
-            }
-            catch (InterruptedException e)
-            {
-                throw new RuntimeException("Interrupted latch while waiting for stream/fetch ranges to finish: " + e.getMessage());
-            }
-        }
-        else
-            setMode(Mode.RELOCATING, "no new ranges to stream/fetch", true);
-
-        Collection<Token> currentTokens = SystemTable.updateLocalTokens(tokens, Collections.<Token>emptyList());
-        tokenMetadata.updateNormalTokens(currentTokens, FBUtilities.getBroadcastAddress());
-        Gossiper.instance.addLocalApplicationState(ApplicationState.TOKENS, valueFactory.tokens(currentTokens));
-        Gossiper.instance.addLocalApplicationState(ApplicationState.STATUS, valueFactory.normal(currentTokens));
-        setMode(Mode.NORMAL, false);
-    }
-
     /**
      * Get the status of a token removal.
      */
@@ -4026,16 +3880,6 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
         return tracingProbability;
     }
 
-    public void enableScheduledRangeXfers()
-    {
-        rangeXferExecutor.setup();
-    }
-
-    public void disableScheduledRangeXfers()
-    {
-        rangeXferExecutor.tearDown();
-    }
-
     /** Returns the name of the cluster */
     public String getClusterName()
     {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/ee2ed3c8/src/java/org/apache/cassandra/service/StorageServiceMBean.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageServiceMBean.java b/src/java/org/apache/cassandra/service/StorageServiceMBean.java
index 63add12..63b3a13 100644
--- a/src/java/org/apache/cassandra/service/StorageServiceMBean.java
+++ b/src/java/org/apache/cassandra/service/StorageServiceMBean.java
@@ -306,11 +306,6 @@ public interface StorageServiceMBean extends NotificationEmitter
     public void move(String newToken) throws IOException;
 
     /**
-     * @param srcTokens tokens to move to this node
-     */
-    public void relocate(Collection<String> srcTokens) throws IOException;
-
-    /**
      * removeToken removes token (and all data associated with
      * enpoint that had it) from the ring
      */
@@ -467,11 +462,6 @@ public interface StorageServiceMBean extends NotificationEmitter
      */
     public double getTracingProbability();
 
-    /** Begin processing of queued range transfers. */
-    public void enableScheduledRangeXfers();
-    /** Disable processing of queued range transfers. */
-    public void disableScheduledRangeXfers();
-
     /** Returns the name of the cluster */
     public String getClusterName();
     /** Returns the cluster partitioner */

http://git-wip-us.apache.org/repos/asf/cassandra/blob/ee2ed3c8/src/java/org/apache/cassandra/tools/Shuffle.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/Shuffle.java b/src/java/org/apache/cassandra/tools/Shuffle.java
deleted file mode 100644
index 3c8dea0..0000000
--- a/src/java/org/apache/cassandra/tools/Shuffle.java
+++ /dev/null
@@ -1,767 +0,0 @@
-package org.apache.cassandra.tools;
-/*
- * 
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *   http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- * 
- */
-
-
-import java.io.Closeable;
-import java.io.IOException;
-import java.net.UnknownHostException;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Random;
-import java.util.Set;
-
-import javax.management.JMX;
-import javax.management.MBeanServerConnection;
-import javax.management.MalformedObjectNameException;
-import javax.management.ObjectName;
-
-import org.apache.cassandra.cql.jdbc.JdbcDate;
-import org.apache.cassandra.dht.IPartitioner;
-import org.apache.cassandra.dht.Token;
-import org.apache.cassandra.locator.EndpointSnitchInfoMBean;
-import org.apache.cassandra.service.StorageServiceMBean;
-import org.apache.cassandra.thrift.Cassandra;
-import org.apache.cassandra.thrift.Compression;
-import org.apache.cassandra.thrift.ConsistencyLevel;
-import org.apache.cassandra.thrift.CqlResult;
-import org.apache.cassandra.thrift.CqlRow;
-import org.apache.cassandra.thrift.InvalidRequestException;
-import org.apache.cassandra.thrift.TimedOutException;
-import org.apache.cassandra.thrift.TokenRange;
-import org.apache.cassandra.thrift.UnavailableException;
-import org.apache.cassandra.utils.ByteBufferUtil;
-import org.apache.commons.cli.CommandLine;
-import org.apache.commons.cli.MissingArgumentException;
-import org.apache.thrift.TException;
-import org.apache.thrift.protocol.TBinaryProtocol;
-import org.apache.thrift.transport.TFastFramedTransport;
-import org.apache.thrift.transport.TSocket;
-import org.apache.thrift.transport.TTransport;
-import org.apache.thrift.transport.TTransportException;
-
-import com.google.common.collect.HashMultimap;
-import com.google.common.collect.Multimap;
-
-public class Shuffle extends AbstractJmxClient
-{
-    private static final String ssObjName = "org.apache.cassandra.db:type=StorageService";
-    private static final String epSnitchObjName = "org.apache.cassandra.db:type=EndpointSnitchInfo";
-
-    private StorageServiceMBean ssProxy = null;
-    private Random rand = new Random(System.currentTimeMillis());
-    private final String thriftHost;
-    private final int thriftPort;
-    private final boolean thriftFramed;
-
-    static
-    {
-        addCmdOption("th",  "thrift-host",   true,  "Thrift hostname or IP address (Default: JMX host)");
-        addCmdOption("tp",  "thrift-port",   true,  "Thrift port number (Default: 9160)");
-        addCmdOption("tf",  "thrift-framed", false, "Enable framed transport for Thrift (Default: false)");
-        addCmdOption("en",  "and-enable",    true,  "Immediately enable shuffling (create only)");
-        addCmdOption("dc",  "only-dc",       true,  "Apply only to named DC (create only)");
-    }
-
-    public Shuffle(String host, int port) throws IOException
-    {
-        this(host, port, host, 9160, false, null, null);
-    }
-
-    public Shuffle(String host, int port, String thriftHost, int thriftPort, boolean thriftFramed, String username, String password)
-    throws IOException
-    {
-        super(host, port, username, password);
-
-        this.thriftHost = thriftHost;
-        this.thriftPort = thriftPort;
-        this.thriftFramed = thriftFramed;
-
-        // Setup the StorageService proxy.
-        ssProxy = getSSProxy(jmxConn.getMbeanServerConn());
-    }
-
-    public StorageServiceMBean getSSProxy(MBeanServerConnection mbeanConn)
-    {
-        StorageServiceMBean proxy = null;
-        try
-        {
-            ObjectName name = new ObjectName(ssObjName);
-            proxy = JMX.newMBeanProxy(mbeanConn, name, StorageServiceMBean.class);
-        }
-        catch (MalformedObjectNameException e)
-        {
-            throw new RuntimeException(e);
-        }
-        return proxy;
-    }
-
-    public EndpointSnitchInfoMBean getEpSnitchProxy(MBeanServerConnection mbeanConn)
-    {
-        EndpointSnitchInfoMBean proxy = null;
-        try
-        {
-            ObjectName name = new ObjectName(epSnitchObjName);
-            proxy = JMX.newMBeanProxy(mbeanConn, name, EndpointSnitchInfoMBean.class);
-        }
-        catch (MalformedObjectNameException e)
-        {
-            throw new RuntimeException(e);
-        }
-        return proxy;
-    }
-
-    /**
-     * Given a Multimap of endpoint to tokens, return a new randomized mapping.
-     *
-     * @param endpointMap current mapping of endpoint to tokens
-     * @return a new mapping of endpoint to tokens
-     */
-    public Multimap<String, String> calculateRelocations(Multimap<String, String> endpointMap)
-    {
-        Multimap<String, String> relocations = HashMultimap.create();
-        Set<String> endpoints = new HashSet<String>(endpointMap.keySet());
-        Map<String, Integer> endpointToNumTokens = new HashMap<String, Integer>(endpoints.size());
-        Map<String, Iterator<String>> iterMap = new HashMap<String, Iterator<String>>(endpoints.size());
-
-        // Create maps of endpoint to token iterators, and endpoint to number of tokens.
-        for (String endpoint : endpoints)
-        {
-            try
-            {
-                endpointToNumTokens.put(endpoint, ssProxy.getTokens(endpoint).size());
-            }
-            catch (UnknownHostException e)
-            {
-                throw new RuntimeException("What that...?", e);
-            }
-
-            iterMap.put(endpoint, endpointMap.get(endpoint).iterator());
-        }
-
-        int epsToComplete = endpoints.size();
-        Set<String> endpointsCompleted = new HashSet<String>();
-
-        outer:
-        while (true)
-        {
-            endpoints.removeAll(endpointsCompleted);
-
-            for (String endpoint : endpoints)
-            {
-                boolean choiceMade = false;
-
-                if (!iterMap.get(endpoint).hasNext())
-                {
-                    endpointsCompleted.add(endpoint);
-                    continue;
-                }
-
-                String token = iterMap.get(endpoint).next();
-
-                List<String> subSet = new ArrayList<String>(endpoints);
-                subSet.remove(endpoint);
-                Collections.shuffle(subSet, rand);
-
-                for (String choice : subSet)
-                {
-                    if (relocations.get(choice).size() < endpointToNumTokens.get(choice))
-                    {
-                        relocations.put(choice, token);
-                        choiceMade = true;
-                        break;
-                    }
-                }
-
-                if (!choiceMade)
-                    relocations.put(endpoint, token);
-            }
-
-            // We're done when we've exhausted all of the token iterators
-            if (endpointsCompleted.size() == epsToComplete)
-                break outer;
-        }
-
-        return relocations;
-    }
-
-    /**
-     * Enable relocations.
-     *
-     * @param endpoints sequence of hostname or IP strings
-     */
-    public void enableRelocations(String...endpoints)
-    {
-        enableRelocations(Arrays.asList(endpoints));
-    }
-
-    /**
-     * Enable relocations.
-     *
-     * @param endpoints Collection of hostname or IP strings
-     */
-    public void enableRelocations(Collection<String> endpoints)
-    {
-        for (String endpoint : endpoints)
-        {
-            try
-            {
-                JMXConnection conn = new JMXConnection(endpoint, port, username, password);
-                getSSProxy(conn.getMbeanServerConn()).enableScheduledRangeXfers();
-                conn.close();
-            }
-            catch (IOException e)
-            {
-                writeln("Failed to enable shuffling on %s!", endpoint);
-            }
-        }
-    }
-
-    /**
-     * Disable relocations.
-     *
-     * @param endpoints sequence of hostname or IP strings
-     */
-    public void disableRelocations(String...endpoints)
-    {
-        disableRelocations(Arrays.asList(endpoints));
-    }
-
-    /**
-     * Disable relocations.
-     *
-     * @param endpoints Collection of hostname or IP strings
-     */
-    public void disableRelocations(Collection<String> endpoints)
-    {
-        for (String endpoint : endpoints)
-        {
-            try
-            {
-                JMXConnection conn = new JMXConnection(endpoint, port, username, password);
-                getSSProxy(conn.getMbeanServerConn()).disableScheduledRangeXfers();
-                conn.close();
-            }
-            catch (IOException e)
-            {
-                writeln("Failed to enable shuffling on %s!", endpoint);
-            }
-        }
-    }
-
-    /**
-     * Return a list of the live nodes (using JMX).
-     *
-     * @return String endpoint names
-     * @throws ShuffleError
-     */
-    public Collection<String> getLiveNodes() throws ShuffleError
-    {
-        try
-        {
-            JMXConnection conn = new JMXConnection(host, port, username, password);
-            return getSSProxy(conn.getMbeanServerConn()).getLiveNodes();
-        }
-        catch (IOException e)
-        {
-            throw new ShuffleError("Error retrieving list of nodes from JMX interface");
-        }
-    }
-
-    /**
-     * Create and distribute a new, randomized token to endpoint mapping.
-     *
-     * @throws ShuffleError on handled exceptions
-     */
-    public void shuffle(boolean enable, String onlyDc) throws ShuffleError
-    {
-        CassandraClient seedClient = null;
-        Map<String, String> tokenMap = null;
-        IPartitioner<?> partitioner = null;
-        Multimap<String, String> endpointMap = HashMultimap.create();
-        EndpointSnitchInfoMBean epSnitchProxy = getEpSnitchProxy(jmxConn.getMbeanServerConn());
-
-        try
-        {
-            seedClient = getThriftClient(thriftHost, thriftPort, thriftFramed);
-            tokenMap = seedClient.describe_token_map();
-
-            for (Map.Entry<String, String> entry : tokenMap.entrySet())
-            {
-                String endpoint = entry.getValue(), token = entry.getKey();
-                try
-                {
-                    if (onlyDc != null)
-                    {
-                        if (onlyDc.equals(epSnitchProxy.getDatacenter(endpoint)))
-                            endpointMap.put(endpoint, token);
-                    }
-                    else
-                        endpointMap.put(endpoint, token);
-                }
-                catch (UnknownHostException e)
-                {
-                    writeln("Warning: %s unknown to EndpointSnitch!", endpoint);
-                }
-            }
-        }
-        catch (InvalidRequestException ire)
-        {
-            throw new RuntimeException("What that...?", ire);
-        }
-        catch (TException e)
-        {
-            throw new ShuffleError(
-                    String.format("Thrift request to %s:%d failed: %s", thriftHost, thriftPort, e.getMessage()));
-        }
-
-        partitioner = getPartitioner(thriftHost, thriftPort, thriftFramed);
-
-        Multimap<String, String> relocations = calculateRelocations(endpointMap);
-
-        writeln("%-42s %-15s %-15s", "Token", "From", "To");
-        writeln("~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~+~~~~~~~~~~~~~~~+~~~~~~~~~~~~~~~");
-
-        // Store relocations on remote nodes.
-        for (String endpoint : relocations.keySet())
-        {
-            for (String tok : relocations.get(endpoint))
-                writeln("%-42s %-15s %-15s", tok, tokenMap.get(tok), endpoint);
-
-            String cqlQuery = createShuffleBatchInsert(relocations.get(endpoint), partitioner);
-            executeCqlQuery(endpoint, thriftPort, thriftFramed, cqlQuery);
-        }
-
-        if (enable)
-            enableRelocations(relocations.keySet());
-    }
-
-    /**
-     * Print a list of pending token relocations for all nodes.
-     *
-     * @throws ShuffleError
-     */
-    public void ls() throws ShuffleError
-    {
-        Map<String, List<CqlRow>> queuedRelocations = listRelocations();
-        IPartitioner<?> partitioner = getPartitioner(thriftHost, thriftPort, thriftFramed);
-        boolean justOnce = false;
-
-        for (String host : queuedRelocations.keySet())
-        {
-            for (CqlRow row : queuedRelocations.get(host))
-            {
-                assert row.getColumns().size() == 2;
-
-                if (!justOnce)
-                {
-                    writeln("%-42s %-15s %s", "Token", "Endpoint", "Requested at");
-                    writeln("~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~+~~~~~~~~~~~~~~~+~~~~~~~~~~~~~~~~~~~~~~~~~~~~");
-                    justOnce = true;
-                }
-
-                ByteBuffer tokenBytes = ByteBuffer.wrap(row.getColumns().get(0).getValue());
-                ByteBuffer requestedAt = ByteBuffer.wrap(row.getColumns().get(1).getValue());
-                Date time = JdbcDate.instance.compose(requestedAt);
-                Token<?> token = partitioner.getTokenFactory().fromByteArray(tokenBytes);
-
-                writeln("%-42s %-15s %s", token.toString(), host, time.toString());
-            }
-        }
-    }
-
-    /**
-     * List pending token relocations for all nodes.
-     *
-     * @return
-     * @throws ShuffleError
-     */
-    private Map<String, List<CqlRow>> listRelocations() throws ShuffleError
-    {
-        String cqlQuery = "SELECT token_bytes,requested_at FROM system.range_xfers";
-        Map<String, List<CqlRow>> results = new HashMap<String, List<CqlRow>>();
-
-        for (String host : getLiveNodes())
-        {
-            CqlResult result = executeCqlQuery(host, thriftPort, thriftFramed, cqlQuery);
-            results.put(host, result.getRows());
-        }
-
-        return results;
-    }
-
-    /**
-     * Clear pending token relocations on all nodes.
-     *
-     * @throws ShuffleError
-     */
-    public void clear() throws ShuffleError
-    {
-        Map<String, List<CqlRow>> queuedRelocations = listRelocations();
-
-        for (String host : queuedRelocations.keySet())
-        {
-
-            for (CqlRow row : queuedRelocations.get(host))
-            {
-                assert row.getColumns().size() == 2;
-
-                ByteBuffer tokenBytes = ByteBuffer.wrap(row.getColumns().get(0).getValue());
-                String query = String.format("DELETE FROM system.range_xfers WHERE token_bytes = '%s'",
-                        ByteBufferUtil.bytesToHex(tokenBytes));
-                executeCqlQuery(host, thriftPort, thriftFramed, query);
-            }
-        }
-    }
-
-    /**
-     * Enable shuffling on all nodes in the cluster.
-     *
-     * @throws ShuffleError
-     */
-    public void enable() throws ShuffleError
-    {
-        enableRelocations(getLiveNodes());
-    }
-
-    /**
-     * Disable shuffling on all nodes in the cluster.
-     *
-     * @throws ShuffleError
-     */
-    public void disable() throws ShuffleError
-    {
-        disableRelocations(getLiveNodes());
-    }
-
-    /**
-     * Setup and return a new Thrift RPC connection.
-     *
-     * @param hostName hostname or address to connect to
-     * @param port port number to connect to
-     * @param framed wrap with framed transport if true
-     * @return a CassandraClient instance
-     * @throws ShuffleError
-     */
-    public static CassandraClient getThriftClient(String hostName, int port, boolean framed) throws ShuffleError
-    {
-        try
-        {
-            return new CassandraClient(hostName, port, framed);
-        }
-        catch (TTransportException e)
-        {
-            throw new ShuffleError(String.format("Unable to connect to %s/%d: %s", hostName, port, e.getMessage()));
-        }
-    }
-
-    /**
-     * Execute a CQL v3 query.
-     *
-     * @param hostName hostname or address to connect to
-     * @param port port number to connect to
-     * @param isFramed wrap with framed transport if true
-     * @param cqlQuery CQL query string
-     * @return a Thrift CqlResult instance
-     * @throws ShuffleError
-     */
-    public static CqlResult executeCqlQuery(String hostName, int port, boolean isFramed, String cqlQuery) throws ShuffleError
-    {
-        CassandraClient client = null;
-
-        try
-        {
-            client = getThriftClient(hostName, port, isFramed);
-            return client.execute_cql_query(ByteBuffer.wrap(cqlQuery.getBytes()), Compression.NONE);
-        }
-        catch (UnavailableException e)
-        {
-            throw new ShuffleError(
-                    String.format("Unable to write shuffle entries to %s. Reason: UnavailableException", hostName));
-        }
-        catch (TimedOutException e)
-        {
-            throw new ShuffleError(
-                    String.format("Unable to write shuffle entries to %s. Reason: TimedOutException", hostName));
-        }
-        catch (Exception e)
-        {
-            throw new RuntimeException(e);
-        }
-        finally
-        {
-            if (client != null)
-                client.close();
-        }
-    }
-
-    /**
-     * Return a partitioner instance for remote host.
-     *
-     * @param hostName hostname or address to connect to
-     * @param port port number to connect to
-     * @param framed wrap with framed transport if true
-     * @return an IPartitioner instance
-     * @throws ShuffleError
-     */
-    public static IPartitioner<?> getPartitioner(String hostName, int port, boolean framed) throws ShuffleError
-    {
-        String partitionerName = null;
-        try
-        {
-            partitionerName = getThriftClient(hostName, port, framed).describe_partitioner();
-        }
-        catch (TException e)
-        {
-            throw new ShuffleError(
-                    String.format("Thrift request to %s:%d failed: %s", hostName, port, e.getMessage()));
-        }
-        catch (InvalidRequestException e)
-        {
-            throw new RuntimeException("Error calling describe_partitioner() defies explanation", e);
-        }
-
-        try
-        {
-            Class<?> partitionerClass = Class.forName(partitionerName);
-            return (IPartitioner<?>)partitionerClass.newInstance();
-        }
-        catch (ClassNotFoundException e)
-        {
-            throw new ShuffleError("Unable to locate class for partitioner: " + partitionerName);
-        }
-        catch (Exception e)
-        {
-            throw new RuntimeException(e);
-        }
-    }
-
-    /**
-     * Create and return a CQL batch insert statement for a set of token relocations.
-     *
-     * @param tokens tokens to be relocated
-     * @param partitioner an instance of the IPartitioner in use
-     * @return a query string
-     */
-    public static String createShuffleBatchInsert(Collection<String> tokens, IPartitioner<?> partitioner)
-    {
-        StringBuilder query = new StringBuilder();
-        query.append("BEGIN BATCH").append("\n");
-
-        for (String tokenStr : tokens)
-        {
-            Token<?> token = partitioner.getTokenFactory().fromString(tokenStr);
-            String hexToken = ByteBufferUtil.bytesToHex(partitioner.getTokenFactory().toByteArray(token));
-            query.append("INSERT INTO system.range_xfers (token_bytes, requested_at) ")
-                 .append("VALUES (").append("0x").append(hexToken).append(", 'now');").append("\n");
-        }
-
-        query.append("APPLY BATCH").append("\n");
-        return query.toString();
-    }
-
-    /** Print usage information. */
-    private static void printShuffleHelp()
-    {
-        StringBuilder sb = new StringBuilder();
-        sb.append("Sub-commands:").append(String.format("%n"));
-        sb.append(" create           Initialize a new shuffle operation").append(String.format("%n"));
-        sb.append(" ls               List pending relocations").append(String.format("%n"));
-        sb.append(" clear            Clear pending relocations").append(String.format("%n"));
-        sb.append(" en[able]         Enable shuffling").append(String.format("%n"));
-        sb.append(" dis[able]        Disable shuffling").append(String.format("%n%n"));
-
-        printHelp("shuffle [options] <sub-command>", sb.toString());
-    }
-
-    /**
-     * Execute.
-     *
-     * @param args arguments passed on the command line
-     * @throws Exception when face meets palm
-     */
-    public static void main(String[] args) throws Exception
-    {
-        CommandLine cmd = null;
-        try
-        {
-            cmd = processArguments(args);
-        }
-        catch (MissingArgumentException e)
-        {
-            System.err.println(e.getMessage());
-            System.exit(1);
-        }
-
-        // Sub command argument.
-        if (cmd.getArgList().size() < 1)
-        {
-            System.err.println("Missing sub-command argument.");
-            printShuffleHelp();
-            System.exit(1);
-        }
-        String subCommand = (String)(cmd.getArgList()).get(0);
-
-        String hostName = (cmd.getOptionValue("host") != null) ? cmd.getOptionValue("host") : DEFAULT_HOST;
-        String port = (cmd.getOptionValue("port") != null) ? cmd.getOptionValue("port") : Integer.toString(DEFAULT_JMX_PORT);
-        String username = cmd.getOptionValue("username");
-        String password = cmd.getOptionValue("password");
-        String thriftHost = (cmd.getOptionValue("thrift-host") != null) ? cmd.getOptionValue("thrift-host") : hostName;
-        String thriftPort = (cmd.getOptionValue("thrift-port") != null) ? cmd.getOptionValue("thrift-port") : "9160";
-        String onlyDc = cmd.getOptionValue("only-dc");
-        boolean thriftFramed = cmd.hasOption("thrift-framed") ? true : false;
-        boolean andEnable = cmd.hasOption("and-enable") ? true : false;
-        int portNum = -1, thriftPortNum = -1;
-
-        // Parse JMX port number
-        if (port != null)
-        {
-            try
-            {
-                portNum = Integer.parseInt(port);
-            }
-            catch (NumberFormatException ferr)
-            {
-                System.err.printf("%s is not a valid JMX port number.%n", port);
-                System.exit(1);
-            }
-        }
-        else
-            portNum = DEFAULT_JMX_PORT;
-
-        // Parse Thrift port number
-        if (thriftPort != null)
-        {
-            try
-            {
-                thriftPortNum = Integer.parseInt(thriftPort);
-            }
-            catch (NumberFormatException ferr)
-            {
-                System.err.printf("%s is not a valid port number.%n", thriftPort);
-                System.exit(1);
-            }
-        }
-        else
-            thriftPortNum = 9160;
-
-        Shuffle shuffler = new Shuffle(hostName, portNum, thriftHost, thriftPortNum, thriftFramed,
-                username, password);
-
-        try
-        {
-            if (subCommand.equals("create"))
-                shuffler.shuffle(andEnable, onlyDc);
-            else if (subCommand.equals("ls"))
-                shuffler.ls();
-            else if (subCommand.startsWith("en"))
-                shuffler.enable();
-            else if (subCommand.startsWith("dis"))
-                shuffler.disable();
-            else if (subCommand.equals("clear"))
-                shuffler.clear();
-            else
-            {
-                System.err.println("Unknown subcommand: " + subCommand);
-                printShuffleHelp();
-                System.exit(1);
-            }
-        }
-        catch (ShuffleError err)
-        {
-            shuffler.writeln(err);
-            System.exit(1);
-        }
-        finally
-        {
-            shuffler.close();
-        }
-
-        System.exit(0);
-    }
-}
-
-/** A self-contained Cassandra.Client; Closeable. */
-class CassandraClient implements Closeable
-{
-    TTransport transport;
-    Cassandra.Client client;
-
-    CassandraClient(String hostName, int port, boolean framed) throws TTransportException
-    {
-        TSocket socket = new TSocket(hostName, port);
-        transport = (framed) ? socket : new TFastFramedTransport(socket);
-        transport.open();
-        client = new Cassandra.Client(new TBinaryProtocol(transport));
-
-        try
-        {
-            client.set_cql_version("3.0.0");
-        }
-        catch (Exception e)
-        {
-            throw new RuntimeException(e);
-        }
-    }
-
-    CqlResult execute_cql_query(ByteBuffer cqlQuery, Compression compression) throws Exception
-    {
-        return client.execute_cql3_query(cqlQuery, compression, ConsistencyLevel.ONE);
-    }
-
-    String describe_partitioner() throws TException, InvalidRequestException
-    {
-        return client.describe_partitioner();
-    }
-
-    List<TokenRange> describe_ring(String keyspace) throws InvalidRequestException, TException
-    {
-        return client.describe_ring(keyspace);
-    }
-
-    Map<String, String> describe_token_map() throws InvalidRequestException, TException
-    {
-        return client.describe_token_map();
-    }
-
-    public void close()
-    {
-        transport.close();
-    }
-}
-
-@SuppressWarnings("serial")
-class ShuffleError extends Exception
-{
-    ShuffleError(String msg)
-    {
-        super(msg);
-    }
-}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/ee2ed3c8/test/unit/org/apache/cassandra/service/RelocateTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/service/RelocateTest.java b/test/unit/org/apache/cassandra/service/RelocateTest.java
deleted file mode 100644
index 510d254..0000000
--- a/test/unit/org/apache/cassandra/service/RelocateTest.java
+++ /dev/null
@@ -1,210 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.service;
-
-import static org.junit.Assert.*;
-
-import java.math.BigInteger;
-import java.net.InetAddress;
-import java.net.UnknownHostException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.UUID;
-
-import org.apache.cassandra.SchemaLoader;
-import org.apache.cassandra.db.SystemTable;
-import org.apache.cassandra.exceptions.ConfigurationException;
-import org.apache.cassandra.config.KSMetaData;
-import org.apache.cassandra.config.Schema;
-import org.apache.cassandra.dht.BigIntegerToken;
-import org.apache.cassandra.dht.IPartitioner;
-import org.apache.cassandra.dht.RandomPartitioner;
-import org.apache.cassandra.dht.Token;
-import org.apache.cassandra.gms.ApplicationState;
-import org.apache.cassandra.gms.Gossiper;
-import org.apache.cassandra.gms.VersionedValue;
-import org.apache.cassandra.locator.AbstractReplicationStrategy;
-import org.apache.cassandra.locator.SimpleSnitch;
-import org.apache.cassandra.locator.TokenMetadata;
-import org.junit.AfterClass;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.Test;
-
-public class RelocateTest
-{
-    private static final int TOKENS_PER_NODE = 256;
-    private static final int TOKEN_STEP = 10;
-    private static final IPartitioner<?> partitioner = new RandomPartitioner();
-    private static IPartitioner<?> oldPartitioner;
-    private static VersionedValue.VersionedValueFactory vvFactory;
-
-    private StorageService ss = StorageService.instance;
-    private TokenMetadata tmd = StorageService.instance.getTokenMetadata();
-
-    @Before
-    public void init()
-    {
-        tmd.clearUnsafe();
-    }
-
-    @BeforeClass
-    public static void setUp() throws Exception
-    {
-        oldPartitioner = StorageService.instance.setPartitionerUnsafe(partitioner);
-        SchemaLoader.loadSchema();
-        vvFactory = new VersionedValue.VersionedValueFactory(partitioner);
-    }
-
-    @AfterClass
-    public static void tearDown() throws Exception
-    {
-        StorageService.instance.setPartitionerUnsafe(oldPartitioner);
-        SchemaLoader.stopGossiper();
-    }
-
-    /** Setup a virtual node ring */
-    private static Map<Token<?>, InetAddress> createInitialRing(int size) throws UnknownHostException
-    {
-        Map<Token<?>, InetAddress> tokenMap = new HashMap<Token<?>, InetAddress>();
-        int currentToken = TOKEN_STEP;
-
-        for(int i = 0; i < size; i++)
-        {
-            InetAddress endpoint = InetAddress.getByName("127.0.0." + String.valueOf(i + 1));
-            Gossiper.instance.initializeNodeUnsafe(endpoint, UUID.randomUUID(), 1);
-            List<Token> tokens = new ArrayList<Token>();
-
-            for (int j = 0; j < TOKENS_PER_NODE; j++)
-            {
-                Token token = new BigIntegerToken(String.valueOf(currentToken));
-                tokenMap.put(token, endpoint);
-                tokens.add(token);
-                currentToken += TOKEN_STEP;
-            }
-
-            Gossiper.instance.injectApplicationState(endpoint, ApplicationState.TOKENS, vvFactory.tokens(tokens));
-            StorageService.instance.onChange(endpoint, ApplicationState.STATUS, vvFactory.normal(tokens));
-        }
-
-        return tokenMap;
-    }
-
-    // Copy-pasta from MoveTest.java
-    private AbstractReplicationStrategy getStrategy(String table, TokenMetadata tmd) throws ConfigurationException
-    {
-        KSMetaData ksmd = Schema.instance.getKSMetaData(table);
-        return AbstractReplicationStrategy.createReplicationStrategy(
-                table,
-                ksmd.strategyClass,
-                tmd,
-                new SimpleSnitch(),
-                ksmd.strategyOptions);
-    }
-
-    /** Ensure proper write endpoints during relocation */
-    @Test
-    public void testWriteEndpointsDuringRelocate() throws Exception
-    {
-        Map<Token<?>, InetAddress> tokenMap = createInitialRing(5);
-        Map<Token, List<InetAddress>> expectedEndpoints = new HashMap<Token, List<InetAddress>>();
-
-
-        for (Token<?> token : tokenMap.keySet())
-        {
-            BigIntegerToken keyToken = new BigIntegerToken(((BigInteger)token.token).add(new BigInteger("5")));
-            List<InetAddress> endpoints = new ArrayList<InetAddress>();
-            Iterator<Token> tokenIter = TokenMetadata.ringIterator(tmd.sortedTokens(), keyToken, false);
-            while (tokenIter.hasNext())
-            {
-                InetAddress ep = tmd.getEndpoint(tokenIter.next());
-                if (!endpoints.contains(ep))
-                    endpoints.add(ep);
-            }
-            expectedEndpoints.put(keyToken, endpoints);
-        }
-
-        // Relocate the first token from the first endpoint, to the second endpoint.
-        Token relocateToken = new BigIntegerToken(String.valueOf(TOKEN_STEP));
-        ss.onChange(
-                InetAddress.getByName("127.0.0.2"),
-                ApplicationState.STATUS,
-                vvFactory.relocating(Collections.singleton(relocateToken)));
-        assertTrue(tmd.isRelocating(relocateToken));
-
-        AbstractReplicationStrategy strategy;
-        for (String table : Schema.instance.getNonSystemTables())
-        {
-            strategy = getStrategy(table, tmd);
-            for (Token token : tokenMap.keySet())
-            {
-                BigIntegerToken keyToken = new BigIntegerToken(((BigInteger)token.token).add(new BigInteger("5")));
-
-                HashSet<InetAddress> actual = new HashSet<InetAddress>(tmd.getWriteEndpoints(keyToken, table, strategy.calculateNaturalEndpoints(keyToken, tmd.cloneOnlyTokenMap())));
-                HashSet<InetAddress> expected = new HashSet<InetAddress>();
-
-                for (int i = 0; i < actual.size(); i++)
-                    expected.add(expectedEndpoints.get(keyToken).get(i));
-
-                assertEquals("mismatched endpoint sets", expected, actual);
-            }
-        }
-    }
-
-    /** Use STATUS changes to trigger membership update and validate results. */
-    @Test
-    public void testRelocationSuccess() throws UnknownHostException
-    {
-        createInitialRing(5);
-
-        // Node handling the relocation (dst), and the token being relocated (src).
-        InetAddress relocator = InetAddress.getByName("127.0.0.3");
-        Token relocatee = new BigIntegerToken(String.valueOf(TOKEN_STEP));
-
-        // Send RELOCATING and ensure token status
-        ss.onChange(relocator, ApplicationState.STATUS, vvFactory.relocating(Collections.singleton(relocatee)));
-        assertTrue(tmd.isRelocating(relocatee));
-
-        // Create a list of the endpoint's existing tokens, and add the relocatee to it.
-        List<Token> tokens = new ArrayList<Token>(tmd.getTokens(relocator));
-        SystemTable.updateTokens(tokens);
-        tokens.add(relocatee);
-
-        // Send a normal status, then ensure all is copesetic.
-        Gossiper.instance.injectApplicationState(relocator, ApplicationState.TOKENS, vvFactory.tokens(tokens));
-        ss.onChange(relocator, ApplicationState.STATUS, vvFactory.normal(tokens));
-
-        // Relocating entries are removed after RING_DELAY
-        try
-        {
-            Thread.sleep(StorageService.RING_DELAY + 10);
-        }
-        catch (InterruptedException e)
-        {
-            System.err.println("ACHTUNG! Interrupted; testRelocationSuccess() will almost certainly fail!");
-        }
-
-        assertTrue(!tmd.isRelocating(relocatee));
-        assertEquals(tmd.getEndpoint(relocatee), relocator);
-    }
-}


Mime
View raw message