Return-Path: Delivered-To: apmail-incubator-cassandra-commits-archive@minotaur.apache.org Received: (qmail 26547 invoked from network); 31 Jan 2010 00:23:00 -0000 Received: from hermes.apache.org (HELO mail.apache.org) (140.211.11.3) by minotaur.apache.org with SMTP; 31 Jan 2010 00:23:00 -0000 Received: (qmail 92903 invoked by uid 500); 31 Jan 2010 00:23:00 -0000 Delivered-To: apmail-incubator-cassandra-commits-archive@incubator.apache.org Received: (qmail 92884 invoked by uid 500); 31 Jan 2010 00:23:00 -0000 Mailing-List: contact cassandra-commits-help@incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: cassandra-dev@incubator.apache.org Delivered-To: mailing list cassandra-commits@incubator.apache.org Received: (qmail 92798 invoked by uid 99); 31 Jan 2010 00:23:00 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Sun, 31 Jan 2010 00:23:00 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=10.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Sun, 31 Jan 2010 00:22:50 +0000 Received: by eris.apache.org (Postfix, from userid 65534) id 8A0F723888FE; Sun, 31 Jan 2010 00:22:30 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r904925 - in /incubator/cassandra/trunk: src/java/org/apache/cassandra/dht/ src/java/org/apache/cassandra/gms/ src/java/org/apache/cassandra/service/ src/java/org/apache/cassandra/streaming/ test/unit/org/apache/cassandra/io/ Date: Sun, 31 Jan 2010 00:22:30 -0000 To: cassandra-commits@incubator.apache.org From: jbellis@apache.org X-Mailer: svnmailer-1.0.8 Message-Id: <20100131002230.8A0F723888FE@eris.apache.org> Author: jbellis Date: Sun Jan 31 00:22:29 2010 New Revision: 904925 URL: http://svn.apache.org/viewvc?rev=904925&view=rev Log: split Streaming into StreamOut and StreamIn; clean up StreamManager patch by jbellis; reviewed by stuhood for CASSANDRA-751 Added: incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamIn.java (with props) incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamOut.java (contents, props changed) - copied, changed from r904924, incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/Streaming.java Removed: incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/Streaming.java Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootStrapper.java incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/Gossiper.java incubator/cassandra/trunk/src/java/org/apache/cassandra/service/AntiEntropyService.java incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageLoadBalancer.java incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamFinishedVerbHandler.java incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInitiateDoneVerbHandler.java incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInitiateVerbHandler.java incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamManager.java incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamRequestVerbHandler.java incubator/cassandra/trunk/test/unit/org/apache/cassandra/io/StreamingTest.java Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootStrapper.java URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootStrapper.java?rev=904925&r1=904924&r2=904925&view=diff ============================================================================== --- incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootStrapper.java (original) +++ incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootStrapper.java Sun Jan 31 00:22:29 2010 @@ -32,7 +32,7 @@ import org.apache.cassandra.locator.AbstractReplicationStrategy; import org.apache.cassandra.net.*; import org.apache.cassandra.service.StorageService; - import org.apache.cassandra.streaming.Streaming; + import org.apache.cassandra.streaming.StreamIn; import org.apache.cassandra.utils.SimpleCondition; import org.apache.cassandra.utils.FBUtilities; import org.apache.cassandra.config.DatabaseDescriptor; @@ -79,7 +79,7 @@ InetAddress source = entry.getKey(); for (String table : DatabaseDescriptor.getNonSystemTables()) StorageService.instance.addBootstrapSource(source, table); - Streaming.requestRanges(source, entry.getValue()); + StreamIn.requestRanges(source, entry.getValue()); } } }, "Boostrap requester").start(); Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/Gossiper.java URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/Gossiper.java?rev=904925&r1=904924&r2=904925&view=diff ============================================================================== --- incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/Gossiper.java (original) +++ incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/Gossiper.java Sun Jan 31 00:22:29 2010 @@ -28,7 +28,7 @@ import org.apache.cassandra.net.Message; import org.apache.cassandra.net.MessagingService; import org.apache.cassandra.service.StorageService; -import org.apache.cassandra.streaming.Streaming; +import org.apache.cassandra.streaming.StreamOut; import org.apache.log4j.Logger; @@ -415,10 +415,10 @@ Hashtable copy = new Hashtable(justRemovedEndPoints_); for (Map.Entry entry : copy.entrySet()) { - if ((now - entry.getValue()) > Streaming.RING_DELAY) + if ((now - entry.getValue()) > StreamOut.RING_DELAY) { if (logger_.isDebugEnabled()) - logger_.debug(Streaming.RING_DELAY + " elapsed, " + entry.getKey() + " gossip quarantine over"); + logger_.debug(StreamOut.RING_DELAY + " elapsed, " + entry.getKey() + " gossip quarantine over"); justRemovedEndPoints_.remove(entry.getKey()); } } Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/service/AntiEntropyService.java URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/AntiEntropyService.java?rev=904925&r1=904924&r2=904925&view=diff ============================================================================== --- incubator/cassandra/trunk/src/java/org/apache/cassandra/service/AntiEntropyService.java (original) +++ incubator/cassandra/trunk/src/java/org/apache/cassandra/service/AntiEntropyService.java Sun Jan 31 00:22:29 2010 @@ -35,7 +35,7 @@ import org.apache.cassandra.io.ICompactSerializer; import org.apache.cassandra.io.SSTable; import org.apache.cassandra.io.SSTableReader; -import org.apache.cassandra.streaming.Streaming; +import org.apache.cassandra.streaming.StreamOut; import org.apache.cassandra.net.IVerbHandler; import org.apache.cassandra.net.Message; import org.apache.cassandra.net.MessagingService; @@ -628,7 +628,7 @@ { List ranges = new ArrayList(differences); List sstables = CompactionManager.instance.submitAnticompaction(cfstore, ranges, remote).get(); - Streaming.transferSSTables(remote, sstables, cf.left); + StreamOut.transferSSTables(remote, sstables, cf.left); } catch(Exception e) { Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageLoadBalancer.java URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageLoadBalancer.java?rev=904925&r1=904924&r2=904925&view=diff ============================================================================== --- incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageLoadBalancer.java (original) +++ incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageLoadBalancer.java Sun Jan 31 00:22:29 2010 @@ -35,7 +35,7 @@ import org.apache.cassandra.net.Message; import org.apache.cassandra.net.MessagingService; import org.apache.cassandra.utils.FBUtilities; -import org.apache.cassandra.streaming.Streaming; +import org.apache.cassandra.streaming.StreamOut; /* * The load balancing algorithm here is an implementation of @@ -365,7 +365,7 @@ Thread.sleep(100); } // one more sleep in case there are some stragglers - Thread.sleep(Streaming.RING_DELAY); + Thread.sleep(StreamOut.RING_DELAY); } catch (InterruptedException e) { Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java?rev=904925&r1=904924&r2=904925&view=diff ============================================================================== --- incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java (original) +++ incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java Sun Jan 31 00:22:29 2010 @@ -316,10 +316,10 @@ isBootstrapMode = true; SystemTable.updateToken(token); // DON'T use setToken, that makes us part of the ring locally which is incorrect until we are done bootstrapping Gossiper.instance.addApplicationState(MOVE_STATE, new ApplicationState(STATE_BOOTSTRAPPING + Delimiter + partitioner_.getTokenFactory().toString(token))); - logger_.info("bootstrap sleeping " + Streaming.RING_DELAY); + logger_.info("bootstrap sleeping " + StreamOut.RING_DELAY); try { - Thread.sleep(Streaming.RING_DELAY); + Thread.sleep(StreamOut.RING_DELAY); } catch (InterruptedException e) { @@ -708,7 +708,7 @@ // Finally we have a list of addresses and ranges to stream. Proceed to stream for (Map.Entry> entry : sourceRanges.asMap().entrySet()) - Streaming.requestRanges(entry.getKey(), entry.getValue()); + StreamIn.requestRanges(entry.getKey(), entry.getValue()); } } @@ -1265,8 +1265,8 @@ logger_.info("DECOMMISSIONING"); startLeaving(); - logger_.info("decommission sleeping " + Streaming.RING_DELAY); - Thread.sleep(Streaming.RING_DELAY); + logger_.info("decommission sleeping " + StreamOut.RING_DELAY); + Thread.sleep(StreamOut.RING_DELAY); Runnable finishLeaving = new Runnable() { @@ -1336,7 +1336,7 @@ public void run() { // TODO each call to transferRanges re-flushes, this is potentially a lot of waste - Streaming.transferRanges(newEndpoint, Arrays.asList(range), callback); + StreamOut.transferRanges(newEndpoint, Arrays.asList(range), callback); } }); } @@ -1364,8 +1364,8 @@ logger_.info("starting move. leaving token " + getLocalToken()); startLeaving(); - logger_.info("move sleeping " + Streaming.RING_DELAY); - Thread.sleep(Streaming.RING_DELAY); + logger_.info("move sleeping " + StreamOut.RING_DELAY); + Thread.sleep(StreamOut.RING_DELAY); Runnable finishMoving = new WrappedRunnable() { Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamFinishedVerbHandler.java URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamFinishedVerbHandler.java?rev=904925&r1=904924&r2=904925&view=diff ============================================================================== --- incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamFinishedVerbHandler.java (original) +++ incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamFinishedVerbHandler.java Sun Jan 31 00:22:29 2010 @@ -29,13 +29,13 @@ switch (streamStatus.getAction()) { case DELETE: - StreamManager.instance(message.getFrom()).finish(streamStatus.getFile()); + StreamManager.get(message.getFrom()).finishAndStartNext(streamStatus.getFile()); break; case STREAM: if (logger.isDebugEnabled()) logger.debug("Need to re-stream file " + streamStatus.getFile()); - StreamManager.instance(message.getFrom()).repeat(); + StreamManager.get(message.getFrom()).startNext(); break; default: Added: incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamIn.java URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamIn.java?rev=904925&view=auto ============================================================================== --- incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamIn.java (added) +++ incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamIn.java Sun Jan 31 00:22:29 2010 @@ -0,0 +1,30 @@ +package org.apache.cassandra.streaming; + +import java.net.InetAddress; +import java.util.Collection; + +import org.apache.log4j.Logger; +import org.apache.commons.lang.StringUtils; + +import org.apache.cassandra.dht.Range; +import org.apache.cassandra.net.Message; +import org.apache.cassandra.net.MessagingService; +import org.apache.cassandra.utils.FBUtilities; + +/** for streaming data from other nodes in to this one */ +public class StreamIn +{ + private static Logger logger = Logger.getLogger(StreamOut.class); + + /** + * Request ranges to be transferred from source to local node + */ + public static void requestRanges(InetAddress source, Collection ranges) + { + if (logger.isDebugEnabled()) + logger.debug("Requesting from " + source + " ranges " + StringUtils.join(ranges, ", ")); + StreamRequestMetadata streamRequestMetadata = new StreamRequestMetadata(FBUtilities.getLocalAddress(), ranges); + Message message = StreamRequestMessage.makeStreamRequestMessage(new StreamRequestMessage(streamRequestMetadata)); + MessagingService.instance.sendOneWay(message, source); + } +} Propchange: incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamIn.java ------------------------------------------------------------------------------ svn:eol-style = native Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInitiateDoneVerbHandler.java URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInitiateDoneVerbHandler.java?rev=904925&r1=904924&r2=904925&view=diff ============================================================================== --- incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInitiateDoneVerbHandler.java (original) +++ incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInitiateDoneVerbHandler.java Sun Jan 31 00:22:29 2010 @@ -14,6 +14,6 @@ { if (logger.isDebugEnabled()) logger.debug("Received a stream initiate done message ..."); - StreamManager.instance(message.getFrom()).start(); + StreamManager.get(message.getFrom()).startNext(); } } Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInitiateVerbHandler.java URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInitiateVerbHandler.java?rev=904925&r1=904924&r2=904925&view=diff ============================================================================== --- incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInitiateVerbHandler.java (original) +++ incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInitiateVerbHandler.java Sun Jan 31 00:22:29 2010 @@ -48,7 +48,7 @@ if (logger.isDebugEnabled()) logger.debug("no data needed from " + message.getFrom()); if (StorageService.instance.isBootstrapMode()) - StorageService.instance.removeBootstrapSource(message.getFrom(), new String(message.getHeader(Streaming.TABLE_NAME))); + StorageService.instance.removeBootstrapSource(message.getFrom(), new String(message.getHeader(StreamOut.TABLE_NAME))); return; } Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamManager.java URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamManager.java?rev=904925&r1=904924&r2=904925&view=diff ============================================================================== --- incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamManager.java (original) +++ incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamManager.java Sun Jan 31 00:22:29 2010 @@ -31,6 +31,7 @@ import org.apache.cassandra.streaming.StreamContextManager; import org.apache.cassandra.utils.FBUtilities; import org.apache.cassandra.io.util.FileUtils; +import org.apache.cassandra.utils.SimpleCondition; import org.apache.log4j.Logger; @@ -39,88 +40,82 @@ */ public class StreamManager { - private static Logger logger_ = Logger.getLogger( StreamManager.class ); + private static Logger logger = Logger.getLogger( StreamManager.class ); - private static ConcurrentMap streamManagers_ = new ConcurrentHashMap(); - - public static StreamManager instance(InetAddress to) + private static ConcurrentMap streamManagers = new ConcurrentHashMap(); + + public static StreamManager get(InetAddress to) { - StreamManager streamManager = streamManagers_.get(to); - if ( streamManager == null ) + StreamManager streamManager = streamManagers.get(to); + if (streamManager == null) { StreamManager possibleNew = new StreamManager(to); - if ((streamManager = streamManagers_.putIfAbsent(to, possibleNew)) == null) + if ((streamManager = streamManagers.putIfAbsent(to, possibleNew)) == null) streamManager = possibleNew; } return streamManager; } - private List filesToStream_ = new ArrayList(); - private InetAddress to_; - private long totalBytesToStream_ = 0L; + private final List files = new ArrayList(); + private final InetAddress to; + private long totalBytes = 0L; + private final SimpleCondition condition = new SimpleCondition(); private StreamManager(InetAddress to) { - to_ = to; + this.to = to; } public void addFilesToStream(StreamContextManager.StreamContext[] streamContexts) { - for ( StreamContextManager.StreamContext streamContext : streamContexts ) + for (StreamContextManager.StreamContext streamContext : streamContexts) { - if (logger_.isDebugEnabled()) - logger_.debug("Adding file " + streamContext.getTargetFile() + " to be streamed."); - filesToStream_.add( new File( streamContext.getTargetFile() ) ); - totalBytesToStream_ += streamContext.getExpectedBytes(); + if (logger.isDebugEnabled()) + logger.debug("Adding file " + streamContext.getTargetFile() + " to be streamed."); + files.add( new File( streamContext.getTargetFile() ) ); + totalBytes += streamContext.getExpectedBytes(); } } - public void start() + public void startNext() { - if ( filesToStream_.size() > 0 ) + if (files.size() > 0) { - File file = filesToStream_.get(0); - if (logger_.isDebugEnabled()) - logger_.debug("Streaming " + file.length() + " length file " + file + " ..."); - MessagingService.instance.stream(file.getAbsolutePath(), 0L, file.length(), FBUtilities.getLocalAddress(), to_); + File file = files.get(0); + if (logger.isDebugEnabled()) + logger.debug("Streaming " + file.length() + " length file " + file + " ..."); + MessagingService.instance.stream(file.getAbsolutePath(), 0L, file.length(), FBUtilities.getLocalAddress(), to); } } - - public void repeat() - { - if ( filesToStream_.size() > 0 ) - start(); - } - - public void finish(String file) throws IOException + + public void finishAndStartNext(String file) throws IOException { File f = new File(file); - if (logger_.isDebugEnabled()) - logger_.debug("Deleting file " + file + " after streaming " + f.length() + "/" + totalBytesToStream_ + " bytes."); + if (logger.isDebugEnabled()) + logger.debug("Deleting file " + file + " after streaming " + f.length() + "/" + totalBytes + " bytes."); FileUtils.delete(file); - filesToStream_.remove(0); - if ( filesToStream_.size() > 0 ) - start(); + files.remove(0); + if (files.size() > 0) + { + startNext(); + } else { - synchronized(this) - { - if (logger_.isDebugEnabled()) - logger_.debug("Signalling that streaming is done for " + to_); - notifyAll(); - } + if (logger.isDebugEnabled()) + logger.debug("Signalling that streaming is done for " + to); + condition.signalAll(); } } - public synchronized void waitForStreamCompletion() + public void waitForStreamCompletion() { try { - wait(); + condition.await(); } - catch (InterruptedException ex) + catch (InterruptedException e) { - throw new AssertionError(ex); + throw new AssertionError(e); } } } Copied: incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamOut.java (from r904924, incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/Streaming.java) URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamOut.java?p2=incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamOut.java&p1=incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/Streaming.java&r1=904924&r2=904925&rev=904925&view=diff ============================================================================== --- incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/Streaming.java (original) +++ incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamOut.java Sun Jan 31 00:22:29 2010 @@ -42,7 +42,6 @@ import org.apache.cassandra.net.Message; import org.apache.cassandra.net.MessagingService; import org.apache.cassandra.streaming.StreamManager; -import org.apache.cassandra.utils.FBUtilities; /** * This class handles streaming data from one node to another. @@ -57,9 +56,10 @@ * For unbootstrap, the leaving node starts with step 3 (1 and 2 are skipped entirely). This is why * STREAM_INITIATE is a separate verb, rather than just a reply to STREAM_REQUEST; the REQUEST is optional. */ -public class Streaming +public class StreamOut { - private static Logger logger = Logger.getLogger(Streaming.class); + private static Logger logger = Logger.getLogger(StreamOut.class); + static String TABLE_NAME = "STREAMING-TABLE-NAME"; public static final long RING_DELAY = 30 * 1000; // delay after which we assume ring has stablized @@ -134,10 +134,10 @@ if (logger.isDebugEnabled()) logger.debug("Stream context metadata " + StringUtils.join(streamContexts, ", ")); - StreamManager.instance(target).addFilesToStream(streamContexts); + StreamManager.get(target).addFilesToStream(streamContexts); StreamInitiateMessage biMessage = new StreamInitiateMessage(streamContexts); Message message = StreamInitiateMessage.makeStreamInitiateMessage(biMessage); - message.addHeader(Streaming.TABLE_NAME, table.getBytes()); + message.addHeader(StreamOut.TABLE_NAME, table.getBytes()); if (logger.isDebugEnabled()) logger.debug("Sending a stream initiate message to " + target + " ..."); MessagingService.instance.sendOneWay(message, target); @@ -145,22 +145,10 @@ if (streamContexts.length > 0) { logger.info("Waiting for transfer to " + target + " to complete"); - StreamManager.instance(target).waitForStreamCompletion(); + StreamManager.get(target).waitForStreamCompletion(); // (StreamManager will delete the streamed file on completion.) logger.info("Done with transfer to " + target); } } - /** - * Request ranges to be transferred - */ - public static void requestRanges(InetAddress source, Collection ranges) - { - if (logger.isDebugEnabled()) - logger.debug("Requesting from " + source + " ranges " + StringUtils.join(ranges, ", ")); - StreamRequestMetadata streamRequestMetadata = new StreamRequestMetadata(FBUtilities.getLocalAddress(), ranges); - Message message = StreamRequestMessage.makeStreamRequestMessage(new StreamRequestMessage(streamRequestMetadata)); - MessagingService.instance.sendOneWay(message, source); - } - } Propchange: incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamOut.java ------------------------------------------------------------------------------ svn:eol-style = native Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamRequestVerbHandler.java URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamRequestVerbHandler.java?rev=904925&r1=904924&r2=904925&view=diff ============================================================================== --- incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamRequestVerbHandler.java (original) +++ incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamRequestVerbHandler.java Sun Jan 31 00:22:29 2010 @@ -52,7 +52,7 @@ { if (logger_.isDebugEnabled()) logger_.debug(srm.toString()); - Streaming.transferRanges(srm.target_, srm.ranges_, null); + StreamOut.transferRanges(srm.target_, srm.ranges_, null); } } catch (IOException ex) Modified: incubator/cassandra/trunk/test/unit/org/apache/cassandra/io/StreamingTest.java URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/test/unit/org/apache/cassandra/io/StreamingTest.java?rev=904925&r1=904924&r2=904925&view=diff ============================================================================== --- incubator/cassandra/trunk/test/unit/org/apache/cassandra/io/StreamingTest.java (original) +++ incubator/cassandra/trunk/test/unit/org/apache/cassandra/io/StreamingTest.java Sun Jan 31 00:22:29 2010 @@ -31,7 +31,7 @@ import org.apache.cassandra.io.SSTableUtils; import org.apache.cassandra.io.SSTableReader; import org.apache.cassandra.service.StorageService; -import org.apache.cassandra.streaming.Streaming; +import org.apache.cassandra.streaming.StreamOut; import org.apache.cassandra.utils.FBUtilities; import org.junit.Test; @@ -54,7 +54,7 @@ String cfname = sstable.getColumnFamilyName(); // transfer - Streaming.transferSSTables(LOCAL, Arrays.asList(sstable), tablename); + StreamOut.transferSSTables(LOCAL, Arrays.asList(sstable), tablename); // confirm that the SSTable was transferred and registered ColumnFamilyStore cfstore = Table.open(tablename).getColumnFamilyStore(cfname);