cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jbel...@apache.org
Subject svn commit: r999737 - in /cassandra/trunk/src/java/org/apache/cassandra: service/ streaming/
Date Wed, 22 Sep 2010 03:40:59 GMT
Author: jbellis
Date: Wed Sep 22 03:40:59 2010
New Revision: 999737

URL: http://svn.apache.org/viewvc?rev=999737&view=rev
Log:
clean up FileStatus and rename to StreamReply.  updates comments in StreamOut and StreamIn
patch by jbellis; reviewed by gdusbabek for CASSANDRA-1415

Added:
    cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamReply.java
      - copied, changed from r999733, cassandra/trunk/src/java/org/apache/cassandra/streaming/FileStatus.java
    cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamReplyVerbHandler.java
      - copied, changed from r999733, cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamStatusVerbHandler.java
Removed:
    cassandra/trunk/src/java/org/apache/cassandra/streaming/FileStatus.java
    cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamStatusVerbHandler.java
Modified:
    cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
    cassandra/trunk/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java
    cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamIn.java
    cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamOut.java

Modified: cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java?rev=999737&r1=999736&r2=999737&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java Wed Sep 22 03:40:59 2010
@@ -28,26 +28,18 @@ import java.util.concurrent.*;
 import javax.management.MBeanServer;
 import javax.management.ObjectName;
 
-import org.apache.cassandra.concurrent.Stage;
-import org.apache.cassandra.config.CFMetaData;
-import org.apache.cassandra.config.ColumnDefinition;
-import org.apache.cassandra.config.RawColumnDefinition;
-import org.apache.cassandra.config.RawColumnFamily;
-import org.apache.cassandra.config.RawKeyspace;
-import org.apache.cassandra.utils.SkipNullRepresenter;
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.Multimap;
+import org.apache.log4j.Level;
 import org.apache.commons.lang.StringUtils;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.collect.HashMultimap;
-import com.google.common.collect.Multimap;
 import org.apache.cassandra.concurrent.JMXEnabledThreadPoolExecutor;
 import org.apache.cassandra.concurrent.NamedThreadFactory;
+import org.apache.cassandra.concurrent.Stage;
 import org.apache.cassandra.concurrent.StageManager;
-import org.apache.cassandra.config.ConfigurationException;
-import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.config.KSMetaData;
+import org.apache.cassandra.config.*;
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.migration.AddKeyspace;
 import org.apache.cassandra.db.migration.Migration;
@@ -68,8 +60,8 @@ import org.apache.cassandra.streaming.*;
 import org.apache.cassandra.thrift.Constants;
 import org.apache.cassandra.thrift.UnavailableException;
 import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.SkipNullRepresenter;
 import org.apache.cassandra.utils.WrappedRunnable;
-import org.apache.log4j.Level;
 import org.yaml.snakeyaml.Dumper;
 import org.yaml.snakeyaml.DumperOptions;
 import org.yaml.snakeyaml.Yaml;
@@ -97,7 +89,7 @@ public class StorageService implements I
         READ_RESPONSE,
         STREAM_INITIATE, // Deprecated
         STREAM_INITIATE_DONE, // Deprecated
-        STREAM_STATUS,
+        STREAM_REPLY,
         STREAM_REQUEST,
         RANGE_SLICE,
         BOOTSTRAP_TOKEN,
@@ -122,7 +114,7 @@ public class StorageService implements I
         put(Verb.READ_REPAIR, Stage.MUTATION);
         put(Verb.READ, Stage.READ);
         put(Verb.READ_RESPONSE, Stage.RESPONSE);
-        put(Verb.STREAM_STATUS, Stage.MISC); // TODO does this really belong on misc? I've just copied old behavior here
+        put(Verb.STREAM_REPLY, Stage.MISC); // TODO does this really belong on misc? I've just copied old behavior here
         put(Verb.STREAM_REQUEST, Stage.STREAM);
         put(Verb.RANGE_SLICE, Stage.READ);
         put(Verb.BOOTSTRAP_TOKEN, Stage.MISC);
@@ -222,7 +214,7 @@ public class StorageService implements I
         // see BootStrapper for a summary of how the bootstrap verbs interact
         MessagingService.instance.registerVerbHandlers(Verb.BOOTSTRAP_TOKEN, new BootStrapper.BootstrapTokenVerbHandler());
         MessagingService.instance.registerVerbHandlers(Verb.STREAM_REQUEST, new StreamRequestVerbHandler() );
-        MessagingService.instance.registerVerbHandlers(Verb.STREAM_STATUS, new StreamStatusVerbHandler());
+        MessagingService.instance.registerVerbHandlers(Verb.STREAM_REPLY, new StreamReplyVerbHandler());
         MessagingService.instance.registerVerbHandlers(Verb.READ_RESPONSE, new ResponseVerbHandler());
         MessagingService.instance.registerVerbHandlers(Verb.TREE_REQUEST, new TreeRequestVerbHandler());
         MessagingService.instance.registerVerbHandlers(Verb.TREE_RESPONSE, new AntiEntropyService.TreeResponseVerbHandler());

