cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From yu...@apache.org
Subject [36/50] [abbrv] git commit: group stream out ranges
Date Fri, 27 Jul 2012 15:19:39 GMT
group stream out ranges

Patch by Sam Overton; reviewed by Brandon Williams for CASSANDRA-4122


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/8c09e87e
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/8c09e87e
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/8c09e87e

Branch: refs/heads/cassandra-1.1
Commit: 8c09e87e40020139611088cf836f8f079bffb0ce
Parents: 1a3661f
Author: Eric Evans <eevans@apache.org>
Authored: Wed Jul 18 13:35:53 2012 -0500
Committer: Eric Evans <eevans@apache.org>
Committed: Wed Jul 18 13:35:53 2012 -0500

----------------------------------------------------------------------
 .../apache/cassandra/service/StorageService.java   |   51 ++++++++++----
 1 files changed, 36 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/8c09e87e/src/java/org/apache/cassandra/service/StorageService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java
index 207bf69..3875054 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -2984,42 +2984,62 @@ public class StorageService implements IEndpointStateChangeSubscriber,
StorageSe
      */
     private CountDownLatch streamRanges(final Map<String, Multimap<Range<Token>,
InetAddress>> rangesToStreamByTable)
     {
-        final CountDownLatch latch = new CountDownLatch(rangesToStreamByTable.keySet().size());
+        // First, we build a list of ranges to stream to each host, per table
+        final Map<String, Map<InetAddress, List<Range<Token>>>> sessionsToStreamByTable
= new HashMap<String, Map<InetAddress, List<Range<Token>>>>();
+        // The number of stream out sessions we need to start, to be built up as we build
sessionsToStreamByTable
+        int sessionCount = 0;
+
         for (Map.Entry<String, Multimap<Range<Token>, InetAddress>> entry
: rangesToStreamByTable.entrySet())
         {
             Multimap<Range<Token>, InetAddress> rangesWithEndpoints = entry.getValue();
 
             if (rangesWithEndpoints.isEmpty())
-            {
-                latch.countDown();
                 continue;
-            }
 
             final String table = entry.getKey();
 
-            final Set<Map.Entry<Range<Token>, InetAddress>> pending = new
HashSet<Map.Entry<Range<Token>, InetAddress>>(rangesWithEndpoints.entries());
+            Map<InetAddress, List<Range<Token>>> rangesPerEndpoint = new
HashMap<InetAddress, List<Range<Token>>>();
 
             for (final Map.Entry<Range<Token>, InetAddress> endPointEntry : rangesWithEndpoints.entries())
             {
                 final Range<Token> range = endPointEntry.getKey();
-                final InetAddress newEndpoint = endPointEntry.getValue();
+                final InetAddress endpoint = endPointEntry.getValue();
+
+                List<Range<Token>> curRanges = rangesPerEndpoint.get(endpoint);
+                if (curRanges == null)
+                {
+                    curRanges = new LinkedList<Range<Token>>();
+                    rangesPerEndpoint.put(endpoint, curRanges);
+                }
+                curRanges.add(range);
+            }
+
+            sessionCount += rangesPerEndpoint.size();
+            sessionsToStreamByTable.put(table, rangesPerEndpoint);
+        }
+
+        final CountDownLatch latch = new CountDownLatch(sessionCount);
+
+        for (Map.Entry<String, Map<InetAddress, List<Range<Token>>>>
entry : sessionsToStreamByTable.entrySet())
+        {
+            final String table = entry.getKey();
+            final Map<InetAddress, List<Range<Token>>> rangesPerEndpoint
= entry.getValue();
+
+            for (final Map.Entry<InetAddress, List<Range<Token>>> rangesEntry
: rangesPerEndpoint.entrySet())
+            {
+                final List<Range<Token>> ranges = rangesEntry.getValue();
+                final InetAddress newEndpoint = rangesEntry.getKey();
 
                 final IStreamCallback callback = new IStreamCallback()
                 {
                     public void onSuccess()
                     {
-                        synchronized (pending)
-                        {
-                            pending.remove(endPointEntry);
-
-                            if (pending.isEmpty())
-                                latch.countDown();
-                        }
+                        latch.countDown();
                     }
 
                     public void onFailure()
                     {
-                        logger.warn("Streaming to " + endPointEntry + " failed");
+                        logger.warn("Streaming to " + newEndpoint + " failed");
                         onSuccess(); // calling onSuccess for latch countdown
                     }
                 };
@@ -3029,7 +3049,8 @@ public class StorageService implements IEndpointStateChangeSubscriber,
StorageSe
                     public void run()
                     {
                         // TODO each call to transferRanges re-flushes, this is potentially
a lot of waste
-                        StreamOut.transferRanges(newEndpoint, Table.open(table), Arrays.asList(range),
callback, OperationType.UNBOOTSTRAP);
+                        StreamOut.transferRanges(newEndpoint, Table.open(table), ranges,
callback,
+                                OperationType.UNBOOTSTRAP);
                     }
                 });
             }


Mime
View raw message