cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jbel...@apache.org
Subject svn commit: r997119 - in /cassandra/trunk: src/java/org/apache/cassandra/service/ src/java/org/apache/cassandra/streaming/ test/unit/org/apache/cassandra/streaming/
Date Tue, 14 Sep 2010 22:25:53 GMT
Author: jbellis
Date: Tue Sep 14 22:25:53 2010
New Revision: 997119

URL: http://svn.apache.org/viewvc?rev=997119&view=rev
Log:
avoid exposing StreamContext outside the Session managers, and make the Sessions the handle
for streaming work.  also removes unncessary extra Collections from the session objects.
patch by jbellis; reviewed by Nick Bailey for CASSANDRA-1504

Modified:
    cassandra/trunk/src/java/org/apache/cassandra/service/AntiEntropyService.java
    cassandra/trunk/src/java/org/apache/cassandra/streaming/FileStatusHandler.java
    cassandra/trunk/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java
    cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamIn.java
    cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInSession.java
    cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamOut.java
    cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamOutSession.java
    cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamRequestVerbHandler.java
    cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamStatusVerbHandler.java
    cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamingService.java
    cassandra/trunk/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java

Modified: cassandra/trunk/src/java/org/apache/cassandra/service/AntiEntropyService.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/AntiEntropyService.java?rev=997119&r1=997118&r2=997119&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/AntiEntropyService.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/AntiEntropyService.java Tue Sep
14 22:25:53 2010
@@ -41,7 +41,6 @@ import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.io.AbstractCompactedRow;
 import org.apache.cassandra.io.ICompactSerializer;
 import org.apache.cassandra.io.sstable.SSTableReader;
-import org.apache.cassandra.streaming.StreamContext;
 import org.apache.cassandra.streaming.StreamIn;
 import org.apache.cassandra.streaming.StreamOut;
 import org.apache.cassandra.net.CompactEndpointSerializationHelper;
@@ -538,9 +537,9 @@ public class AntiEntropyService
                 {
                     protected void runMayThrow() throws Exception
                     {
-                        StreamContext context = new StreamContext(request.endpoint);
-                        StreamOut.transferSSTables(context, request.cf.left, sstables, ranges);
-                        StreamOutSession.remove(context);
+                        StreamOutSession session = StreamOutSession.create(request.endpoint);
+                        StreamOut.transferSSTables(session, request.cf.left, sstables, ranges);
+                        session.close();
                     }
                 });
                 // request ranges from the remote node

