cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From yu...@apache.org
Subject [1/3] git commit: Fix NPE when streaming connection is not yet ready
Date Tue, 28 Jan 2014 15:20:38 GMT
Updated Branches:
  refs/heads/cassandra-2.0 20c2adc87 -> 41ffca128
  refs/heads/trunk 025474177 -> 9c9552aea


Fix NPE when streaming connection is not yet ready

patch by yukim; reviewed by Russell Spitzer for CASSANDRA-6210


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/41ffca12
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/41ffca12
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/41ffca12

Branch: refs/heads/cassandra-2.0
Commit: 41ffca1281dcdc69b1b843b47a5bb6dc3c462aac
Parents: 20c2adc
Author: Yuki Morishita <yukim@apache.org>
Authored: Tue Jan 28 09:17:30 2014 -0600
Committer: Yuki Morishita <yukim@apache.org>
Committed: Tue Jan 28 09:17:30 2014 -0600

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../cassandra/streaming/ConnectionHandler.java  | 117 +++++++++----------
 .../streaming/messages/StreamMessage.java       |   3 +-
 3 files changed, 61 insertions(+), 60 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/41ffca12/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 68727dc..46b14fc 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -9,6 +9,7 @@
  * Fix LOCAL_SERIAL from thrift (CASSANDRA-6584)
  * Don't special case received counts in CAS timeout exceptions (CASSANDRA-6595)
  * Add support for 2.1 global counter shards (CASSANDRA-6505)
+ * Fix NPE when streaming connection is not yet established (CASSANDRA-6210)
 Merged from 1.2:
  * fsync compression metadata (CASSANDRA-6531)
  * Validate CF existence on execution for prepared statement (CASSANDRA-6535)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/41ffca12/src/java/org/apache/cassandra/streaming/ConnectionHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/ConnectionHandler.java b/src/java/org/apache/cassandra/streaming/ConnectionHandler.java
index 57f76a7..356138b 100644
--- a/src/java/org/apache/cassandra/streaming/ConnectionHandler.java
+++ b/src/java/org/apache/cassandra/streaming/ConnectionHandler.java
@@ -21,6 +21,7 @@ import java.io.IOException;
 import java.net.InetAddress;
 import java.net.Socket;
 import java.net.SocketException;
+import java.nio.ByteBuffer;
 import java.nio.channels.Channels;
 import java.nio.channels.ReadableByteChannel;
 import java.nio.channels.WritableByteChannel;
@@ -64,6 +65,8 @@ public class ConnectionHandler
     ConnectionHandler(StreamSession session)
     {
         this.session = session;
+        this.incoming = new IncomingMessageHandler(session);
+        this.outgoing = new OutgoingMessageHandler(session);
     }
 
     /**
@@ -77,15 +80,13 @@ public class ConnectionHandler
     {
         logger.debug("[Stream #{}] Sending stream init for incoming stream", session.planId());
         Socket incomingSocket = connect(session.peer);
-        incoming = new IncomingMessageHandler(session, incomingSocket, StreamMessage.CURRENT_VERSION);
-        incoming.sendInitMessage(true);
-        incoming.start();
+        incoming.start(incomingSocket, StreamMessage.CURRENT_VERSION);
+        incoming.sendInitMessage(incomingSocket, true);
 
         logger.debug("[Stream #{}] Sending stream init for outgoing stream", session.planId());
         Socket outgoingSocket = connect(session.peer);
-        outgoing = new OutgoingMessageHandler(session, outgoingSocket, StreamMessage.CURRENT_VERSION);
-        outgoing.sendInitMessage(false);
-        outgoing.start();
+        outgoing.start(outgoingSocket, StreamMessage.CURRENT_VERSION);
+        outgoing.sendInitMessage(outgoingSocket, false);
     }
 
     /**
@@ -98,15 +99,9 @@ public class ConnectionHandler
     public void initiateOnReceivingSide(Socket socket, boolean isForOutgoing, int version) throws IOException
     {
         if (isForOutgoing)
-        {
-            outgoing = new OutgoingMessageHandler(session, socket, version);
-            outgoing.start();
-        }
+            outgoing.start(socket, version);
         else
-        {
-            incoming = new IncomingMessageHandler(session, socket, version);
-            incoming.start();
-        }
+            incoming.start(socket, version);
     }
 
     /**
@@ -189,21 +184,19 @@ public class ConnectionHandler
     {
         protected final StreamSession session;
 
-        protected final Socket socket;
-        protected final int protocolVersion;
+        protected int protocolVersion;
+        protected Socket socket;
 
         private final AtomicReference<SettableFuture<?>> closeFuture = new AtomicReference<>();
 
-        protected MessageHandler(StreamSession session, Socket socket, int protocolVersion)
+        protected MessageHandler(StreamSession session)
         {
             this.session = session;
-            this.socket = socket;
-            this.protocolVersion = protocolVersion;
         }
 
         protected abstract String name();
 
-        protected WritableByteChannel getWriteChannel() throws IOException
+        protected static WritableByteChannel getWriteChannel(Socket socket) throws IOException
         {
             WritableByteChannel out = socket.getChannel();
             // socket channel is null when encrypted(SSL)
@@ -212,7 +205,7 @@ public class ConnectionHandler
                  : out;
         }
 
-        protected ReadableByteChannel getReadChannel() throws IOException
+        protected static ReadableByteChannel getReadChannel(Socket socket) throws IOException
         {
             ReadableByteChannel in = socket.getChannel();
             // socket channel is null when encrypted(SSL)
@@ -221,14 +214,19 @@ public class ConnectionHandler
                  : in;
         }
 
-        public void sendInitMessage(boolean isForOutgoing) throws IOException
+        public void sendInitMessage(Socket socket, boolean isForOutgoing) throws IOException
         {
             StreamInitMessage message = new StreamInitMessage(FBUtilities.getBroadcastAddress(), session.planId(), session.description(), isForOutgoing);
-            getWriteChannel().write(message.createMessage(false, protocolVersion));
+            ByteBuffer messageBuf = message.createMessage(false, protocolVersion);
+            while (messageBuf.hasRemaining())
+                getWriteChannel(socket).write(messageBuf);
         }
 
-        public void start()
+        public void start(Socket socket, int protocolVersion)
         {
+            this.socket = socket;
+            this.protocolVersion = protocolVersion;
+
             new Thread(this, name() + "-" + session.peer).start();
         }
 
@@ -264,12 +262,9 @@ public class ConnectionHandler
      */
     static class IncomingMessageHandler extends MessageHandler
     {
-        private final ReadableByteChannel in;
-
-        IncomingMessageHandler(StreamSession session, Socket socket, int protocolVersion) throws IOException
+        IncomingMessageHandler(StreamSession session)
         {
-            super(session, socket, protocolVersion);
-            this.in = getReadChannel();
+            super(session);
         }
 
         protected String name()
@@ -279,9 +274,10 @@ public class ConnectionHandler
 
         public void run()
         {
-            while (!isClosed())
+            try
             {
-                try
+                ReadableByteChannel in = getReadChannel(socket);
+                while (!isClosed())
                 {
                     // receive message
                     StreamMessage message = StreamMessage.deserialize(in, protocolVersion, session);
@@ -293,17 +289,20 @@ public class ConnectionHandler
                         session.messageReceived(message);
                     }
                 }
-                catch (SocketException e)
-                {
-                    // socket is closed
-                    close();
-                }
-                catch (Throwable e)
-                {
-                    session.onError(e);
-                }
             }
-            signalCloseDone();
+            catch (SocketException e)
+            {
+                // socket is closed
+                close();
+            }
+            catch (Throwable e)
+            {
+                session.onError(e);
+            }
+            finally
+            {
+                signalCloseDone();
+            }
         }
     }
 
