cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jbel...@apache.org
Subject svn commit: r904925 - in /incubator/cassandra/trunk: src/java/org/apache/cassandra/dht/ src/java/org/apache/cassandra/gms/ src/java/org/apache/cassandra/service/ src/java/org/apache/cassandra/streaming/ test/unit/org/apache/cassandra/io/
Date Sun, 31 Jan 2010 00:22:30 GMT
Author: jbellis
Date: Sun Jan 31 00:22:29 2010
New Revision: 904925

URL: http://svn.apache.org/viewvc?rev=904925&view=rev
Log:
split Streaming into StreamOut and StreamIn; clean up StreamManager
patch by jbellis; reviewed by stuhood for CASSANDRA-751

Added:
    incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamIn.java   (with props)
    incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamOut.java   (contents, props changed)
      - copied, changed from r904924, incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/Streaming.java
Removed:
    incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/Streaming.java
Modified:
    incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootStrapper.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/Gossiper.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/service/AntiEntropyService.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageLoadBalancer.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamFinishedVerbHandler.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInitiateDoneVerbHandler.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInitiateVerbHandler.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamManager.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamRequestVerbHandler.java
    incubator/cassandra/trunk/test/unit/org/apache/cassandra/io/StreamingTest.java

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootStrapper.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootStrapper.java?rev=904925&r1=904924&r2=904925&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootStrapper.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootStrapper.java Sun Jan 31 00:22:29 2010
@@ -32,7 +32,7 @@
  import org.apache.cassandra.locator.AbstractReplicationStrategy;
  import org.apache.cassandra.net.*;
  import org.apache.cassandra.service.StorageService;
- import org.apache.cassandra.streaming.Streaming;
+ import org.apache.cassandra.streaming.StreamIn;
  import org.apache.cassandra.utils.SimpleCondition;
  import org.apache.cassandra.utils.FBUtilities;
  import org.apache.cassandra.config.DatabaseDescriptor;
@@ -79,7 +79,7 @@
                     InetAddress source = entry.getKey();
                     for (String table : DatabaseDescriptor.getNonSystemTables())
                         StorageService.instance.addBootstrapSource(source, table);
-                    Streaming.requestRanges(source, entry.getValue());
+                    StreamIn.requestRanges(source, entry.getValue());
                 }
             }
         }, "Boostrap requester").start();

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/Gossiper.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/Gossiper.java?rev=904925&r1=904924&r2=904925&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/Gossiper.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/Gossiper.java Sun Jan 31 00:22:29 2010
@@ -28,7 +28,7 @@
 import org.apache.cassandra.net.Message;
 import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.service.StorageService;
-import org.apache.cassandra.streaming.Streaming;
+import org.apache.cassandra.streaming.StreamOut;
 
 import org.apache.log4j.Logger;
 
@@ -415,10 +415,10 @@
                 Hashtable<InetAddress, Long> copy = new Hashtable<InetAddress, Long>(justRemovedEndPoints_);
                 for (Map.Entry<InetAddress, Long> entry : copy.entrySet())
                 {
-                    if ((now - entry.getValue()) > Streaming.RING_DELAY)
+                    if ((now - entry.getValue()) > StreamOut.RING_DELAY)
                     {
                         if (logger_.isDebugEnabled())
-                            logger_.debug(Streaming.RING_DELAY + " elapsed, " + entry.getKey() + " gossip quarantine over");
+                            logger_.debug(StreamOut.RING_DELAY + " elapsed, " + entry.getKey() + " gossip quarantine over");
                         justRemovedEndPoints_.remove(entry.getKey());
                     }
                 }

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/service/AntiEntropyService.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/AntiEntropyService.java?rev=904925&r1=904924&r2=904925&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/service/AntiEntropyService.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/service/AntiEntropyService.java Sun Jan 31 00:22:29 2010
@@ -35,7 +35,7 @@
 import org.apache.cassandra.io.ICompactSerializer;
 import org.apache.cassandra.io.SSTable;
 import org.apache.cassandra.io.SSTableReader;
-import org.apache.cassandra.streaming.Streaming;
+import org.apache.cassandra.streaming.StreamOut;
 import org.apache.cassandra.net.IVerbHandler;
 import org.apache.cassandra.net.Message;
 import org.apache.cassandra.net.MessagingService;
