cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jbel...@apache.org
Subject svn commit: r997527 - in /cassandra/trunk: ./ src/java/org/apache/cassandra/net/ src/java/org/apache/cassandra/service/ src/java/org/apache/cassandra/streaming/ test/unit/org/apache/cassandra/streaming/
Date Wed, 15 Sep 2010 22:22:10 GMT
Author: jbellis
Date: Wed Sep 15 22:22:09 2010
New Revision: 997527

URL: http://svn.apache.org/viewvc?rev=997527&view=rev
Log:
combine "initiated" and requested streaming paths
patch by jbellis; reviewed by Nick Bailey for CASSANDRA-1506

Modified:
    cassandra/trunk/CHANGES.txt
    cassandra/trunk/src/java/org/apache/cassandra/net/FileStreamTask.java
    cassandra/trunk/src/java/org/apache/cassandra/service/AntiEntropyService.java
    cassandra/trunk/src/java/org/apache/cassandra/streaming/FileStatus.java
    cassandra/trunk/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java
    cassandra/trunk/src/java/org/apache/cassandra/streaming/PendingFile.java
    cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamHeader.java
    cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInSession.java
    cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamOut.java
    cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamOutSession.java
    cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamRequestVerbHandler.java
    cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamStatusVerbHandler.java
    cassandra/trunk/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java

Modified: cassandra/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/cassandra/trunk/CHANGES.txt?rev=997527&r1=997526&r2=997527&view=diff
==============================================================================
--- cassandra/trunk/CHANGES.txt (original)
+++ cassandra/trunk/CHANGES.txt Wed Sep 15 22:22:09 2010
@@ -75,6 +75,7 @@
  * by default, calculate heap size and memtable thresholds at runtime (CASSANDRA-1469)
  * fix races dealing with adding/dropping keyspaces and column families in
    rapid succession (CASSANDRA-1477)
+ * clean up of Streaming system (CASSANDRA-1503, 1504, 1506)
 
 
 0.7-beta1

Modified: cassandra/trunk/src/java/org/apache/cassandra/net/FileStreamTask.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/net/FileStreamTask.java?rev=997527&r1=997526&r2=997527&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/net/FileStreamTask.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/net/FileStreamTask.java Wed Sep 15 22:22:09 2010
@@ -73,22 +73,23 @@ public class FileStreamTask extends Wrap
             }
         }
         if (logger.isDebugEnabled())
