qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From kw...@apache.org
Subject svn commit: r1644128 - /qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingSenderReceiver.java
Date Tue, 09 Dec 2014 17:36:34 GMT
Author: kwall
Date: Tue Dec  9 17:36:33 2014
New Revision: 1644128

URL: http://svn.apache.org/r1644128
Log:
Stop reading bytes from the wire once the transport is closed.  Also prevent further ByteBuffers
being queued to be sent once the transport is closed too

Modified:
    qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingSenderReceiver.java

Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingSenderReceiver.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingSenderReceiver.java?rev=1644128&r1=1644127&r2=1644128&view=diff
==============================================================================
--- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingSenderReceiver.java
(original)
+++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingSenderReceiver.java
Tue Dec  9 17:36:33 2014
@@ -35,6 +35,7 @@ import org.slf4j.LoggerFactory;
 import org.apache.qpid.thread.Threading;
 import org.apache.qpid.transport.Receiver;
 import org.apache.qpid.transport.Sender;
+import org.apache.qpid.transport.SenderClosedException;
 import org.apache.qpid.transport.SenderException;
 import org.apache.qpid.transport.network.Ticker;
 
@@ -104,6 +105,10 @@ public class NonBlockingSenderReceiver
     @Override
     public void send(final ByteBuffer msg)
     {
+        if (_closed.get())
+        {
+            throw new SenderClosedException("I/O for thread " + _remoteSocketAddress + "
is already closed");
+        }
         // append to list and do selector wakeup
         _buffers.add(msg);
         _selector.wakeup();
@@ -223,8 +228,8 @@ public class NonBlockingSenderReceiver
     private void doRead() throws IOException
     {
 
-        int remaining;
-        do
+        int remaining = 0;
+        while (remaining == 0 && !_closed.get())
         {
             if(_currentBuffer == null || _currentBuffer.remaining() == 0)
             {
@@ -241,7 +246,5 @@ public class NonBlockingSenderReceiver
             _currentBuffer = _currentBuffer.slice();
             _receiver.received(dup);
         }
-        while (remaining == 0);
-
     }
 }



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


Mime
View raw message