@@ -628,7 +628,7 @@
             {
                 List<Range> ranges = new ArrayList<Range>(differences);
                 List<SSTableReader> sstables = CompactionManager.instance.submitAnticompaction(cfstore, ranges, remote).get();
-                Streaming.transferSSTables(remote, sstables, cf.left);
+                StreamOut.transferSSTables(remote, sstables, cf.left);
             }
             catch(Exception e)
             {

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageLoadBalancer.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageLoadBalancer.java?rev=904925&r1=904924&r2=904925&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageLoadBalancer.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageLoadBalancer.java Sun Jan 31 00:22:29 2010
@@ -35,7 +35,7 @@
 import org.apache.cassandra.net.Message;
 import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.utils.FBUtilities;
-import org.apache.cassandra.streaming.Streaming;
+import org.apache.cassandra.streaming.StreamOut;
 
 /*
  * The load balancing algorithm here is an implementation of
@@ -365,7 +365,7 @@
                 Thread.sleep(100);
             }
             // one more sleep in case there are some stragglers
-            Thread.sleep(Streaming.RING_DELAY);
+            Thread.sleep(StreamOut.RING_DELAY);
         }
         catch (InterruptedException e)
         {

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java?rev=904925&r1=904924&r2=904925&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java Sun Jan 31 00:22:29 2010
@@ -316,10 +316,10 @@
         isBootstrapMode = true;
         SystemTable.updateToken(token); // DON'T use setToken, that makes us part of the ring locally which is incorrect until we are done bootstrapping
         Gossiper.instance.addApplicationState(MOVE_STATE, new ApplicationState(STATE_BOOTSTRAPPING + Delimiter + partitioner_.getTokenFactory().toString(token)));
-        logger_.info("bootstrap sleeping " + Streaming.RING_DELAY);
+        logger_.info("bootstrap sleeping " + StreamOut.RING_DELAY);
         try
         {
-            Thread.sleep(Streaming.RING_DELAY);
+            Thread.sleep(StreamOut.RING_DELAY);
         }
         catch (InterruptedException e)
         {
@@ -708,7 +708,7 @@
 
             // Finally we have a list of addresses and ranges to stream. Proceed to stream
             for (Map.Entry<InetAddress, Collection<Range>> entry : sourceRanges.asMap().entrySet())
-                Streaming.requestRanges(entry.getKey(), entry.getValue());
+                StreamIn.requestRanges(entry.getKey(), entry.getValue());
         }
     }
 
@@ -1265,8 +1265,8 @@
 
         logger_.info("DECOMMISSIONING");
         startLeaving();
-        logger_.info("decommission sleeping " + Streaming.RING_DELAY);
-        Thread.sleep(Streaming.RING_DELAY);
+        logger_.info("decommission sleeping " + StreamOut.RING_DELAY);
+        Thread.sleep(StreamOut.RING_DELAY);
 
         Runnable finishLeaving = new Runnable()
         {
@@ -1336,7 +1336,7 @@
                 public void run()
                 {
                     // TODO each call to transferRanges re-flushes, this is potentially a lot of waste
-                    Streaming.transferRanges(newEndpoint, Arrays.asList(range), callback);
+                    StreamOut.transferRanges(newEndpoint, Arrays.asList(range), callback);
                 }
             });
         }
@@ -1364,8 +1364,8 @@
 
         logger_.info("starting move. leaving token " + getLocalToken());
         startLeaving();
-        logger_.info("move sleeping " + Streaming.RING_DELAY);
-        Thread.sleep(Streaming.RING_DELAY);
+        logger_.info("move sleeping " + StreamOut.RING_DELAY);
+        Thread.sleep(StreamOut.RING_DELAY);
 
         Runnable finishMoving = new WrappedRunnable()
         {

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamFinishedVerbHandler.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamFinishedVerbHandler.java?rev=904925&r1=904924&r2=904925&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamFinishedVerbHandler.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamFinishedVerbHandler.java Sun Jan 31 00:22:29 2010
@@ -29,13 +29,13 @@
             switch (streamStatus.getAction())
             {
                 case DELETE:
-                    StreamManager.instance(message.getFrom()).finish(streamStatus.getFile());
+                    StreamManager.get(message.getFrom()).finishAndStartNext(streamStatus.getFile());
                     break;
 
                 case STREAM:
                     if (logger.isDebugEnabled())
                         logger.debug("Need to re-stream file " + streamStatus.getFile());
-                    StreamManager.instance(message.getFrom()).repeat();
+                    StreamManager.get(message.getFrom()).startNext();
                     break;
 
                 default:

Added: incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamIn.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamIn.java?rev=904925&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamIn.java (added)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamIn.java Sun Jan 31 00:22:29 2010
@@ -0,0 +1,30 @@
+package org.apache.cassandra.streaming;
+
+import java.net.InetAddress;
+import java.util.Collection;
+
+import org.apache.log4j.Logger;
+import org.apache.commons.lang.StringUtils;
+
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.net.Message;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.utils.FBUtilities;
+
+/** for streaming data from other nodes in to this one */
+public class StreamIn
+{
+    private static Logger logger = Logger.getLogger(StreamOut.class);
+
+    /**
+     * Request ranges to be transferred from source to local node
+     */
+    public static void requestRanges(InetAddress source, Collection<Range> ranges)
+    {
+        if (logger.isDebugEnabled())
+            logger.debug("Requesting from " + source + " ranges " + StringUtils.join(ranges, ", "));
+        StreamRequestMetadata streamRequestMetadata = new StreamRequestMetadata(FBUtilities.getLocalAddress(), ranges);
+        Message message = StreamRequestMessage.makeStreamRequestMessage(new StreamRequestMessage(streamRequestMetadata));
+        MessagingService.instance.sendOneWay(message, source);
+    }
+}

Propchange: incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamIn.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInitiateDoneVerbHandler.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInitiateDoneVerbHandler.java?rev=904925&r1=904924&r2=904925&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInitiateDoneVerbHandler.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInitiateDoneVerbHandler.java Sun Jan 31 00:22:29 2010
@@ -14,6 +14,6 @@
     {
         if (logger.isDebugEnabled())
           logger.debug("Received a stream initiate done message ...");
-        StreamManager.instance(message.getFrom()).start();
+        StreamManager.get(message.getFrom()).startNext();
     }
 }

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInitiateVerbHandler.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInitiateVerbHandler.java?rev=904925&r1=904924&r2=904925&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInitiateVerbHandler.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInitiateVerbHandler.java Sun Jan 31 00:22:29 2010
@@ -48,7 +48,7 @@
                 if (logger.isDebugEnabled())
                     logger.debug("no data needed from " + message.getFrom());
                 if (StorageService.instance.isBootstrapMode())
