cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From gdusba...@apache.org
Subject svn commit: r944343 - in /cassandra/trunk/src/java/org/apache/cassandra/streaming: StreamOut.java StreamOutManager.java
Date Fri, 14 May 2010 17:07:02 GMT
Author: gdusbabek
Date: Fri May 14 17:07:02 2010
New Revision: 944343

URL: http://svn.apache.org/viewvc?rev=944343&view=rev
Log:
remove from streamManagers when finished. Patch by gdusbabek, reviewed by stuhood. CASSANDRA-1076

Modified:
    cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamOut.java
    cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamOutManager.java

Modified: cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamOut.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamOut.java?rev=944343&r1=944342&r2=944343&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamOut.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamOut.java Fri May 14 17:07:02
2010
@@ -72,9 +72,8 @@ public class StreamOut
     {
         assert ranges.size() > 0;
         
-        // this is a sneaking way of indicating target as a destination node. it is a lame
way of doing it and will 
-        // change as part of fixing CASSANDRA-1076.
-        StreamOutManager.get(target);
+        // this is so that this target shows up as a destination while anticompaction is
happening.
+        StreamOutManager.pendingDestinations.add(target);
 
         logger.debug("Beginning transfer process to " + target + " for ranges " + StringUtils.join(ranges,
", "));
 
@@ -113,6 +112,7 @@ public class StreamOut
         finally
         {
             StreamingService.instance.setStatus(StreamingService.NOTHING);
+            StreamOutManager.remove(target);
         }
         if (callback != null)
             callback.run();

Modified: cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamOutManager.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamOutManager.java?rev=944343&r1=944342&r2=944343&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamOutManager.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamOutManager.java Fri May
14 17:07:02 2010
@@ -48,6 +48,7 @@ public class StreamOutManager
     private static Logger logger = LoggerFactory.getLogger( StreamOutManager.class );
         
     private static ConcurrentMap<InetAddress, StreamOutManager> streamManagers = new
ConcurrentHashMap<InetAddress, StreamOutManager>();
+    public static final Set<InetAddress> pendingDestinations = Collections.synchronizedSet(new
HashSet<InetAddress>());
 
     public static StreamOutManager get(InetAddress to)
     {
@@ -60,11 +61,21 @@ public class StreamOutManager
         }
         return manager;
     }
+    
+    public static void remove(InetAddress to)
+    {
+        if (streamManagers.containsKey(to) && streamManagers.get(to).files.size()
== 0)
+            streamManagers.remove(to);
+        pendingDestinations.remove(to);
+    }
 
     public static Set<InetAddress> getDestinations()
     {
         // the results of streamManagers.keySet() isn't serializable, so create a new set.
-        return new HashSet(streamManagers.keySet());
+        Set<InetAddress> hosts = new HashSet<InetAddress>();
+        hosts.addAll(streamManagers.keySet());
+        hosts.addAll(pendingDestinations);
+        return hosts;
     }
 
     // we need sequential and random access to the files. hence, the map and the list.



Mime
View raw message