Modified: cassandra/trunk/src/java/org/apache/cassandra/streaming/FileStatusHandler.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/streaming/FileStatusHandler.java?rev=997119&r1=997118&r2=997119&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/streaming/FileStatusHandler.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/streaming/FileStatusHandler.java Tue Sep
14 22:25:53 2010
@@ -38,13 +38,13 @@ class FileStatusHandler
 {
     private static Logger logger = LoggerFactory.getLogger(FileStatusHandler.class);
 
-    public static void onStatusChange(StreamContext context, PendingFile pendingFile, FileStatus
streamStatus) throws IOException
+    public static void onStatusChange(StreamInSession session, PendingFile pendingFile, FileStatus
streamStatus) throws IOException
     {
         if (FileStatus.Action.STREAM == streamStatus.getAction())
         {
             // file needs to be restreamed
-            logger.warn("Streaming of file {} from {} failed: requesting a retry.", pendingFile,
context);
-            MessagingService.instance.sendOneWay(streamStatus.makeStreamStatusMessage(),
context.host);
+            logger.warn("Streaming of file {} from {} failed: requesting a retry.", pendingFile,
session);
+            MessagingService.instance.sendOneWay(streamStatus.makeStreamStatusMessage(),
session.getHost());
             return;
         }
         assert FileStatus.Action.DELETE == streamStatus.getAction() :
@@ -54,8 +54,8 @@ class FileStatusHandler
 
         // send a StreamStatus message telling the source node it can delete this file
         if (logger.isDebugEnabled())
-            logger.debug("Sending a streaming finished message for {} to {}", pendingFile,
context);
-        MessagingService.instance.sendOneWay(streamStatus.makeStreamStatusMessage(), context.host);
+            logger.debug("Sending a streaming finished message for {} to {}", pendingFile,
session);
+        MessagingService.instance.sendOneWay(streamStatus.makeStreamStatusMessage(), session.getHost());
     }
 
     public static void addSSTable(PendingFile pendingFile)

Modified: cassandra/trunk/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java?rev=997119&r1=997118&r2=997119&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java Tue
Sep 14 22:25:53 2010
@@ -22,6 +22,7 @@ import java.net.InetSocketAddress;
 import java.nio.channels.FileChannel;
 import java.nio.channels.SocketChannel;
 import java.io.*;
+import java.util.Arrays;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -31,14 +32,15 @@ import org.apache.cassandra.utils.Pair;
 
 public class IncomingStreamReader
 {
-    private static Logger logger = LoggerFactory.getLogger(IncomingStreamReader.class);
+    private static final Logger logger = LoggerFactory.getLogger(IncomingStreamReader.class);
+
     private PendingFile pendingFile;
     private PendingFile lastFile;
     private FileStatus streamStatus;
-    private SocketChannel socketChannel;
-    private StreamContext context;
+    private final SocketChannel socketChannel;
     // indicates an transfer initiated by the source, as opposed to one requested by the
recipient
-    private boolean initiatedTransfer;
+    private final boolean initiatedTransfer;
+    private final StreamInSession session;
 
     public IncomingStreamReader(StreamHeader header, SocketChannel socketChannel) throws
IOException
     {
@@ -49,15 +51,15 @@ public class IncomingStreamReader
         // lastFile has the old context, which was registered in the manager.
         lastFile = header.getStreamFile();
         initiatedTransfer = header.initiatedTransfer;
-        context = new StreamContext(remoteAddress.getAddress(), header.getSessionId());
-        StreamInSession.activeStreams.put(context, pendingFile);
         assert pendingFile != null;
+        session = StreamInSession.get(remoteAddress.getAddress(), header.getSessionId());
+        session.addActiveStream(pendingFile);
         // For transfers setup the status and for replies to requests, prepare the list
         // of available files to request.
         if (initiatedTransfer)
             streamStatus = new FileStatus(lastFile.getFilename(), header.getSessionId());
         else if (header.getPendingFiles() != null)
-            StreamInSession.get(context).addFilesToRequest(header.getPendingFiles());
+            session.addFilesToRequest(header.getPendingFiles());
     }
 
     public void read() throws IOException
@@ -89,7 +91,7 @@ public class IncomingStreamReader
             if (initiatedTransfer)
                 handleFileStatus(FileStatus.Action.STREAM);
             else
-                StreamIn.requestFile(context, lastFile);
+                session.requestFile(lastFile);
 
             /* Delete the orphaned file. */
             FileUtils.deleteWithConfirm(new File(pendingFile.getFilename()));
@@ -98,7 +100,7 @@ public class IncomingStreamReader
         finally
         {
             fc.close();
-            StreamInSession.activeStreams.remove(context, pendingFile);
+            session.removeActiveStream(pendingFile);
         }
 
         if (logger.isDebugEnabled())
@@ -108,13 +110,13 @@ public class IncomingStreamReader
         else
         {
             FileStatusHandler.addSSTable(pendingFile);
-            StreamInSession.get(context).finishAndRequestNext(lastFile);
+            session.finishAndRequestNext(lastFile);
         }
     }
 
     private void handleFileStatus(FileStatus.Action action) throws IOException
     {
         streamStatus.setAction(action);
-        FileStatusHandler.onStatusChange(context, pendingFile, streamStatus);
+        FileStatusHandler.onStatusChange(session, pendingFile, streamStatus);
     }
 }

Modified: cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamIn.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamIn.java?rev=997119&r1=997118&r2=997119&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamIn.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamIn.java Tue Sep 14 22:25:53
2010
@@ -51,24 +51,11 @@ public class StreamIn
 
         if (logger.isDebugEnabled())
             logger.debug("Requesting from {} ranges {}", source, StringUtils.join(ranges,
", "));
-        StreamContext context = new StreamContext(source);
-        StreamInSession.get(context);
-        Message message = new StreamRequestMessage(FBUtilities.getLocalAddress(), ranges,
tableName, context.sessionId).makeMessage();
+        StreamInSession session = StreamInSession.create(source);
+        Message message = new StreamRequestMessage(FBUtilities.getLocalAddress(), ranges,
tableName, session.getSessionId()).makeMessage();
         MessagingService.instance.sendOneWay(message, source);
     }
 
