cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From eev...@apache.org
Subject [5/5] git commit: p/4443/010_refactor_range_move
Date Fri, 14 Sep 2012 15:16:18 GMT
p/4443/010_refactor_range_move

Break out common code from SS.move(Token) for later use in range
relocation.

Patch by eevans; reviewed by Brandon Williams for CASSANDRA-4559


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/1f365196
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/1f365196
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/1f365196

Branch: refs/heads/trunk
Commit: 1f365196f3e08c201c7e985ed0a7c0abb9a8c84e
Parents: a26eb3e
Author: Eric Evans <eevans@apache.org>
Authored: Fri Sep 14 10:08:24 2012 -0500
Committer: Eric Evans <eevans@apache.org>
Committed: Fri Sep 14 10:09:55 2012 -0500

----------------------------------------------------------------------
 .../apache/cassandra/service/StorageService.java   |  196 +++++++++------
 1 files changed, 117 insertions(+), 79 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f365196/src/java/org/apache/cassandra/service/StorageService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java
index b8fbd0b..88780dd 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -2524,111 +2524,149 @@ public class StorageService implements IEndpointStateChangeSubscriber, StorageSe
         Gossiper.instance.addLocalApplicationState(ApplicationState.STATUS, valueFactory.moving(newToken));
         setMode(Mode.MOVING, String.format("Moving %s from %s to %s.", localAddress, getLocalTokens().iterator().next(), newToken), true);
 
-        IEndpointSnitch snitch = DatabaseDescriptor.getEndpointSnitch();
+        RangeRelocator relocator = new RangeRelocator(Collections.singleton(newToken), tablesToProcess);
 
-        Map<String, Multimap<InetAddress, Range<Token>>> rangesToFetch
= new HashMap<String, Multimap<InetAddress, Range<Token>>>();
-        Map<String, Multimap<Range<Token>, InetAddress>> rangesToStreamByTable
= new HashMap<String, Multimap<Range<Token>, InetAddress>>();
-
-        TokenMetadata tokenMetaCloneAllSettled = tokenMetadata.cloneAfterAllSettled();
-        // clone to avoid concurrent modification in calculateNaturalEndpoints
-        TokenMetadata tokenMetaClone = tokenMetadata.cloneOnlyTokenMap();
-
-        // for each of the non system tables calculating new ranges
-        // which current node will handle after move to the new token
-        for (String table : tablesToProcess)
+        setMode(Mode.MOVING, String.format("Sleeping %s ms before start streaming/fetching ranges", RING_DELAY), true);
+        try
+        {
+            Thread.sleep(RING_DELAY);
+        }
+        catch (InterruptedException e)
         {
-            // replication strategy of the current keyspace (aka table)
-            AbstractReplicationStrategy strategy = Table.open(table).getReplicationStrategy();
+            throw new RuntimeException("Sleep interrupted " + e.getMessage());
+        }
 