@@ -326,12 +325,9 @@ public class ConnectionHandler
             }
         });
 
-        private final WritableByteChannel out;
-
-        OutgoingMessageHandler(StreamSession session, Socket socket, int protocolVersion) throws IOException
+        OutgoingMessageHandler(StreamSession session)
         {
-            super(session, socket, protocolVersion);
-            this.out = getWriteChannel();
+            super(session);
         }
 
         protected String name()
@@ -346,30 +342,33 @@ public class ConnectionHandler
 
         public void run()
         {
-            StreamMessage next;
-            while (!isClosed())
+            try
             {
-                try
+                WritableByteChannel out = getWriteChannel(socket);
+
+                StreamMessage next;
+                while (!isClosed())
                 {
                     if ((next = messageQueue.poll(1, TimeUnit.SECONDS)) != null)
                     {
                         logger.debug("[Stream #{}] Sending {}", session.planId(), next);
-                        sendMessage(next);
+                        sendMessage(out, next);
                         if (next.type == StreamMessage.Type.SESSION_FAILED)
                             close();
                     }
                 }
-                catch (InterruptedException e)
-                {
-                    throw new AssertionError(e);
-                }
-            }
 
-            try
-            {
                 // Sends the last messages on the queue
                 while ((next = messageQueue.poll()) != null)
-                    sendMessage(next);
+                    sendMessage(out, next);
+            }
+            catch (InterruptedException e)
+            {
+                throw new AssertionError(e);
+            }
+            catch (IOException e)
+            {
+                session.onError(e);
             }
             finally
             {
@@ -377,7 +376,7 @@ public class ConnectionHandler
             }
         }
 
-        private void sendMessage(StreamMessage message)
+        private void sendMessage(WritableByteChannel out, StreamMessage message)
         {
             try
             {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/41ffca12/src/java/org/apache/cassandra/streaming/messages/StreamMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/messages/StreamMessage.java b/src/java/org/apache/cassandra/streaming/messages/StreamMessage.java
index 2e7341b..9e146e8 100644
--- a/src/java/org/apache/cassandra/streaming/messages/StreamMessage.java
+++ b/src/java/org/apache/cassandra/streaming/messages/StreamMessage.java
@@ -40,7 +40,8 @@ public abstract class StreamMessage
         // message type
         buff.put(message.type.type);
         buff.flip();
-        out.write(buff);
+        while (buff.hasRemaining())
+            out.write(buff);
         message.type.serializer.serialize(message, out, version, session);
     }
 


Mime
View raw message