cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jbel...@apache.org
Subject svn commit: r904929 - in /incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming: StreamCompletionHandler.java StreamFinishedVerbHandler.java StreamInManager.java
Date Sun, 31 Jan 2010 00:23:42 GMT
Author: jbellis
Date: Sun Jan 31 00:23:42 2010
New Revision: 904929

URL: http://svn.apache.org/viewvc?rev=904929&view=rev
Log:
r/m unneeded StreamStatusMessage wrapper
patch by jbellis; reviewed by stuhood for CASSANDRA-751

Modified:
    incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamCompletionHandler.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamFinishedVerbHandler.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInManager.java

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamCompletionHandler.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamCompletionHandler.java?rev=904929&r1=904928&r2=904929&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamCompletionHandler.java
(original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamCompletionHandler.java
Sun Jan 31 00:23:42 2010
@@ -51,10 +51,8 @@
 
         if (logger.isDebugEnabled())
           logger.debug("Sending a streaming finished message with " + streamStatus + " to
" + host);
-        /* Send a StreamStatusMessage object which may require the source node to re-stream
certain files. */
-        StreamInManager.StreamStatusMessage streamStatusMessage = new StreamInManager.StreamStatusMessage(streamStatus);
-        Message message = StreamInManager.StreamStatusMessage.makeStreamStatusMessage(streamStatusMessage);
-        MessagingService.instance.sendOneWay(message, host);
+        /* Send a StreamStatus message which may require the source node to re-stream certain
files. */
+        MessagingService.instance.sendOneWay(streamStatus.makeStreamStatusMessage(), host);
 
         /* If we're done with everything for this host, remove from bootstrap sources */
         if (StreamInManager.isDone(host) && StorageService.instance.isBootstrapMode())

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamFinishedVerbHandler.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamFinishedVerbHandler.java?rev=904929&r1=904928&r2=904929&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamFinishedVerbHandler.java
(original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamFinishedVerbHandler.java
Sun Jan 31 00:23:42 2010
@@ -23,8 +23,7 @@
 
         try
         {
-            StreamInManager.StreamStatusMessage streamStatusMessage = StreamInManager.StreamStatusMessage.serializer().deserialize(new
DataInputStream(bufIn));
-            StreamInManager.StreamStatus streamStatus = streamStatusMessage.getStreamStatus();
+            StreamInManager.StreamStatus streamStatus = StreamInManager.StreamStatus.serializer().deserialize(new
DataInputStream(bufIn));
 
             switch (streamStatus.getAction())
             {

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInManager.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInManager.java?rev=904929&r1=904928&r2=904929&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInManager.java
(original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInManager.java
Sun Jan 31 00:23:42 2010
@@ -87,6 +87,14 @@
         {
             return action_;
         }
+
+        public Message makeStreamStatusMessage() throws IOException
+        {
+            ByteArrayOutputStream bos = new ByteArrayOutputStream();
+            DataOutputStream dos = new DataOutputStream( bos );
+            StreamStatus.serializer().serialize(this, dos);
+            return new Message(FBUtilities.getLocalAddress(), "", StorageService.Verb.STREAM_FINISHED,
bos.toByteArray());
+        }
     }
     
     public static class StreamStatusSerializer implements ICompactSerializer<StreamStatus>
@@ -117,56 +125,7 @@
             return streamStatus;
         }
     }
-    
-    public static class StreamStatusMessage
-    {
-        private static ICompactSerializer<StreamStatusMessage> serializer_;
-        
-        static 
-        {
-            serializer_ = new StreamStatusMessageSerializer();
-        }
-        
-        public static ICompactSerializer<StreamStatusMessage> serializer()
-        {
-            return serializer_;
-        }
-        
-        public static Message makeStreamStatusMessage(StreamStatusMessage streamStatusMessage)
throws IOException
-        {
-            ByteArrayOutputStream bos = new ByteArrayOutputStream();
-            DataOutputStream dos = new DataOutputStream( bos );
-            StreamStatusMessage.serializer().serialize(streamStatusMessage, dos);
-            return new Message(FBUtilities.getLocalAddress(), "", StorageService.Verb.STREAM_FINISHED,
bos.toByteArray());
-        }
-        
-        protected StreamInManager.StreamStatus streamStatus_;
-        
-        public StreamStatusMessage(StreamInManager.StreamStatus streamStatus)
-        {
-            streamStatus_ = streamStatus;
-        }
-        
-        public StreamInManager.StreamStatus getStreamStatus()
-        {
-            return streamStatus_;
-        }
-    }
-    
-    public static class StreamStatusMessageSerializer implements ICompactSerializer<StreamStatusMessage>
-    {
-        public void serialize(StreamStatusMessage streamStatusMessage, DataOutputStream dos)
throws IOException
-        {
-            StreamStatus.serializer().serialize(streamStatusMessage.streamStatus_, dos);
           
-        }
-        
-        public StreamStatusMessage deserialize(DataInputStream dis) throws IOException
-        {            
-            StreamInManager.StreamStatus streamStatus = StreamStatus.serializer().deserialize(dis);
-            return new StreamStatusMessage(streamStatus);
-        }
-    }
-        
+                
     /* Maintain a stream context per host that is the source of the stream */
     public static final Map<InetAddress, List<InitiatedFile>> ctxBag_ = new Hashtable<InetAddress,
List<InitiatedFile>>();
     /* Maintain in this map the status of the streams that need to be sent back to the source
*/



Mime
View raw message