-            // getting collection of the currently used ranges by this keyspace
-            Collection<Range<Token>> currentRanges = getRangesForEndpoint(table,
localAddress);
-            // collection of ranges which this node will serve after move to the new token
-            Collection<Range<Token>> updatedRanges = strategy.getPendingAddressRanges(tokenMetadata,
newToken, localAddress);
+        if (relocator.streamsNeeded())
+        {
+            setMode(Mode.MOVING, "fetching new ranges and streaming old ranges", true);
 
-            // ring ranges and endpoints associated with them
-            // this used to determine what nodes should we ping about range data
-            Multimap<Range<Token>, InetAddress> rangeAddresses = strategy.getRangeAddresses(tokenMetaClone);
+            relocator.logStreamsMap("[Move->STREAMING]");
+            CountDownLatch streamLatch = relocator.streams();
 
-            // calculated parts of the ranges to request/stream from/to nodes in the ring
-            Pair<Set<Range<Token>>, Set<Range<Token>>> rangesPerTable
= calculateStreamAndFetchRanges(currentRanges, updatedRanges);
+            relocator.logRequestsMap("[Move->FETCHING]");
+            CountDownLatch fetchLatch = relocator.requests();
 
-            /**
-             * In this loop we are going through all ranges "to fetch" and determining
-             * nodes in the ring responsible for data we are interested in
-             */
-            Multimap<Range<Token>, InetAddress> rangesToFetchWithPreferredEndpoints
= ArrayListMultimap.create();
-            for (Range<Token> toFetch : rangesPerTable.right)
+            try
             {
-                for (Range<Token> range : rangeAddresses.keySet())
-                {
-                    if (range.contains(toFetch))
-                    {
-                        List<InetAddress> endpoints = snitch.getSortedListByProximity(localAddress,
rangeAddresses.get(range));
-                        // storing range and preferred endpoint set
-                        rangesToFetchWithPreferredEndpoints.putAll(toFetch, endpoints);
-                    }
-                }
+                streamLatch.await();
+                fetchLatch.await();
             }
-
-            // calculating endpoints to stream current ranges to if needed
-            // in some situations node will handle current ranges as part of the new ranges
-            Multimap<Range<Token>, InetAddress> rangeWithEndpoints = HashMultimap.create();
-
-            for (Range<Token> toStream : rangesPerTable.left)
+            catch (InterruptedException e)
             {
-                Set<InetAddress> currentEndpoints = ImmutableSet.copyOf(strategy.calculateNaturalEndpoints(toStream.right,
tokenMetaClone));
-                Set<InetAddress> newEndpoints = ImmutableSet.copyOf(strategy.calculateNaturalEndpoints(toStream.right,
tokenMetaCloneAllSettled));
-                rangeWithEndpoints.putAll(toStream, Sets.difference(newEndpoints, currentEndpoints));
+                throw new RuntimeException("Interrupted latch while waiting for stream/fetch ranges to finish: " + e.getMessage());
             }
+        }
+        else
+            setMode(Mode.MOVING, "No ranges to fetch/stream", true);
 
-            // associating table with range-to-endpoints map
-            rangesToStreamByTable.put(table, rangeWithEndpoints);
+        setTokens(Collections.singleton(newToken)); // setting new token as we have everything settled
 
-            Multimap<InetAddress, Range<Token>> workMap = RangeStreamer.getWorkMap(rangesToFetchWithPreferredEndpoints);
-            rangesToFetch.put(table, workMap);
+        if (logger.isDebugEnabled())
+            logger.debug("Successfully moved to new token {}", getLocalTokens().iterator().next());
+    }
 