Modified: cassandra/trunk/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java?rev=999737&r1=999736&r2=999737&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java Wed Sep 22 03:40:59 2010
@@ -88,9 +88,10 @@ public class IncomingStreamReader
         }
         catch (IOException ex)
         {
-            logger.debug("Receiving stream: recovering from IO error");
             /* Ask the source node to re-stream this file. */
-            handleFileStatus(FileStatus.Action.RETRY);
+            StreamReply reply = new StreamReply(remoteFile.getFilename(), session.getSessionId(), StreamReply.Status.FILE_RETRY);
+            logger.info("Streaming of file {} from {} failed: requesting a retry.", remoteFile, session);
+            MessagingService.instance.sendOneWay(reply.createMessage(), session.getHost());
 
             /* Delete the orphaned file. */
             FileUtils.deleteWithConfirm(new File(localFile.getFilename()));
@@ -103,29 +104,14 @@ public class IncomingStreamReader
 
         if (logger.isDebugEnabled())
             logger.debug("Removing stream context {}", remoteFile);
-        handleFileStatus(FileStatus.Action.FINISHED);
-    }
-
-    private void handleFileStatus(FileStatus.Action action) throws IOException
-    {
-        FileStatus status = new FileStatus(remoteFile.getFilename(), session.getSessionId(), action);
-
-        if (FileStatus.Action.RETRY == action)
-        {
-            // file needs to be restreamed
-            logger.warn("Streaming of file {} from {} failed: requesting a retry.", remoteFile, session);
-            MessagingService.instance.sendOneWay(status.makeStreamStatusMessage(), session.getHost());
-            return;
-        }
-
-        assert FileStatus.Action.FINISHED == action : "Unknown stream action: " + action;
 
+        StreamReply reply = new StreamReply(remoteFile.getFilename(), session.getSessionId(), StreamReply.Status.FILE_FINISHED);
         addSSTable(localFile);
         session.remove(remoteFile);
         // send a StreamStatus message telling the source node it can delete this file
         if (logger.isDebugEnabled())
             logger.debug("Sending a streaming finished message for {} to {}", remoteFile, session);
-        MessagingService.instance.sendOneWay(status.makeStreamStatusMessage(), session.getHost());
+        MessagingService.instance.sendOneWay(reply.createMessage(), session.getHost());
     }
 
     public static void addSSTable(PendingFile pendingFile)

Modified: cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamIn.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamIn.java?rev=999737&r1=999736&r2=999737&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamIn.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamIn.java Wed Sep 22 03:40:59 2010
@@ -37,7 +37,11 @@ import org.apache.cassandra.net.Message;
 import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.utils.FBUtilities;
 