-                    StorageService.instance.removeBootstrapSource(message.getFrom(), new String(message.getHeader(Streaming.TABLE_NAME)));
+                    StorageService.instance.removeBootstrapSource(message.getFrom(), new String(message.getHeader(StreamOut.TABLE_NAME)));
                 return;
             }
 

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamManager.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamManager.java?rev=904925&r1=904924&r2=904925&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamManager.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamManager.java Sun Jan 31 00:22:29 2010
@@ -31,6 +31,7 @@
 import org.apache.cassandra.streaming.StreamContextManager;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.io.util.FileUtils;
+import org.apache.cassandra.utils.SimpleCondition;
 
 import org.apache.log4j.Logger;
 
@@ -39,88 +40,82 @@
 */
 public class StreamManager
 {   
-    private static Logger logger_ = Logger.getLogger( StreamManager.class );
+    private static Logger logger = Logger.getLogger( StreamManager.class );
         
-    private static ConcurrentMap<InetAddress, StreamManager> streamManagers_ = new ConcurrentHashMap<InetAddress, StreamManager>();
-    
-    public static StreamManager instance(InetAddress to)
+    private static ConcurrentMap<InetAddress, StreamManager> streamManagers = new ConcurrentHashMap<InetAddress, StreamManager>();
+
+    public static StreamManager get(InetAddress to)
     {
-        StreamManager streamManager = streamManagers_.get(to);
-        if ( streamManager == null )
+        StreamManager streamManager = streamManagers.get(to);
+        if (streamManager == null)
         {
             StreamManager possibleNew = new StreamManager(to);
-            if ((streamManager = streamManagers_.putIfAbsent(to, possibleNew)) == null)
+            if ((streamManager = streamManagers.putIfAbsent(to, possibleNew)) == null)
                 streamManager = possibleNew;
         }
         return streamManager;
     }
     
-    private List<File> filesToStream_ = new ArrayList<File>();
-    private InetAddress to_;
-    private long totalBytesToStream_ = 0L;
+    private final List<File> files = new ArrayList<File>();
+    private final InetAddress to;
+    private long totalBytes = 0L;
+    private final SimpleCondition condition = new SimpleCondition();
     
     private StreamManager(InetAddress to)
     {
-        to_ = to;
+        this.to = to;
     }
     
     public void addFilesToStream(StreamContextManager.StreamContext[] streamContexts)
     {
-        for ( StreamContextManager.StreamContext streamContext : streamContexts )
+        for (StreamContextManager.StreamContext streamContext : streamContexts)
         {
-            if (logger_.isDebugEnabled())
-              logger_.debug("Adding file " + streamContext.getTargetFile() + " to be streamed.");
-            filesToStream_.add( new File( streamContext.getTargetFile() ) );
-            totalBytesToStream_ += streamContext.getExpectedBytes();
+            if (logger.isDebugEnabled())
+              logger.debug("Adding file " + streamContext.getTargetFile() + " to be streamed.");
+            files.add( new File( streamContext.getTargetFile() ) );
+            totalBytes += streamContext.getExpectedBytes();
         }
     }
     
-    public void start()
+    public void startNext()
     {
-        if ( filesToStream_.size() > 0 )
+        if (files.size() > 0)
         {
-            File file = filesToStream_.get(0);
-            if (logger_.isDebugEnabled())
-              logger_.debug("Streaming " + file.length() + " length file " + file + " ...");
-            MessagingService.instance.stream(file.getAbsolutePath(), 0L, file.length(), FBUtilities.getLocalAddress(), to_);
+            File file = files.get(0);
+            if (logger.isDebugEnabled())
+              logger.debug("Streaming " + file.length() + " length file " + file + " ...");
+            MessagingService.instance.stream(file.getAbsolutePath(), 0L, file.length(), FBUtilities.getLocalAddress(), to);
         }
     }
-    
-    public void repeat()
-    {
-        if ( filesToStream_.size() > 0 )
-            start();
-    }
-    
-    public void finish(String file) throws IOException
+
+    public void finishAndStartNext(String file) throws IOException
     {
         File f = new File(file);
-        if (logger_.isDebugEnabled())
-          logger_.debug("Deleting file " + file + " after streaming " + f.length() + "/" + totalBytesToStream_ + " bytes.");
+        if (logger.isDebugEnabled())
+          logger.debug("Deleting file " + file + " after streaming " + f.length() + "/" + totalBytes + " bytes.");
         FileUtils.delete(file);
-        filesToStream_.remove(0);
-        if ( filesToStream_.size() > 0 )
-            start();
+        files.remove(0);
+        if (files.size() > 0)
+        {
+            startNext();
+        }
         else
         {
-            synchronized(this)
-            {
-                if (logger_.isDebugEnabled())
-                  logger_.debug("Signalling that streaming is done for " + to_);
-                notifyAll();
-            }
+            if (logger.isDebugEnabled())
+              logger.debug("Signalling that streaming is done for " + to);
+            condition.signalAll();
         }
     }
     
-    public synchronized void waitForStreamCompletion()
+    public void waitForStreamCompletion()
     {
         try
         {
-            wait();
+            condition.await();
         }
-        catch (InterruptedException ex)
+        catch (InterruptedException e)
         {
-            throw new AssertionError(ex);
+            throw new AssertionError(e);
         }
     }
 }