-            if (logger.isDebugEnabled())
-                logger.debug("Table {}: work map {}.", table, workMap);
+    private class RangeRelocator
+    {
+        private Map<String, Multimap<InetAddress, Range<Token>>> rangesToFetch
= new HashMap<String, Multimap<InetAddress, Range<Token>>>();
+        private Map<String, Multimap<Range<Token>, InetAddress>> rangesToStreamByTable
= new HashMap<String, Multimap<Range<Token>, InetAddress>>();
+
+        private RangeRelocator(Collection<Token> tokens, List<String> tables)
+        {
+            calculateToFromStreams(tokens, tables);
         }
 
-        if (!rangesToStreamByTable.isEmpty() || !rangesToFetch.isEmpty())
+        private void calculateToFromStreams(Collection<Token> newTokens, List<String> tables)
         {
-            setMode(Mode.MOVING, String.format("Sleeping %s ms before start streaming/fetching
ranges", RING_DELAY), true);
-            try
-            {
-                Thread.sleep(RING_DELAY);
-            }
-            catch (InterruptedException e)
+            InetAddress localAddress = FBUtilities.getBroadcastAddress();
+            IEndpointSnitch snitch = DatabaseDescriptor.getEndpointSnitch();
+            TokenMetadata tokenMetaCloneAllSettled = tokenMetadata.cloneAfterAllSettled();
+            // clone to avoid concurrent modification in calculateNaturalEndpoints
+            TokenMetadata tokenMetaClone = tokenMetadata.cloneOnlyTokenMap();
+
+            for (String table : tables)
             {
-                throw new RuntimeException("Sleep interrupted " + e.getMessage());
-            }
+                for (Token newToken : newTokens)
+                {
+                    // replication strategy of the current keyspace (aka table)
+                    AbstractReplicationStrategy strategy = Table.open(table).getReplicationStrategy();
+
+                    // getting collection of the currently used ranges by this keyspace
+                    Collection<Range<Token>> currentRanges = getRangesForEndpoint(table,
localAddress);
+                    // collection of ranges which this node will serve after move to the new token
+                    Collection<Range<Token>> updatedRanges = strategy.getPendingAddressRanges(tokenMetadata,
newToken, localAddress);
+
+                    // ring ranges and endpoints associated with them
+                    // this used to determine what nodes should we ping about range data
+                    Multimap<Range<Token>, InetAddress> rangeAddresses = strategy.getRangeAddresses(tokenMetaClone);
+
+                    // calculated parts of the ranges to request/stream from/to nodes in the ring
+                    Pair<Set<Range<Token>>, Set<Range<Token>>> rangesPerTable = calculateStreamAndFetchRanges(currentRanges, updatedRanges);
+
+                    /**
+                     * In this loop we are going through all ranges "to fetch" and determining
+                     * nodes in the ring responsible for data we are interested in
+                     */
+                    Multimap<Range<Token>, InetAddress> rangesToFetchWithPreferredEndpoints
= ArrayListMultimap.create();
+                    for (Range<Token> toFetch : rangesPerTable.right)
+                    {
+                        for (Range<Token> range : rangeAddresses.keySet())
+                        {
+                            if (range.contains(toFetch))
+                            {
+                                List<InetAddress> endpoints = snitch.getSortedListByProximity(localAddress,
rangeAddresses.get(range));
+                                // storing range and preferred endpoint set
+                                rangesToFetchWithPreferredEndpoints.putAll(toFetch, endpoints);
+                            }
+                        }
+                    }
 
-            setMode(Mode.MOVING, "fetching new ranges and streaming old ranges", true);
-            if (logger.isDebugEnabled())
-                logger.debug("[Move->STREAMING] Work Map: " + rangesToStreamByTable);
+                    // calculating endpoints to stream current ranges to if needed
+                    // in some situations node will handle current ranges as part of the new ranges
+                    Multimap<Range<Token>, InetAddress> rangeWithEndpoints = HashMultimap.create();
 
-            CountDownLatch streamLatch = streamRanges(rangesToStreamByTable);
+                    for (Range<Token> toStream : rangesPerTable.left)
+                    {
+                        Set<InetAddress> currentEndpoints = ImmutableSet.copyOf(strategy.calculateNaturalEndpoints(toStream.right,
tokenMetaClone));
+                        Set<InetAddress> newEndpoints = ImmutableSet.copyOf(strategy.calculateNaturalEndpoints(toStream.right,
tokenMetaCloneAllSettled));
+                        rangeWithEndpoints.putAll(toStream, Sets.difference(newEndpoints,
currentEndpoints));
+                    }
 
-            if (logger.isDebugEnabled())
-                logger.debug("[Move->FETCHING] Work Map: " + rangesToFetch);
+                    // associating table with range-to-endpoints map
+                    rangesToStreamByTable.put(table, rangeWithEndpoints);
 
-            CountDownLatch fetchLatch = requestRanges(rangesToFetch);
+                    Multimap<InetAddress, Range<Token>> workMap = RangeStreamer.getWorkMap(rangesToFetchWithPreferredEndpoints);
+                    rangesToFetch.put(table, workMap);
 
-            try
-            {
-                streamLatch.await();
-                fetchLatch.await();
-            }
-            catch (InterruptedException e)
-            {
-                throw new RuntimeException("Interrupted latch while waiting for stream/fetch
ranges to finish: " + e.getMessage());
+                    if (logger.isDebugEnabled())
+                        logger.debug("Table {}: work map {}.", table, workMap);
+                }
             }
         }
 
-        setTokens(Collections.singleton(newToken)); // setting new token as we have everything
settled
+        private void logStreamsMap(String prefix)
+        {
+            logger.debug("{} Work map: {}", prefix, rangesToStreamByTable);
+        }
 
-        if (logger.isDebugEnabled())
-            logger.debug("Successfully moved to new token {}", getLocalTokens().iterator().next());
+        private void logRequestsMap(String prefix)
+        {
+            logger.debug("{} Work map: {}", prefix, rangesToFetch);
+        }
+
+        private boolean streamsNeeded()
+        {
+            return !rangesToStreamByTable.isEmpty() || !rangesToFetch.isEmpty();
+        }
+
+        private CountDownLatch streams()
+        {
+            return streamRanges(rangesToStreamByTable);
+        }
+
+        private CountDownLatch requests()
+        {
+            return requestRanges(rangesToFetch);
+        }
     }
 
     /**


Mime
View raw message