-/** for streaming data from other nodes in to this one */
+/**
+ * for streaming data from other nodes in to this one.
+ * Sends a STREAM_REQUEST Message to the source node(s), after which StreamOut on that side takes over.
+ * See StreamOut for details.
+ */
 public class StreamIn
 {
     private static Logger logger = LoggerFactory.getLogger(StreamIn.class);

Modified: cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamOut.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamOut.java?rev=999737&r1=999736&r2=999737&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamOut.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamOut.java Wed Sep 22 03:40:59 2010
@@ -40,12 +40,19 @@ import org.apache.cassandra.utils.Pair;
 /**
  * This class handles streaming data from one node to another.
  *
- * For StreamingRepair and Unbootstrap
- *  1. The ranges are transferred on a single file basis.
- *  2. Each transfer has the header information for the sstable being transferred.
- *  3. List of the pending files are maintained, as this is the source node.
+ * The source node is in charge of the streaming session.  It begins the stream by sending
+ * a Message with the stream bit flag in the Header turned on.  Part of that Message
+ * will include a StreamHeader that includes the files that will be streamed as part
+ * of that session, as well as the first file-to-be-streamed. (Combining session list
+ * and first file like this is inconvenient, but not as inconvenient as the old
+ * three-part send-file-list, wait-for-ack, start-first-file dance.)
  *
- * For Stream requests (for bootstrap), the main difference is that we always have to
+ * After each file, the target will send a StreamReply indicating success
+ * (FILE_FINISHED) or failure (FILE_RETRY).
+ *
+ * When all files have been successfully transferred the session is complete.
+ *
+ * For Stream requests (for bootstrap), one subtlety is that we always have to
  * create at least one stream reply, even if the list of files is empty, otherwise the
  * target has no way to know that it can stop waiting for an answer.
  *

Copied: cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamReply.java (from r999733, cassandra/trunk/src/java/org/apache/cassandra/streaming/FileStatus.java)
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamReply.java?p2=cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamReply.java&p1=cassandra/trunk/src/java/org/apache/cassandra/streaming/FileStatus.java&r1=999733&r2=999737&rev=999737&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/streaming/FileStatus.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamReply.java Wed Sep 22 03:40:59 2010
@@ -31,62 +31,52 @@ import org.apache.cassandra.net.Message;
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.utils.FBUtilities;
 
-class FileStatus
+class StreamReply
 {
-    private static ICompactSerializer<FileStatus> serializer;
-
-    static enum Action
+    static enum Status
     {
         // was received successfully, and can be deleted from the source node
-        FINISHED,
+        FILE_FINISHED,
         // needs to be streamed (or restreamed)
-        RETRY,
-    }
-
-    static
-    {
-        serializer = new FileStatusSerializer();
+        FILE_RETRY,
     }
 
-    public static ICompactSerializer<FileStatus> serializer()
-    {
-        return serializer;
-    }
+    public static final ICompactSerializer<StreamReply> serializer = new FileStatusSerializer();
 
     public final long sessionId;
     public final String file;
-    public final Action action;
+    public final Status action;
 
-    public FileStatus(String file, long sessionId, Action action)
+    public StreamReply(String file, long sessionId, Status action)
     {
         this.file = file;
         this.action = action;
         this.sessionId = sessionId;
     }
 
-    public Message makeStreamStatusMessage() throws IOException
+    public Message createMessage() throws IOException
     {
         ByteArrayOutputStream bos = new ByteArrayOutputStream();
         DataOutputStream dos = new DataOutputStream( bos );
-        FileStatus.serializer().serialize(this, dos);
-        return new Message(FBUtilities.getLocalAddress(), StorageService.Verb.STREAM_STATUS, bos.toByteArray());
+        serializer.serialize(this, dos);
+        return new Message(FBUtilities.getLocalAddress(), StorageService.Verb.STREAM_REPLY, bos.toByteArray());
     }
 
-    private static class FileStatusSerializer implements ICompactSerializer<FileStatus>
+    private static class FileStatusSerializer implements ICompactSerializer<StreamReply>
     {
-        public void serialize(FileStatus streamStatus, DataOutputStream dos) throws IOException
+        public void serialize(StreamReply reply, DataOutputStream dos) throws IOException
         {
-            dos.writeLong(streamStatus.sessionId);
-            dos.writeUTF(streamStatus.file);
-            dos.writeInt(streamStatus.action.ordinal());
+            dos.writeLong(reply.sessionId);
+            dos.writeUTF(reply.file);
+            dos.writeInt(reply.action.ordinal());
         }
 
-        public FileStatus deserialize(DataInputStream dis) throws IOException
+        public StreamReply deserialize(DataInputStream dis) throws IOException
         {
             long sessionId = dis.readLong();
             String targetFile = dis.readUTF();
-            Action action = Action.values()[dis.readInt()];
-            return new FileStatus(targetFile, sessionId, action);
+            Status action = Status.values()[dis.readInt()];
+            return new StreamReply(targetFile, sessionId, action);
         }
     }
 }

Copied: cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamReplyVerbHandler.java (from r999733, cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamStatusVerbHandler.java)
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamReplyVerbHandler.java?p2=cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamReplyVerbHandler.java&p1=cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamStatusVerbHandler.java&r1=999733&r2=999737&rev=999737&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamStatusVerbHandler.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamReplyVerbHandler.java Wed Sep 22 03:40:59 2010
@@ -28,14 +28,13 @@ import java.io.IOException;
 
 import org.apache.cassandra.net.IVerbHandler;
 import org.apache.cassandra.net.Message;
-import org.apache.cassandra.service.StorageService;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class StreamStatusVerbHandler implements IVerbHandler
+public class StreamReplyVerbHandler implements IVerbHandler
 {
-    private static Logger logger = LoggerFactory.getLogger(StreamStatusVerbHandler.class);
+    private static Logger logger = LoggerFactory.getLogger(StreamReplyVerbHandler.class);
 
     public void doVerb(Message message)
     {
@@ -44,21 +43,21 @@ public class StreamStatusVerbHandler imp
 
         try
         {
-            FileStatus streamStatus = FileStatus.serializer().deserialize(new DataInputStream(bufIn));
-            StreamOutSession session = StreamOutSession.get(message.getFrom(), streamStatus.sessionId);
-            session.validateCurrentFile(streamStatus.file);
+            StreamReply reply = StreamReply.serializer.deserialize(new DataInputStream(bufIn));
+            StreamOutSession session = StreamOutSession.get(message.getFrom(), reply.sessionId);
+            session.validateCurrentFile(reply.file);
 
-            switch (streamStatus.action)
+            switch (reply.action)
             {
-                case FINISHED:
+                case FILE_FINISHED:
                     session.startNext();
                     break;
-                case RETRY:
-                    logger.warn("Need to re-stream file {} to {}", streamStatus.file, message.getFrom());
+                case FILE_RETRY:
+                    logger.warn("Need to re-stream file {} to {}", reply.file, message.getFrom());
                     session.retry();
                     break;
                 default:
-                    throw new RuntimeException("Cannot handle FileStatus.Action: " + streamStatus.action);
+                    throw new RuntimeException("Cannot handle FileStatus.Action: " + reply.action);
             }
         }
         catch (IOException ex)



Mime
View raw message