Copied: incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamOut.java (from r904924, incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/Streaming.java)
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamOut.java?p2=incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamOut.java&p1=incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/Streaming.java&r1=904924&r2=904925&rev=904925&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/Streaming.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamOut.java Sun Jan 31 00:22:29 2010
@@ -42,7 +42,6 @@
 import org.apache.cassandra.net.Message;
 import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.streaming.StreamManager;
-import org.apache.cassandra.utils.FBUtilities;
 
 /**
  * This class handles streaming data from one node to another.
@@ -57,9 +56,10 @@
  * For unbootstrap, the leaving node starts with step 3 (1 and 2 are skipped entirely).  This is why
  * STREAM_INITIATE is a separate verb, rather than just a reply to STREAM_REQUEST; the REQUEST is optional.
  */
-public class Streaming
+public class StreamOut
 {
-    private static Logger logger = Logger.getLogger(Streaming.class);
+    private static Logger logger = Logger.getLogger(StreamOut.class);
+
     static String TABLE_NAME = "STREAMING-TABLE-NAME";
     public static final long RING_DELAY = 30 * 1000; // delay after which we assume ring has stablized
 
@@ -134,10 +134,10 @@
         if (logger.isDebugEnabled())
           logger.debug("Stream context metadata " + StringUtils.join(streamContexts, ", "));
 
-        StreamManager.instance(target).addFilesToStream(streamContexts);
+        StreamManager.get(target).addFilesToStream(streamContexts);
         StreamInitiateMessage biMessage = new StreamInitiateMessage(streamContexts);
         Message message = StreamInitiateMessage.makeStreamInitiateMessage(biMessage);
-        message.addHeader(Streaming.TABLE_NAME, table.getBytes());
+        message.addHeader(StreamOut.TABLE_NAME, table.getBytes());
         if (logger.isDebugEnabled())
           logger.debug("Sending a stream initiate message to " + target + " ...");
         MessagingService.instance.sendOneWay(message, target);
@@ -145,22 +145,10 @@
         if (streamContexts.length > 0)
         {
             logger.info("Waiting for transfer to " + target + " to complete");
-            StreamManager.instance(target).waitForStreamCompletion();
+            StreamManager.get(target).waitForStreamCompletion();
             // (StreamManager will delete the streamed file on completion.)
             logger.info("Done with transfer to " + target);
         }
     }
 