-          logger.debug("Done streaming " + header.getStreamFile());
+            logger.debug("Done streaming " + header.file);
     }
 
     private void stream(SocketChannel channel) throws IOException
     {
-        RandomAccessFile raf = new RandomAccessFile(new File(header.getStreamFile().getFilename()), "r");
+        ByteBuffer buffer = MessagingService.constructStreamHeader(header, false);
+        channel.write(buffer);
+        assert buffer.remaining() == 0;
+        if (header.file == null)
+            return;
+
+        RandomAccessFile raf = new RandomAccessFile(new File(header.file.getFilename()), "r");
         try
         {
             FileChannel fc = raf.getChannel();
-
-            ByteBuffer buffer = MessagingService.constructStreamHeader(header, false);
-            channel.write(buffer);
-            assert buffer.remaining() == 0;
-            
             // stream sections of the file as returned by PendingFile.currentSection
-            for (Pair<Long, Long> section : header.getStreamFile().sections)
+            for (Pair<Long, Long> section : header.file.sections)
             {
                 long length = section.right - section.left;
                 long bytesTransferred = 0;
@@ -136,7 +137,7 @@ public class FileStreamTask extends Wrap
                     throw e;
 
                 long waitms = DatabaseDescriptor.getRpcTimeout() * (long)Math.pow(2, attempts);
-                logger.warn("Failed attempt " + attempts + " to connect to " + to + " to stream " + header.getStreamFile() + ". Retrying in " + waitms + " ms. (" + e + ")");
+                logger.warn("Failed attempt " + attempts + " to connect to " + to + " to stream " + header.file + ". Retrying in " + waitms + " ms. (" + e + ")");
                 try
                 {
                     Thread.sleep(waitms);

Modified: cassandra/trunk/src/java/org/apache/cassandra/service/AntiEntropyService.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/AntiEntropyService.java?rev=997527&r1=997526&r2=997527&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/AntiEntropyService.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/AntiEntropyService.java Wed Sep 15 22:22:09 2010
@@ -537,8 +537,8 @@ public class AntiEntropyService
                 {
                     protected void runMayThrow() throws Exception
                     {
-                        StreamOutSession session = StreamOutSession.create(request.endpoint);
-                        StreamOut.transferSSTables(session, request.cf.left, sstables, ranges);
+                        StreamOutSession session = StreamOutSession.create(request.cf.left, request.endpoint, null);
+                        StreamOut.transferSSTables(session, sstables, ranges);
                         session.close();
                     }
                 });

Modified: cassandra/trunk/src/java/org/apache/cassandra/streaming/FileStatus.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/streaming/FileStatus.java?rev=997527&r1=997526&r2=997527&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/streaming/FileStatus.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/streaming/FileStatus.java Wed Sep 15 22:22:09 2010
@@ -26,7 +26,6 @@ import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.IOException;
 
-import org.apache.cassandra.concurrent.Stage;
 import org.apache.cassandra.io.ICompactSerializer;
 import org.apache.cassandra.net.Message;
 import org.apache.cassandra.service.StorageService;
@@ -39,11 +38,9 @@ class FileStatus
     static enum Action
     {
         // was received successfully, and can be deleted from the source node
-        DELETE,
+        FINISHED,
         // needs to be streamed (or restreamed)
-        STREAM,
-        // No matching Ranges, this should not happen in almost all cases
-        EMPTY
+        RETRY,
     }
 
     static
@@ -56,38 +53,15 @@ class FileStatus
         return serializer;
     }
 
-    private final long sessionId;
-    private final String file;
-    private Action action;
-
-    /**
-     * Create a FileStatus with the default Action: STREAM.
-     */
-    public FileStatus(String file, long sessionId)
-    {
-        this.file = file;
-        this.action = Action.STREAM;
-        this.sessionId = sessionId;
-    }
+    public final long sessionId;
+    public final String file;
+    public final Action action;
 
-    public String getFile()
-    {
-        return file;
-    }
-
-    public void setAction(Action action)
+    public FileStatus(String file, long sessionId, Action action)
     {
+        this.file = file;
         this.action = action;
-    }
-
-    public Action getAction()
-    {
-        return action;
-    }
-
-    public long getSessionId()
-    {
-        return sessionId;
+        this.sessionId = sessionId;
     }
 
     public Message makeStreamStatusMessage() throws IOException
@@ -102,28 +76,17 @@ class FileStatus
     {
         public void serialize(FileStatus streamStatus, DataOutputStream dos) throws IOException
         {
-            dos.writeLong(streamStatus.getSessionId());
-            dos.writeUTF(streamStatus.getFile());
-            dos.writeInt(streamStatus.getAction().ordinal());
+            dos.writeLong(streamStatus.sessionId);
+            dos.writeUTF(streamStatus.file);
+            dos.writeInt(streamStatus.action.ordinal());
         }
 
         public FileStatus deserialize(DataInputStream dis) throws IOException
         {
             long sessionId = dis.readLong();
             String targetFile = dis.readUTF();
-            FileStatus streamStatus = new FileStatus(targetFile, sessionId);
-
-            int ordinal = dis.readInt();
-            if (ordinal == Action.DELETE.ordinal())
-                streamStatus.setAction(Action.DELETE);
-            else if (ordinal == Action.STREAM.ordinal())
-                streamStatus.setAction(Action.STREAM);
-            else if (ordinal == Action.EMPTY.ordinal())
-                streamStatus.setAction(Action.EMPTY);
-            else
-                throw new IOException("Bad FileStatus.Action: " + ordinal);
-
-            return streamStatus;
+            Action action = Action.values()[dis.readInt()];
+            return new FileStatus(targetFile, sessionId, action);
         }
     }
 }

Modified: cassandra/trunk/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java?rev=997527&r1=997526&r2=997527&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java Wed Sep 15 22:22:09 2010
@@ -38,48 +38,46 @@ public class IncomingStreamReader
 {
     private static final Logger logger = LoggerFactory.getLogger(IncomingStreamReader.class);
 
-    private PendingFile pendingFile;
-    private PendingFile lastFile;
-    private FileStatus streamStatus;
+    private final PendingFile localFile;
+    private final PendingFile remoteFile;
     private final SocketChannel socketChannel;
-    // indicates an transfer initiated by the source, as opposed to one requested by the recipient
-    private final boolean initiatedTransfer;
     private final StreamInSession session;
 
     public IncomingStreamReader(StreamHeader header, SocketChannel socketChannel) throws IOException
     {
         this.socketChannel = socketChannel;
         InetSocketAddress remoteAddress = (InetSocketAddress)socketChannel.socket().getRemoteSocketAddress();
+
+        session = StreamInSession.get(remoteAddress.getAddress(), header.sessionId);
+        session.addFiles(header.pendingFiles);
+        session.setTable(header.table);
         // pendingFile gets the new context for the local node.
-        pendingFile = StreamIn.getContextMapping(header.getStreamFile());
-        // lastFile has the old context, which was registered in the manager.
-        lastFile = header.getStreamFile();
-        initiatedTransfer = header.initiatedTransfer;
-        assert pendingFile != null;
-        session = StreamInSession.get(remoteAddress.getAddress(), header.getSessionId());
-        session.addActiveStream(pendingFile);
-        // For transfers setup the status and for replies to requests, prepare the list
-        // of available files to request.
-        if (initiatedTransfer)
-            streamStatus = new FileStatus(lastFile.getFilename(), header.getSessionId());
-        else if (header.getPendingFiles() != null)
-            session.addFilesToRequest(header.getPendingFiles());
+        remoteFile = header.file;
+        localFile = remoteFile != null ? StreamIn.getContextMapping(remoteFile) : null;
     }
 
     public void read() throws IOException
     {
+        if (remoteFile != null)
+            readFile();
+
+        session.closeIfFinished();
+    }
+
+    private void readFile() throws IOException
+    {
         if (logger.isDebugEnabled())
         {
             logger.debug("Receiving stream");
-            logger.debug("Creating file for {}", pendingFile.getFilename());
+            logger.debug("Creating file for {}", localFile.getFilename());
         }
-        FileOutputStream fos = new FileOutputStream(pendingFile.getFilename(), true);
+        FileOutputStream fos = new FileOutputStream(localFile.getFilename(), true);
         FileChannel fc = fos.getChannel();
 
         long offset = 0;
         try
         {
-            for (Pair<Long, Long> section : pendingFile.sections)
+            for (Pair<Long, Long> section : localFile.sections)
             {
                 long length = section.right - section.left;
                 long bytesRead = 0;
@@ -92,51 +90,42 @@ public class IncomingStreamReader
         {
             logger.debug("Receiving stream: recovering from IO error");
             /* Ask the source node to re-stream this file. */
-            if (initiatedTransfer)
-                handleFileStatus(FileStatus.Action.STREAM);
-            else
-                session.requestFile(lastFile);
+            handleFileStatus(FileStatus.Action.RETRY);
 
             /* Delete the orphaned file. */
-            FileUtils.deleteWithConfirm(new File(pendingFile.getFilename()));
+            FileUtils.deleteWithConfirm(new File(localFile.getFilename()));
             throw ex;
         }
         finally
         {
             fc.close();
-            session.removeActiveStream(pendingFile);
         }
 
         if (logger.isDebugEnabled())
-            logger.debug("Removing stream context {}", pendingFile);
-        if (initiatedTransfer)
-            handleFileStatus(FileStatus.Action.DELETE);
-        else
-        {
-            addSSTable(pendingFile);
-            session.finishAndRequestNext(lastFile);
-        }
+            logger.debug("Removing stream context {}", remoteFile);
+        handleFileStatus(FileStatus.Action.FINISHED);
     }
 
     private void handleFileStatus(FileStatus.Action action) throws IOException
     {
-        streamStatus.setAction(action);
-        
-        if (FileStatus.Action.STREAM == streamStatus.getAction())
+        FileStatus status = new FileStatus(remoteFile.getFilename(), session.getSessionId(), action);
+
+        if (FileStatus.Action.RETRY == action)
         {
             // file needs to be restreamed
-            logger.warn("Streaming of file {} from {} failed: requesting a retry.", pendingFile, session);
-            MessagingService.instance.sendOneWay(streamStatus.makeStreamStatusMessage(), session.getHost());
+            logger.warn("Streaming of file {} from {} failed: requesting a retry.", remoteFile, session);
+            MessagingService.instance.sendOneWay(status.makeStreamStatusMessage(), session.getHost());
             return;
         }
-        assert FileStatus.Action.DELETE == streamStatus.getAction() : "Unknown stream action: " + streamStatus.getAction();
 
-        addSSTable(pendingFile);
+        assert FileStatus.Action.FINISHED == action : "Unknown stream action: " + action;
 
+        addSSTable(localFile);
+        session.remove(remoteFile);
         // send a StreamStatus message telling the source node it can delete this file
         if (logger.isDebugEnabled())
-            logger.debug("Sending a streaming finished message for {} to {}", pendingFile, session);
-        MessagingService.instance.sendOneWay(streamStatus.makeStreamStatusMessage(), session.getHost());
+            logger.debug("Sending a streaming finished message for {} to {}", remoteFile, session);
+        MessagingService.instance.sendOneWay(status.makeStreamStatusMessage(), session.getHost());
     }
 
     public static void addSSTable(PendingFile pendingFile)

Modified: cassandra/trunk/src/java/org/apache/cassandra/streaming/PendingFile.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/streaming/PendingFile.java?rev=997527&r1=997526&r2=997527&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/streaming/PendingFile.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/streaming/PendingFile.java Wed Sep 15 22:22:09 2010
@@ -36,14 +36,9 @@ import org.apache.cassandra.utils.Pair;
  */
 public class PendingFile
 {
-    private static ICompactSerializer<PendingFile> serializer_;
+    private static PendingFileSerializer serializer_ = new PendingFileSerializer();
 
-    static
-    {
-        serializer_ = new PendingFileSerializer();
-    }
-
-    public static ICompactSerializer<PendingFile> serializer()
+    public static PendingFileSerializer serializer()
     {
         return serializer_;
     }
@@ -88,10 +83,16 @@ public class PendingFile
         return getFilename() + "/" + sections;
     }
 
-    private static class PendingFileSerializer implements ICompactSerializer<PendingFile>
+    public static class PendingFileSerializer implements ICompactSerializer<PendingFile>
     {
         public void serialize(PendingFile sc, DataOutputStream dos) throws IOException
         {
+            if (sc == null)
+            {
+                dos.writeUTF("");
+                return;
+            }
+
             dos.writeUTF(sc.desc.filenameFor(sc.component));
             dos.writeUTF(sc.component);
             dos.writeInt(sc.sections.size());
@@ -103,7 +104,11 @@ public class PendingFile
 
         public PendingFile deserialize(DataInputStream dis) throws IOException
         {
-            Descriptor desc = Descriptor.fromFilename(dis.readUTF());
+            String filename = dis.readUTF();
+            if (filename.isEmpty())
+                return null;
+            
+            Descriptor desc = Descriptor.fromFilename(filename);
             String component = dis.readUTF();
             int count = dis.readInt();
             List<Pair<Long,Long>> sections = new ArrayList<Pair<Long,Long>>(count);

Modified: cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamHeader.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamHeader.java?rev=997527&r1=997526&r2=997527&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamHeader.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamHeader.java Wed Sep 15 22:22:09 2010
@@ -25,6 +25,8 @@ import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
 import java.util.List;
 
 import org.apache.cassandra.io.ICompactSerializer;
@@ -43,86 +45,58 @@ public class StreamHeader
         return serializer;
     }
 
-    private PendingFile file;
-    private long sessionId;
-    
-    // indicates an transfer initiated by the source, as opposed to one requested by the recipient
-    protected final boolean initiatedTransfer;
-    
-    // this list will only be non-null when the first of a batch of files are being sent. it avoids having to have
-    // a separate message indicating which files to expect.
-    private final List<PendingFile> pending;
+    public final String table;
 
-    public StreamHeader(long sessionId, PendingFile file, boolean initiatedTransfer)
-    {
-        this(sessionId, file, null, initiatedTransfer);
-    }
+    /** file being sent on initial stream */
+    public final PendingFile file;
 
-    public StreamHeader(long sessionId, PendingFile file, List<PendingFile> pending, boolean initiatedTransfer)
-    {
-        this.sessionId  = sessionId;
-        this.file = file;
-        this.initiatedTransfer = initiatedTransfer;
-        this.pending = pending;
-    }
+    /** session is tuple of (host, sessionid) */
+    public final long sessionId;
 
-    public List<PendingFile> getPendingFiles()
-    {
-        return pending;
-    }
+    /** files to add to the session */
+    public final Collection<PendingFile> pendingFiles;
 
-    public PendingFile getStreamFile()
+    public StreamHeader(String table, long sessionId, PendingFile file)
     {
-        return file;
+        this(table, sessionId, file, Collections.<PendingFile>emptyList());
     }
 
-    public long getSessionId()
+    public StreamHeader(String table, long sessionId, PendingFile first, Collection<PendingFile> pendingFiles)
     {
-        return sessionId;
+        this.table = table;
+        this.sessionId  = sessionId;
+        this.file = first;
+        this.pendingFiles = pendingFiles;
     }
 
     private static class StreamHeaderSerializer implements ICompactSerializer<StreamHeader>
     {
         public void serialize(StreamHeader sh, DataOutputStream dos) throws IOException
         {
-            dos.writeLong(sh.getSessionId());
-            PendingFile.serializer().serialize(sh.getStreamFile(), dos);
-            dos.writeBoolean(sh.initiatedTransfer);
-            if (sh.pending != null)
+            dos.writeUTF(sh.table);
+            dos.writeLong(sh.sessionId);
+            PendingFile.serializer().serialize(sh.file, dos);
+            dos.writeInt(sh.pendingFiles.size());
+            for(PendingFile file : sh.pendingFiles)
             {
-                dos.writeInt(sh.getPendingFiles().size());
-                for(PendingFile file : sh.getPendingFiles())
-                {
-                    PendingFile.serializer().serialize(file, dos);
-                }
+                PendingFile.serializer().serialize(file, dos);
             }
-            else
-                dos.writeInt(0);
         }
 
         public StreamHeader deserialize(DataInputStream dis) throws IOException
         {
-           long sessionId = dis.readLong();
-           PendingFile file = PendingFile.serializer().deserialize(dis);
-           boolean initiatedTransfer = dis.readBoolean();
-           int size = dis.readInt();
-           StreamHeader header;
-
-           if (size > 0)
-           {
-               List<PendingFile> pendingFiles = new ArrayList<PendingFile>(size);
-               for (int i=0; i<size; i++)
-               {
-                   pendingFiles.add(PendingFile.serializer().deserialize(dis));
-               }
-               header = new StreamHeader(sessionId, file, pendingFiles, initiatedTransfer);
-           }
-           else
-           {
-               header = new StreamHeader(sessionId, file, initiatedTransfer);
-           }
+            String table = dis.readUTF();
+            long sessionId = dis.readLong();
+            PendingFile file = PendingFile.serializer().deserialize(dis);
+            int size = dis.readInt();
+
+            List<PendingFile> pendingFiles = new ArrayList<PendingFile>(size);
+            for (int i = 0; i < size; i++)
+            {
+                pendingFiles.add(PendingFile.serializer().deserialize(dis));
+            }
 
-           return header;
+            return new StreamHeader(table, sessionId, file, pendingFiles);
         }
     }
 }

Modified: cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInSession.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInSession.java?rev=997527&r1=997526&r2=997527&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInSession.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInSession.java Wed Sep 15 22:22:09 2010
@@ -22,8 +22,6 @@ import java.net.InetAddress;
 import java.util.*;
 import java.util.concurrent.ConcurrentMap;
 
-import org.apache.cassandra.net.Message;
-import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.utils.FBUtilities;
 import org.cliffc.high_scale_lib.NonBlockingHashMap;
@@ -38,11 +36,11 @@ public class StreamInSession
     private static final Logger logger = LoggerFactory.getLogger(StreamInSession.class);
 
     private static ConcurrentMap<Pair<InetAddress, Long>, StreamInSession> sessions = new NonBlockingHashMap<Pair<InetAddress, Long>, StreamInSession>();
-    private final Set<PendingFile> activeStreams = new HashSet<PendingFile>();
 
-    private final List<PendingFile> pendingFiles = new ArrayList<PendingFile>();
+    private final List<PendingFile> files = new ArrayList<PendingFile>();
     private final Pair<InetAddress, Long> context;
     private final Runnable callback;
+    private String table;
 
     private StreamInSession(Pair<InetAddress, Long> context, Runnable callback)
     {
@@ -50,11 +48,6 @@ public class StreamInSession
         this.callback = callback;
     }
 
-    public static StreamInSession create(InetAddress host)
-    {
-        return create(host, null);
-    }
-
     public static StreamInSession create(InetAddress host, Runnable callback)
     {
         Pair<InetAddress, Long> context = new Pair<InetAddress, Long>(host, System.nanoTime());
@@ -79,55 +72,34 @@ public class StreamInSession
         return session;
     }
 
-    // FIXME hack for "initiated" streams.  replace w/ integration w/ pendingfiles
-    public void addActiveStream(PendingFile file)
-    {
-        activeStreams.add(file);
-    }
-
-    public void removeActiveStream(PendingFile file)
+    public void setTable(String table)
     {
-        activeStreams.remove(file);
+        this.table = table;
     }
 
-    public void addFilesToRequest(List<PendingFile> files)
+    public void addFiles(Collection<PendingFile> files)
     {
         for(PendingFile file : files)
         {
             if(logger.isDebugEnabled())
                 logger.debug("Adding file {} to Stream Request queue", file.getFilename());
-            this.pendingFiles.add(file);
+            this.files.add(file);
         }
     }
 
-    /**
-     * Complete the transfer process of the existing file and then request
-     * the next file in the list
-     */
-    public void finishAndRequestNext(PendingFile lastFile)
-    {
-        pendingFiles.remove(lastFile);
-        if (pendingFiles.size() > 0)
-            requestFile(pendingFiles.get(0));
-        else
-        {
-            close();
-        }
-    }
-
-    public void close()
+    public void remove(PendingFile file)
     {
-        sessions.remove(context);
-        if (callback != null)
-            callback.run();
+        files.remove(file);
     }
 
-    public void requestFile(PendingFile file)
+    public void closeIfFinished()
     {
-        if (logger.isDebugEnabled())
-            logger.debug("Requesting file {} from source {}", file.getFilename(), getHost());
-        Message message = new StreamRequestMessage(FBUtilities.getLocalAddress(), file, getSessionId()).makeMessage();
-        MessagingService.instance.sendOneWay(message, getHost());
+        if (files.isEmpty())
+        {
+            if (callback != null)
+                callback.run();
+            sessions.remove(context);
+        }
     }
 
     public long getSessionId()
@@ -160,8 +132,7 @@ public class StreamInSession
             if (entry.getKey().left.equals(host))
             {
                 StreamInSession session = entry.getValue();
-                list.addAll(session.pendingFiles);
-                list.addAll(session.activeStreams);
+                list.addAll(session.files);
             }
         }
         return list;

Modified: cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamOut.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamOut.java?rev=997527&r1=997526&r2=997527&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamOut.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamOut.java Wed Sep 15 22:22:09 2010
@@ -35,8 +35,6 @@ import org.apache.cassandra.db.Table;
 import org.apache.cassandra.io.sstable.Descriptor;
 import org.apache.cassandra.io.sstable.SSTable;
 import org.apache.cassandra.io.sstable.SSTableReader;
-import org.apache.cassandra.net.Message;
-import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.utils.Pair;
 
 /**
@@ -47,18 +45,15 @@ import org.apache.cassandra.utils.Pair;
  *  2. Each transfer has the header information for the sstable being transferred.
  *  3. List of the pending files are maintained, as this is the source node.
  *
- * For StreamRequests
- *  1. The ranges are compiled and the first file transferred.
- *  2. The header contains the first file info + all the remaining pending files info.
- *  3. List of the pending files are not maintained, that will be maintained by the destination node
+ * For Stream requests (for bootstrap), the main difference is that we always have to
+ * create at least one stream reply, even if the list of files is empty, otherwise the
+ * target has no way to know that it can stop waiting for an answer.
  *
  */
 public class StreamOut
 {
     private static Logger logger = LoggerFactory.getLogger(StreamOut.class);
 
-    static String TABLE_NAME = "STREAMING-TABLE-NAME";
-
     /**
      * Split out files for all tables on disk locally for each range and then stream them to the target endpoint.
     */
@@ -67,7 +62,7 @@ public class StreamOut
         assert ranges.size() > 0;
         
         // this is so that this target shows up as a destination while anticompaction is happening.
-        StreamOutSession session = StreamOutSession.create(target, callback);
+        StreamOutSession session = StreamOutSession.create(tableName, target, callback);
 
         logger.info("Beginning transfer process to {} for ranges {}", target, StringUtils.join(ranges, ", "));
 
@@ -75,7 +70,7 @@ public class StreamOut
         {
             Table table = flushSSTable(tableName);
             // send the matching portion of every sstable in the keyspace
-            transferSSTables(session, tableName, table.getAllSSTables(), ranges);
+            transferSSTables(session, table.getAllSSTables(), ranges);
         }
         catch (IOException e)
         {
@@ -117,7 +112,7 @@ public class StreamOut
     /**
      * Split out files for all tables on disk locally for each range and then stream them to the target endpoint.
     */
-    public static void transferRangesForRequest(StreamOutSession session, String tableName, Collection<Range> ranges)
+    public static void transferRangesForRequest(StreamOutSession session, Collection<Range> ranges)
     {
         assert ranges.size() > 0;
 
@@ -125,9 +120,11 @@ public class StreamOut
 
         try
         {
-            Table table = flushSSTable(tableName);
+            Table table = flushSSTable(session.table);
             // send the matching portion of every sstable in the keyspace
-            transferSSTablesForRequest(session, tableName, table.getAllSSTables(), ranges);
+            List<PendingFile> pending = createPendingFiles(table.getAllSSTables(), ranges);
+            session.addFilesToStream(pending);
+            session.begin();
         }
         catch (IOException e)
         {
@@ -138,17 +135,14 @@ public class StreamOut
     /**
      * Transfers matching portions of a group of sstables from a single table to the target endpoint.
      */
-    public static void transferSSTables(StreamOutSession session, String table, Collection<SSTableReader> sstables, Collection<Range> ranges) throws IOException
+    public static void transferSSTables(StreamOutSession session, Collection<SSTableReader> sstables, Collection<Range> ranges) throws IOException
     {
         List<PendingFile> pending = createPendingFiles(sstables, ranges);
 
         if (pending.size() > 0)
         {
-            StreamHeader header = new StreamHeader(session.getSessionId(), pending.get(0), true);
             session.addFilesToStream(pending);
-
-            logger.info("Streaming file {} to {}", header.getStreamFile(), session.getHost());
-            MessagingService.instance.stream(header, session.getHost());
+            session.begin();
 
             logger.info("Waiting for transfer to {} to complete", session.getHost());
             session.waitForStreamCompletion();
@@ -156,34 +150,6 @@ public class StreamOut
         }
     }
 
-    /**
-     * Transfers the first file for matching portions of a group of sstables and appends a list of other files
-     * to the header for the requesting destination to take control of the rest of the transfers
-     */
-    private static void transferSSTablesForRequest(StreamOutSession session, String table, Collection<SSTableReader> sstables, Collection<Range> ranges) throws IOException
-    {
-        List<PendingFile> pending = createPendingFiles(sstables, ranges);
-        if (pending.size() > 0)
-        {
-            StreamHeader header = new StreamHeader(session.getSessionId(), pending.get(0), pending, false);
-            // In case this happens to be a re-request due to some error condition on the destination side
-            if (session.getFiles().isEmpty())
-                session.addFilesToStream(pending);
-
-            logger.info("Streaming file {} to {}", header.getStreamFile(), session.getHost());
-            MessagingService.instance.stream(header, session.getHost());
-            session.removePending(header.getStreamFile());
-        }
-        else
-        {
-            FileStatus status = new FileStatus("", session.getSessionId());
-            status.setAction(FileStatus.Action.EMPTY);
-            Message message = status.makeStreamStatusMessage();
-            message.setHeader(StreamOut.TABLE_NAME, table.getBytes());
-            MessagingService.instance.sendOneWay(message, session.getHost());
-        }
-    }
-
     // called prior to sending anything.
     private static List<PendingFile> createPendingFiles(Collection<SSTableReader> sstables, Collection<Range> ranges)
     {

Modified: cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamOutSession.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamOutSession.java?rev=997527&r1=997526&r2=997527&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamOutSession.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamOutSession.java Wed Sep 15 22:22:09 2010
@@ -23,6 +23,7 @@ import java.net.InetAddress;
 import java.util.*;
 import java.util.concurrent.ConcurrentMap;
 
+import org.apache.commons.lang.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -41,30 +42,20 @@ public class StreamOutSession
     // one host may have multiple stream sessions.
     private static final ConcurrentMap<Pair<InetAddress, Long>, StreamOutSession> streams = new NonBlockingHashMap<Pair<InetAddress, Long>, StreamOutSession>();
 
-    private final Map<String, PendingFile> files = new LinkedHashMap<String, PendingFile>();
-    private final Pair<InetAddress, Long> context;
-    private final Runnable callback;
-    private final SimpleCondition condition = new SimpleCondition();
-
-    public static StreamOutSession create(InetAddress host)
-    {
-        return create(host, System.nanoTime(), null);
-    }
-
-    public static StreamOutSession create(InetAddress host, Runnable callback)
+    public static StreamOutSession create(String table, InetAddress host, Runnable callback)
     {
-        return create(host, System.nanoTime(), callback);
+        return create(table, host, System.nanoTime(), callback);
     }
 
-    public static StreamOutSession create(InetAddress host, long sessionId)
+    public static StreamOutSession create(String table, InetAddress host, long sessionId)
     {
-        return create(host, sessionId, null);
+        return create(table, host, sessionId, null);
     }
 
-    public static StreamOutSession create(InetAddress host, long sessionId, Runnable callback)
+    public static StreamOutSession create(String table, InetAddress host, long sessionId, Runnable callback)
     {
         Pair<InetAddress, Long> context = new Pair<InetAddress, Long>(host, sessionId);
-        StreamOutSession session = new StreamOutSession(context, callback);
+        StreamOutSession session = new StreamOutSession(table, context, callback);
         streams.put(context, session);
         return session;
     }
@@ -74,17 +65,19 @@ public class StreamOutSession
         return streams.get(new Pair<InetAddress, Long>(host, sessionId));
     }
 
-    public void close()
-    {
-        streams.remove(context);
-        if(callback != null) 
-            callback.run();
-    }
+    private final Map<String, PendingFile> files = new LinkedHashMap<String, PendingFile>();
+
+    public final String table;
+    private final Pair<InetAddress, Long> context;
+    private final SimpleCondition condition = new SimpleCondition();
+    private final Runnable callback;
+    private String currentFile;
 
-    private StreamOutSession(Pair<InetAddress, Long> context, Runnable cb)
+    private StreamOutSession(String table, Pair<InetAddress, Long> context, Runnable callback)
     {
+        this.table = table;
         this.context = context;
-        this.callback = cb;
+        this.callback = callback;
     }
 
     public InetAddress getHost()
@@ -109,23 +102,22 @@ public class StreamOutSession
         }
     }
     
-    public void retry(String file)
+    public void retry()
     {
-        PendingFile pf = files.get(file);
-        if (pf != null)
-            streamFile(pf);
+        streamFile(files.get(currentFile));
     }
 
     private void streamFile(PendingFile pf)
     {
         if (logger.isDebugEnabled())
             logger.debug("Streaming {} ...", pf);
-        MessagingService.instance.stream(new StreamHeader(getSessionId(), pf, true), getHost());
+        currentFile = pf.getFilename();
+        MessagingService.instance.stream(new StreamHeader(table, getSessionId(), pf), getHost());
     }
 
-    public void finishAndStartNext(String pfname) throws IOException
+    public void startNext() throws IOException
     {
-        files.remove(pfname);
+        files.remove(currentFile);
         
         if (files.isEmpty())
         {
@@ -140,6 +132,13 @@ public class StreamOutSession
         }
     }
 
+    public void close()
+    {
+        streams.remove(context);
+        if (callback != null)
+            callback.run();
+    }
+
     public void removePending(PendingFile pf)
     {
         files.remove(pf.getFilename());
@@ -184,4 +183,19 @@ public class StreamOutSession
         }
         return list;
     }
+
+    public void validateCurrentFile(String file)
+    {
+        if (!file.equals(currentFile))
+            throw new IllegalStateException(String.format("target reports current file is %s but is %s", file, currentFile));
+    }
+
+    public void begin()
+    {
+        PendingFile first = files.isEmpty() ? null : files.values().iterator().next();
+        currentFile = first == null ? null : first.getFilename();
+        StreamHeader header = new StreamHeader(table, getSessionId(), first, files.values());
+        logger.info("Streaming files {} to {}", StringUtils.join(files.values(), ","), getHost());
+        MessagingService.instance.stream(header, getHost());
+    }
 }

Modified: cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamRequestVerbHandler.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamRequestVerbHandler.java?rev=997527&r1=997526&r2=997527&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamRequestVerbHandler.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamRequestVerbHandler.java Wed Sep 15 22:22:09 2010
@@ -48,23 +48,11 @@ public class StreamRequestVerbHandler im
         try
         {
             StreamRequestMessage srm = StreamRequestMessage.serializer().deserialize(new DataInputStream(bufIn));
-
             if (logger.isDebugEnabled())
                 logger.debug(srm.toString());
 
-            if (srm.file != null)
-            {
-                // single file re-request.
-                StreamHeader header = new StreamHeader(srm.sessionId, srm.file, false);
-                MessagingService.instance.stream(header, message.getFrom());
-                StreamOutSession.get(message.getFrom(), srm.sessionId).removePending(srm.file);
-            }
-            else
-            {
-                // range request.
-                StreamOutSession session = StreamOutSession.create(message.getFrom(), srm.sessionId);
-                StreamOut.transferRangesForRequest(session, srm.table, srm.ranges);
-            }
+            StreamOutSession session = StreamOutSession.create(srm.table, message.getFrom(), srm.sessionId);
+            StreamOut.transferRangesForRequest(session, srm.ranges);
         }
         catch (IOException ex)
         {

Modified: cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamStatusVerbHandler.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamStatusVerbHandler.java?rev=997527&r1=997526&r2=997527&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamStatusVerbHandler.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamStatusVerbHandler.java Wed Sep 15 22:22:09 2010
@@ -45,23 +45,20 @@ public class StreamStatusVerbHandler imp
         try
         {
             FileStatus streamStatus = FileStatus.serializer().deserialize(new DataInputStream(bufIn));
-            StreamOutSession session = StreamOutSession.get(message.getFrom(), streamStatus.getSessionId());
+            StreamOutSession session = StreamOutSession.get(message.getFrom(), streamStatus.sessionId);
+            session.validateCurrentFile(streamStatus.file);
 
-            switch (streamStatus.getAction())
+            switch (streamStatus.action)
             {
-                case DELETE:
-                    session.finishAndStartNext(streamStatus.getFile());
+                case FINISHED:
+                    session.startNext();
                     break;
-                case STREAM:
-                    logger.warn("Need to re-stream file {} to {}", streamStatus.getFile(), message.getFrom());
-                    session.retry(streamStatus.getFile());
-                    break;
-                case EMPTY:
-                    logger.error("Did not find matching ranges on {}", message.getFrom());
-                    StreamInSession.get(message.getFrom(), streamStatus.getSessionId()).close();
+                case RETRY:
+                    logger.warn("Need to re-stream file {} to {}", streamStatus.file, message.getFrom());
+                    session.retry();
                     break;
                 default:
-                    throw new RuntimeException("Cannot handle FileStatus.Action: " + streamStatus.getAction());
+                    throw new RuntimeException("Cannot handle FileStatus.Action: " + streamStatus.action);
             }
         }
         catch (IOException ex)

Modified: cassandra/trunk/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java?rev=997527&r1=997526&r2=997527&view=diff
==============================================================================
--- cassandra/trunk/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java (original)
+++ cassandra/trunk/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java Wed Sep 15 22:22:09 2010
@@ -66,7 +66,7 @@ public class StreamingTransferTest exten
         List<Range> ranges = new ArrayList<Range>();
         ranges.add(new Range(p.getMinimumToken(), p.getToken("key".getBytes())));
         ranges.add(new Range(p.getToken("key2".getBytes()), p.getMinimumToken()));
-        StreamOut.transferSSTables(StreamOutSession.create(LOCAL), tablename, Arrays.asList(sstable), ranges);
+        StreamOut.transferSSTables(StreamOutSession.create(tablename, LOCAL, null), Arrays.asList(sstable), ranges);
 
         // confirm that the SSTable was transferred and registered
         ColumnFamilyStore cfstore = Table.open(tablename).getColumnFamilyStore(cfname);
@@ -106,7 +106,7 @@ public class StreamingTransferTest exten
         List<Range> ranges = new ArrayList<Range>();
         ranges.add(new Range(p.getMinimumToken(), p.getToken("transfer1".getBytes())));
         ranges.add(new Range(p.getToken("test2".getBytes()), p.getMinimumToken()));
-        StreamOut.transferSSTables(StreamOutSession.create(LOCAL), tablename, Arrays.asList(sstable, sstable2), ranges);
+        StreamOut.transferSSTables(StreamOutSession.create(tablename, LOCAL, null), Arrays.asList(sstable, sstable2), ranges);
 
         // confirm that the SSTable was transferred and registered
         ColumnFamilyStore cfstore = Table.open(tablename).getColumnFamilyStore(cfname);



Mime
View raw message