-    /**
-     * Request for transferring a single file. This happens subsequent of #requestRanges()
being called.
-     * @param file Pending File that needs to be transferred
-     */
-    public static void requestFile(StreamContext context, PendingFile file)
-    {
-        if (logger.isDebugEnabled())
-            logger.debug("Requesting file {} from source {}", file.getFilename(), context.host);
-        Message message = new StreamRequestMessage(FBUtilities.getLocalAddress(), file, context.sessionId).makeMessage();
-        MessagingService.instance.sendOneWay(message, context.host);
-    }
-
     /** Translates remote files to local files by creating a local sstable per remote sstable.
*/
     public static PendingFile getContextMapping(PendingFile remote) throws IOException
     {

Modified: cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInSession.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInSession.java?rev=997119&r1=997118&r2=997119&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInSession.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInSession.java Tue Sep 14
22:25:53 2010
@@ -23,24 +23,23 @@ import java.util.*;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 
-import com.google.common.collect.HashMultimap;
-import com.google.common.collect.Multimap;
-import com.google.common.collect.Multimaps;
-
+import org.apache.cassandra.net.Message;
+import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.utils.FBUtilities;
+import org.cliffc.high_scale_lib.NonBlockingHashMap;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-/** each context gets its own StreamInSession. So there may be >1 StreamInManager per
host */
+/** each context gets its own StreamInSession. So there may be >1 Session per host */
 public class StreamInSession
 {
     private static final Logger logger = LoggerFactory.getLogger(StreamInSession.class);
 
-    private static ConcurrentMap<StreamContext, StreamInSession> streamManagers = new
ConcurrentHashMap<StreamContext, StreamInSession>(0);
-    public static final Multimap<StreamContext, PendingFile> activeStreams = Multimaps.synchronizedMultimap(HashMultimap.<StreamContext,
PendingFile>create());
-    public static final Multimap<InetAddress, StreamContext> sourceHosts = Multimaps.synchronizedMultimap(HashMultimap.<InetAddress,
StreamContext>create());
-    
+    private static ConcurrentMap<StreamContext, StreamInSession> sessions = new NonBlockingHashMap<StreamContext,
StreamInSession>();
+    private final Set<PendingFile> activeStreams = new HashSet<PendingFile>();
+
     private final List<PendingFile> pendingFiles = new ArrayList<PendingFile>();
     private final StreamContext context;
 
@@ -49,24 +48,44 @@ public class StreamInSession
         this.context = context;
     }
 
-    public synchronized static StreamInSession get(StreamContext context)
+    public static StreamInSession create(InetAddress host)
+    {
+        StreamContext context = new StreamContext(host);
+        StreamInSession session = new StreamInSession(context);
+        sessions.put(context, session);
+        return session;
+    }
+
+    public static StreamInSession get(InetAddress host, long sessionId)
     {
-        StreamInSession session = streamManagers.get(context);
+        StreamContext context = new StreamContext(host, sessionId);
+
+        StreamInSession session = sessions.get(context);
         if (session == null)
         {
             StreamInSession possibleNew = new StreamInSession(context);
-            if ((session = streamManagers.putIfAbsent(context, possibleNew)) == null)
+            if ((session = sessions.putIfAbsent(context, possibleNew)) == null)
             {
                 session = possibleNew;
-                sourceHosts.put(context.host, context);
             }
         }
         return session;
     }
 
-    public void addFilesToRequest(List<PendingFile> pendingFiles)
+    // FIXME hack for "initiated" streams.  replace w/ integration w/ pendingfiles
+    public void addActiveStream(PendingFile file)
+    {
+        activeStreams.add(file);
+    }
+
+    public void removeActiveStream(PendingFile file)
+    {
+        activeStreams.remove(file);
+    }
+
+    public void addFilesToRequest(List<PendingFile> files)
     {
-        for(PendingFile file : pendingFiles)
+        for(PendingFile file : files)
         {
             if(logger.isDebugEnabled())
                 logger.debug("Adding file {} to Stream Request queue", file.getFilename());
@@ -82,41 +101,61 @@ public class StreamInSession
     {
         pendingFiles.remove(lastFile);
         if (pendingFiles.size() > 0)
-            StreamIn.requestFile(context, pendingFiles.get(0));
+            requestFile(pendingFiles.get(0));
         else
         {
             if (StorageService.instance.isBootstrapMode())
-                StorageService.instance.removeBootstrapSource(context.host, lastFile.desc.ksname);
+                StorageService.instance.removeBootstrapSource(getHost(), lastFile.desc.ksname);
             remove();
         }
     }
     
     public void remove()
     {
-        if (streamManagers.containsKey(context))
-            streamManagers.remove(context);
-        sourceHosts.remove(context.host, context);
+        sessions.remove(context);
+    }
+
+    public void requestFile(PendingFile file)
+    {
+        if (logger.isDebugEnabled())
+            logger.debug("Requesting file {} from source {}", file.getFilename(), getHost());
+        Message message = new StreamRequestMessage(FBUtilities.getLocalAddress(), file, getSessionId()).makeMessage();
+        MessagingService.instance.sendOneWay(message, getHost());
+    }
+
+    public long getSessionId()
+    {
+        return context.sessionId;
+    }
+
+    public InetAddress getHost()
+    {
+        return context.host;
     }
 
     /** query method to determine which hosts are streaming to this node. */
-    public static Set<StreamContext> getSources()
+    public static Set<InetAddress> getSources()
     {
-        HashSet<StreamContext> set = new HashSet<StreamContext>();
-        set.addAll(streamManagers.keySet());
-        set.addAll(activeStreams.keySet());
+        HashSet<InetAddress> set = new HashSet<InetAddress>();
+        for (StreamInSession session : sessions.values())
+        {
+            set.add(session.getHost());
+        }
         return set;
     }
 
     /** query the status of incoming files. */
     public static List<PendingFile> getIncomingFiles(InetAddress host)
     {
-        // avoid returning null.
         List<PendingFile> list = new ArrayList<PendingFile>();
-        for (StreamContext context : sourceHosts.get(host))
+        for (Map.Entry<StreamContext, StreamInSession> entry : sessions.entrySet())
         {
-            if (streamManagers.containsKey(context))
-                list.addAll(streamManagers.get(context).pendingFiles);
-            list.addAll(activeStreams.get(context));
+            if (entry.getKey().host.equals(host))
+            {
+                StreamInSession session = entry.getValue();
+                list.addAll(session.pendingFiles);
+                list.addAll(session.activeStreams);
+            }
         }
         return list;
     }

Modified: cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamOut.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamOut.java?rev=997119&r1=997118&r2=997119&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamOut.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamOut.java Tue Sep 14 22:25:53
2010
@@ -66,17 +66,16 @@ public class StreamOut
     {
         assert ranges.size() > 0;
         
-        StreamContext context = new StreamContext(target);
         // this is so that this target shows up as a destination while anticompaction is
happening.
-        StreamOutSession.get(context);
+        StreamOutSession session = StreamOutSession.create(target);
 
-        logger.info("Beginning transfer process to {} for ranges {}", context, StringUtils.join(ranges,
", "));
+        logger.info("Beginning transfer process to {} for ranges {}", target, StringUtils.join(ranges,
", "));
 
         try
         {
             Table table = flushSSTable(tableName);
             // send the matching portion of every sstable in the keyspace
-            transferSSTables(context, tableName, table.getAllSSTables(), ranges);
+            transferSSTables(session, tableName, table.getAllSSTables(), ranges);
         }
         catch (IOException e)
         {
@@ -84,7 +83,7 @@ public class StreamOut
         }
         finally
         {
-            StreamOutSession.remove(context);
+            session.close();
         }
         if (callback != null)
             callback.run();
@@ -120,17 +119,17 @@ public class StreamOut
     /**
      * Split out files for all tables on disk locally for each range and then stream them
to the target endpoint.
     */
-    public static void transferRangesForRequest(StreamContext context, String tableName,
Collection<Range> ranges, Runnable callback)
+    public static void transferRangesForRequest(StreamOutSession session, String tableName,
Collection<Range> ranges, Runnable callback)
     {
         assert ranges.size() > 0;
 
-        logger.info("Beginning transfer process to {} for ranges {}", context, StringUtils.join(ranges,
", "));
+        logger.info("Beginning transfer process to {} for ranges {}", session.getHost(),
StringUtils.join(ranges, ", "));
 
         try
         {
             Table table = flushSSTable(tableName);
             // send the matching portion of every sstable in the keyspace
-            transferSSTablesForRequest(context, tableName, table.getAllSSTables(), ranges);
+            transferSSTablesForRequest(session, tableName, table.getAllSSTables(), ranges);
         }
         catch (IOException e)
         {
@@ -144,21 +143,21 @@ public class StreamOut
     /**
      * Transfers matching portions of a group of sstables from a single table to the target
endpoint.
      */
-    public static void transferSSTables(StreamContext context, String table, Collection<SSTableReader>
sstables, Collection<Range> ranges) throws IOException
+    public static void transferSSTables(StreamOutSession session, String table, Collection<SSTableReader>
sstables, Collection<Range> ranges) throws IOException
     {
         List<PendingFile> pending = createPendingFiles(sstables, ranges);
 
         if (pending.size() > 0)
         {
-            StreamHeader header = new StreamHeader(context.sessionId, pending.get(0), true);
-            StreamOutSession.get(context).addFilesToStream(pending);
+            StreamHeader header = new StreamHeader(session.getSessionId(), pending.get(0),
true);
+            session.addFilesToStream(pending);
 
-            logger.info("Streaming file {} to {}", header.getStreamFile(), context.host);
-            MessagingService.instance.stream(header, context.host);
+            logger.info("Streaming file {} to {}", header.getStreamFile(), session.getHost());
+            MessagingService.instance.stream(header, session.getHost());
 
-            logger.info("Waiting for transfer to {} to complete", context);
-            StreamOutSession.get(context).waitForStreamCompletion();
-            logger.info("Done with transfer to {}", context);
+            logger.info("Waiting for transfer to {} to complete", session.getHost());
+            session.waitForStreamCompletion();
+            logger.info("Done with transfer to {}", session.getHost());
         }
     }
 
@@ -166,27 +165,27 @@ public class StreamOut
      * Transfers the first file for matching portions of a group of sstables and appends
a list of other files
      * to the header for the requesting destination to take control of the rest of the transfers
      */
-    private static void transferSSTablesForRequest(StreamContext context, String table, Collection<SSTableReader>
sstables, Collection<Range> ranges) throws IOException
+    private static void transferSSTablesForRequest(StreamOutSession session, String table,
Collection<SSTableReader> sstables, Collection<Range> ranges) throws IOException
     {
         List<PendingFile> pending = createPendingFiles(sstables, ranges);
         if (pending.size() > 0)
         {
-            StreamHeader header = new StreamHeader(context.sessionId, pending.get(0), pending,
false);
+            StreamHeader header = new StreamHeader(session.getSessionId(), pending.get(0),
pending, false);
             // In case this happens to be a re-request due to some error condition on the
destination side
-            if (StreamOutSession.getPendingFiles(context).size() == 0)
-                StreamOutSession.get(context).addFilesToStream(pending);
+            if (session.getFiles().isEmpty())
+                session.addFilesToStream(pending);
 
-            logger.info("Streaming file {} to {}", header.getStreamFile(), context.host);
-            MessagingService.instance.stream(header, context.host);
-            StreamOutSession.get(context).removePending(header.getStreamFile());
+            logger.info("Streaming file {} to {}", header.getStreamFile(), session.getHost());
+            MessagingService.instance.stream(header, session.getHost());
+            session.removePending(header.getStreamFile());
         }
         else
         {
-            FileStatus status = new FileStatus("", context.sessionId);
+            FileStatus status = new FileStatus("", session.getSessionId());
             status.setAction(FileStatus.Action.EMPTY);
             Message message = status.makeStreamStatusMessage();
             message.setHeader(StreamOut.TABLE_NAME, table.getBytes());
-            MessagingService.instance.sendOneWay(message, context.host);
+            MessagingService.instance.sendOneWay(message, session.getHost());
         }
     }
 

Modified: cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamOutSession.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamOutSession.java?rev=997119&r1=997118&r2=997119&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamOutSession.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamOutSession.java Tue Sep
14 22:25:53 2010
@@ -20,90 +20,47 @@ package org.apache.cassandra.streaming;
 
 import java.io.IOException;
 import java.net.InetAddress;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
+import java.util.*;
 import java.util.concurrent.ConcurrentMap;
 
-import org.apache.cassandra.net.MessagingService;
-import org.apache.cassandra.utils.SimpleCondition;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.collect.HashMultimap;
-import com.google.common.collect.Multimap;
-import com.google.common.collect.Multimaps;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.utils.SimpleCondition;
+import org.cliffc.high_scale_lib.NonBlockingHashMap;
 
 /**
  * This class manages the streaming of multiple files one after the other.
 */
 public class StreamOutSession
 {   
-    private static Logger logger = LoggerFactory.getLogger( StreamOutSession.class );
+    private static final Logger logger = LoggerFactory.getLogger( StreamOutSession.class
);
         
-    // one host may have multiple stream contexts (think of them as sessions). each context
gets its own manager.
-    private static ConcurrentMap<StreamContext, StreamOutSession> streamManagers =
new ConcurrentHashMap<StreamContext, StreamOutSession>();
-    public static final Multimap<InetAddress, StreamContext> destHosts = Multimaps.synchronizedMultimap(HashMultimap.<InetAddress,
StreamContext>create());
-
-    public static StreamOutSession get(StreamContext context)
-    {
-        StreamOutSession session = streamManagers.get(context);
-        if (session == null)
-        {
-            StreamOutSession possibleNew = new StreamOutSession(context);
-            if ((session = streamManagers.putIfAbsent(context, possibleNew)) == null)
-            {
-                session = possibleNew;
-                destHosts.put(context.host, context);
-            }
-        }
-        return session;
-    }
-    
-    public static void remove(StreamContext context)
+    // one host may have multiple stream sessions.
+    private static final ConcurrentMap<StreamContext, StreamOutSession> streams = new
NonBlockingHashMap<StreamContext, StreamOutSession>();
+
+    public static StreamOutSession create(InetAddress host)
     {
-        if (streamManagers.containsKey(context) && streamManagers.get(context).files.size()
== 0)
-        {
-            streamManagers.remove(context);
-            destHosts.remove(context.host, context);
-        }
+        return create(host, System.nanoTime());
     }
 
-    public static Set<InetAddress> getDestinations()
+    public static StreamOutSession create(InetAddress host, long sessionId)
     {
-        // the results of streamManagers.keySet() isn't serializable, so create a new set.
-        Set<InetAddress> hosts = new HashSet<InetAddress>();
-        hosts.addAll(destHosts.keySet());
-        return hosts;
+        StreamContext context = new StreamContext(host, sessionId);
+        StreamOutSession session = new StreamOutSession(context);
+        streams.put(context, session);
+        return session;
     }
-    
-    /** 
-     * this method exists so that we don't have to call StreamOutManager.get() which has
a nasty side-effect of 
-     * indicating that we are streaming to a particular host.
-     **/     
-    public static List<PendingFile> getPendingFiles(StreamContext context)
+
+    public static StreamOutSession get(InetAddress host, long sessionId)
     {
-        List<PendingFile> list = new ArrayList<PendingFile>();
-        StreamOutSession session = streamManagers.get(context);
-        if (session != null)
-            list.addAll(session.getFiles());
-        return list;
+        return streams.get(new StreamContext(host, sessionId));
     }
 
-    public static List<PendingFile> getOutgoingFiles(InetAddress host)
+    public void close()
     {
-        List<PendingFile> list = new ArrayList<PendingFile>();
-        for(StreamContext context : destHosts.get(host))
-        {
-            list.addAll(getPendingFiles(context));
-        }
-        return list;
+        streams.remove(context);
     }
 
     // we need sequential and random access to the files. hence, the map and the list.
@@ -117,6 +74,16 @@ public class StreamOutSession
     {
         this.context = context;
     }
+
+    public InetAddress getHost()
+    {
+        return context.host;
+    }
+
+    public long getSessionId()
+    {
+        return context.sessionId;
+    }
     
     public void addFilesToStream(List<PendingFile> pendingFiles)
     {
@@ -142,7 +109,7 @@ public class StreamOutSession
     {
         if (logger.isDebugEnabled())
             logger.debug("Streaming {} ...", pf);
-        MessagingService.instance.stream(new StreamHeader(context.sessionId, pf, true), context.host);
+        MessagingService.instance.stream(new StreamHeader(getSessionId(), pf, true), getHost());
     }
 
     public void finishAndStartNext(String pfname) throws IOException
@@ -150,23 +117,25 @@ public class StreamOutSession
         PendingFile pf = fileMap.remove(pfname);
         files.remove(pf);
 
-        if (files.size() > 0)
-            streamFile(files.get(0));
-        else
+        if (files.isEmpty())
         {
             if (logger.isDebugEnabled())
-                logger.debug("Signalling that streaming is done for {} session {}", context.host,
context.sessionId);
-            remove(context);
+                logger.debug("Signalling that streaming is done for {} session {}", getHost(),
getSessionId());
+            close();
             condition.signalAll();
         }
+        else
+        {
+            streamFile(files.get(0));
+        }
     }
 
     public void removePending(PendingFile pf)
     {
         files.remove(pf);
         fileMap.remove(pf.getFilename());
-        if (files.size() == 0)
-            remove(context);
+        if (files.isEmpty())
+            close();
     }
 
     public void waitForStreamCompletion()
@@ -185,4 +154,25 @@ public class StreamOutSession
     {
         return Collections.unmodifiableList(files);
     }
+
+    public static Set<InetAddress> getDestinations()
+    {
+        Set<InetAddress> hosts = new HashSet<InetAddress>();
+        for (StreamOutSession session : streams.values())
+        {
+            hosts.add(session.getHost());
+        }
+        return hosts;
+    }
+
+    public static List<PendingFile> getOutgoingFiles(InetAddress host)
+    {
+        List<PendingFile> list = new ArrayList<PendingFile>();
+        for (Map.Entry<StreamContext, StreamOutSession> entry : streams.entrySet())
+        {
+            if (entry.getKey().host.equals(host))
+                list.addAll(entry.getValue().getFiles());
+        }
+        return list;
+    }
 }

Modified: cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamRequestVerbHandler.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamRequestVerbHandler.java?rev=997119&r1=997118&r2=997119&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamRequestVerbHandler.java
(original)
+++ cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamRequestVerbHandler.java
Tue Sep 14 22:25:53 2010
@@ -54,15 +54,16 @@ public class StreamRequestVerbHandler im
 
             if (srm.file != null)
             {
-                // single file request.
+                // single file re-request.
                 StreamHeader header = new StreamHeader(srm.sessionId, srm.file, false);
                 MessagingService.instance.stream(header, message.getFrom());
-                StreamOutSession.get(new StreamContext(message.getFrom(), srm.sessionId)).removePending(srm.file);
+                StreamOutSession.get(message.getFrom(), srm.sessionId).removePending(srm.file);
             }
             else
             {
                 // range request.
-                StreamOut.transferRangesForRequest(new StreamContext(message.getFrom(), srm.sessionId),
srm.table, srm.ranges, null);
+                StreamOutSession session = StreamOutSession.create(message.getFrom(), srm.sessionId);
+                StreamOut.transferRangesForRequest(session, srm.table, srm.ranges, null);
             }
         }
         catch (IOException ex)

Modified: cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamStatusVerbHandler.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamStatusVerbHandler.java?rev=997119&r1=997118&r2=997119&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamStatusVerbHandler.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamStatusVerbHandler.java Tue
Sep 14 22:25:53 2010
@@ -45,20 +45,20 @@ public class StreamStatusVerbHandler imp
         try
         {
             FileStatus streamStatus = FileStatus.serializer().deserialize(new DataInputStream(bufIn));
-            StreamContext context = new StreamContext(message.getFrom(), streamStatus.getSessionId());
+            StreamOutSession session = StreamOutSession.get(message.getFrom(), streamStatus.getSessionId());
 
             switch (streamStatus.getAction())
             {
                 case DELETE:
-                    StreamOutSession.get(context).finishAndStartNext(streamStatus.getFile());
+                    session.finishAndStartNext(streamStatus.getFile());
                     break;
                 case STREAM:
                     logger.warn("Need to re-stream file {} to {}", streamStatus.getFile(),
message.getFrom());
-                    StreamOutSession.get(context).retry(streamStatus.getFile());
+                    session.retry(streamStatus.getFile());
                     break;
                 case EMPTY:
                     logger.error("Did not find matching ranges on {}", message.getFrom());
-                    StreamInSession.get(context).remove();
+                    StreamInSession.get(message.getFrom(), streamStatus.getSessionId()).remove();
                     if (StorageService.instance.isBootstrapMode())
                         StorageService.instance.removeBootstrapSource(message.getFrom(),
new String(message.getHeader(StreamOut.TABLE_NAME)));
                     break;

Modified: cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamingService.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamingService.java?rev=997119&r1=997118&r2=997119&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamingService.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamingService.java Tue Sep
14 22:25:53 2010
@@ -54,10 +54,10 @@ public class StreamingService implements
     {
         StringBuilder sb = new StringBuilder();
         sb.append("Receiving from:\n");
-        for (StreamContext source : StreamInSession.getSources())
+        for (InetAddress source : StreamInSession.getSources())
         {
-            sb.append(String.format(" %s:\n", source.host.getHostAddress()));
-            for (PendingFile pf : StreamInSession.getIncomingFiles(source.host))
+            sb.append(String.format(" %s:\n", source.getHostAddress()));
+            for (PendingFile pf : StreamInSession.getIncomingFiles(source))
             {
                 sb.append(String.format("  %s\n", pf.toString()));
             }
@@ -99,13 +99,7 @@ public class StreamingService implements
     /** hosts sending incoming streams */
     public Set<InetAddress> getStreamSources()
     {
-        Set<InetAddress> sources = new HashSet<InetAddress>();
-
-        for(StreamContext context : StreamInSession.getSources())
-        {
-            sources.add(context.host);
-        }
-        return sources;
+        return StreamInSession.getSources();
     }
 
     /** details about incoming streams. */

Modified: cassandra/trunk/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java?rev=997119&r1=997118&r2=997119&view=diff
==============================================================================
--- cassandra/trunk/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java (original)
+++ cassandra/trunk/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java Tue
Sep 14 22:25:53 2010
@@ -34,8 +34,6 @@ import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.io.sstable.SSTableUtils;
 import org.apache.cassandra.io.sstable.SSTableReader;
 import org.apache.cassandra.service.StorageService;
-import org.apache.cassandra.streaming.StreamContext;
-import org.apache.cassandra.streaming.StreamOut;
 import org.apache.cassandra.utils.FBUtilities;
 
 import org.junit.BeforeClass;
@@ -68,7 +66,7 @@ public class StreamingTransferTest exten
         List<Range> ranges = new ArrayList<Range>();
         ranges.add(new Range(p.getMinimumToken(), p.getToken("key".getBytes())));
         ranges.add(new Range(p.getToken("key2".getBytes()), p.getMinimumToken()));
-        StreamOut.transferSSTables(new StreamContext(LOCAL), tablename, Arrays.asList(sstable),
ranges);
+        StreamOut.transferSSTables(StreamOutSession.create(LOCAL), tablename, Arrays.asList(sstable),
ranges);
 
         // confirm that the SSTable was transferred and registered
         ColumnFamilyStore cfstore = Table.open(tablename).getColumnFamilyStore(cfname);
@@ -108,7 +106,7 @@ public class StreamingTransferTest exten
         List<Range> ranges = new ArrayList<Range>();
         ranges.add(new Range(p.getMinimumToken(), p.getToken("transfer1".getBytes())));
         ranges.add(new Range(p.getToken("test2".getBytes()), p.getMinimumToken()));
-        StreamOut.transferSSTables(new StreamContext(LOCAL), tablename, Arrays.asList(sstable,
sstable2), ranges);
+        StreamOut.transferSSTables(StreamOutSession.create(LOCAL), tablename, Arrays.asList(sstable,
sstable2), ranges);
 
         // confirm that the SSTable was transferred and registered
         ColumnFamilyStore cfstore = Table.open(tablename).getColumnFamilyStore(cfname);



Mime
View raw message