-    /**
-     * Request ranges to be transferred
-     */
-    public static void requestRanges(InetAddress source, Collection<Range> ranges)
-    {
-        if (logger.isDebugEnabled())
-            logger.debug("Requesting from " + source + " ranges " + StringUtils.join(ranges, ", "));
-        StreamRequestMetadata streamRequestMetadata = new StreamRequestMetadata(FBUtilities.getLocalAddress(), ranges);
-        Message message = StreamRequestMessage.makeStreamRequestMessage(new StreamRequestMessage(streamRequestMetadata));
-        MessagingService.instance.sendOneWay(message, source);
-    }
-
 }

Propchange: incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamOut.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamRequestVerbHandler.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamRequestVerbHandler.java?rev=904925&r1=904924&r2=904925&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamRequestVerbHandler.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamRequestVerbHandler.java Sun Jan 31 00:22:29 2010
@@ -52,7 +52,7 @@
             {
                 if (logger_.isDebugEnabled())
                     logger_.debug(srm.toString());
-                Streaming.transferRanges(srm.target_, srm.ranges_, null);
+                StreamOut.transferRanges(srm.target_, srm.ranges_, null);
             }
         }
         catch (IOException ex)

Modified: incubator/cassandra/trunk/test/unit/org/apache/cassandra/io/StreamingTest.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/test/unit/org/apache/cassandra/io/StreamingTest.java?rev=904925&r1=904924&r2=904925&view=diff
==============================================================================
--- incubator/cassandra/trunk/test/unit/org/apache/cassandra/io/StreamingTest.java (original)
+++ incubator/cassandra/trunk/test/unit/org/apache/cassandra/io/StreamingTest.java Sun Jan 31 00:22:29 2010
@@ -31,7 +31,7 @@
 import org.apache.cassandra.io.SSTableUtils;
 import org.apache.cassandra.io.SSTableReader;
 import org.apache.cassandra.service.StorageService;
-import org.apache.cassandra.streaming.Streaming;
+import org.apache.cassandra.streaming.StreamOut;
 import org.apache.cassandra.utils.FBUtilities;
 
 import org.junit.Test;
@@ -54,7 +54,7 @@
         String cfname = sstable.getColumnFamilyName();
 
         // transfer
-        Streaming.transferSSTables(LOCAL, Arrays.asList(sstable), tablename);
+        StreamOut.transferSSTables(LOCAL, Arrays.asList(sstable), tablename);
 
         // confirm that the SSTable was transferred and registered
         ColumnFamilyStore cfstore = Table.open(tablename).getColumnFamilyStore(cfname);



Mime
View raw message