qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From kw...@apache.org
Subject svn commit: r1543721 - /qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java
Date Wed, 20 Nov 2013 06:59:45 GMT
Author: kwall
Date: Wed Nov 20 06:59:44 2013
New Revision: 1543721

URL: http://svn.apache.org/r1543721
Log:
QPID-5282: Change IoSender to cause the socket to be closed after a sender timeout

IoSender#send now causes the socket to be closed in response to a sender timeout (in addition
to the SenderException).
Note that this code path avoids the close causing the  sender thread join (as this would most
likely timeout too). Also
improved log/exception messages to include the remote socket address (to aid problem diagnosis).

Modified:
    qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java

Modified: qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java?rev=1543721&r1=1543720&r2=1543721&view=diff
==============================================================================
--- qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java
(original)
+++ qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java
Wed Nov 20 06:59:44 2013
@@ -60,6 +60,7 @@ public final class IoSender implements R
     private final AtomicBoolean closed = new AtomicBoolean(false);
     private final Thread senderThread;
     private final List<Closeable> _listeners = new ArrayList<Closeable>();
+    private final String _remoteSocketAddress;
 
     private volatile Throwable exception = null;
 
@@ -68,6 +69,7 @@ public final class IoSender implements R
         this.socket = socket;
         this.buffer = new byte[pof2(bufferSize)]; // buffer size must be a power of 2
         this.timeout = timeout;
+        _remoteSocketAddress = socket.getRemoteSocketAddress().toString();
 
         try
         {
@@ -89,7 +91,7 @@ public final class IoSender implements R
         }
 
         senderThread.setDaemon(true);
-        senderThread.setName(String.format("IoSender - %s", socket.getRemoteSocketAddress()));
+        senderThread.setName(String.format("IoSender - %s", _remoteSocketAddress));
     }
 
     public void initiate()
@@ -109,13 +111,11 @@ public final class IoSender implements R
 
     public void send(ByteBuffer buf)
     {
-        if (closed.get())
-        {
-            throw new SenderClosedException("sender is closed", exception);
-        }
+        checkNotAlreadyClosed();
+
         if(!senderThread.isAlive())
         {
-            throw new SenderException("sender thread not alive");
+            throw new SenderException(String.format("sender thread for socket %s is not alive",
_remoteSocketAddress));
         }
 
         final int size = buffer.length;
@@ -131,7 +131,7 @@ public final class IoSender implements R
                 flush();
                 synchronized (notFull)
                 {
-                    long start = System.currentTimeMillis();
+                    final long start = System.currentTimeMillis();
                     long elapsed = 0;
                     while (!closed.get() && head - tail >= size && elapsed
< timeout)
                     {
@@ -146,14 +146,19 @@ public final class IoSender implements R
                         elapsed += System.currentTimeMillis() - start;
                     }
 
-                    if (closed.get())
-                    {
-                        throw new SenderClosedException("sender is closed", exception);
-                    }
+                    checkNotAlreadyClosed();
 
                     if (head - tail >= size)
                     {
-                        throw new SenderException(String.format("write timed out: %s, %s",
head, tail));
+                        try
+                        {
+                            log.error("write timed out for socket %s: head %d, tail %d",
_remoteSocketAddress, head, tail);
+                            throw new SenderException(String.format("write timed out for
socket %s: head %d, tail %d",  _remoteSocketAddress, head, tail));
+                        }
+                        finally
+                        {
+                            close(false, false);
+                        }
                     }
                 }
                 continue;
@@ -191,10 +196,10 @@ public final class IoSender implements R
 
     public void close()
     {
-        close(true);
+        close(true, true);
     }
 
-    void close(boolean reportException)
+    private void close(boolean awaitSenderBeforeClose, boolean reportException)
     {
         if (!closed.getAndSet(true))
         {
@@ -210,21 +215,11 @@ public final class IoSender implements R
 
             try
             {
-                if (Thread.currentThread() != senderThread)
+                if (awaitSenderBeforeClose)
                 {
-                    senderThread.join(timeout);
-                    if (senderThread.isAlive())
-                    {
-                        log.error("join timed out");
-                        throw new SenderException("join timed out");
-                    }
+                    awaitSenderThreadShutdown();
                 }
             }
-            catch (InterruptedException e)
-            {
-                log.error("interrupted whilst waiting for sender thread to stop");
-                throw new SenderException(e);
-            }
             finally
             {
                 closeListeners();
@@ -247,7 +242,7 @@ public final class IoSender implements R
             }
             catch(Exception e)
             {
-                log.error("Exception closing listener: " + e.getMessage());
+                log.error(e, "Exception closing listener for socket %s", _remoteSocketAddress);
                 ex = e;
             }
         }
@@ -316,7 +311,7 @@ public final class IoSender implements R
             {
                 log.error(e, "error in write thread");
                 exception = e;
-                close(false);
+                close(false, false);
                 break;
             }
             tail += length;
@@ -346,4 +341,33 @@ public final class IoSender implements R
     {
         _listeners.add(listener);
     }
+
+    private void awaitSenderThreadShutdown()
+    {
+        if (Thread.currentThread() != senderThread)
+        {
+            try
+            {
+                senderThread.join(timeout);
+                if (senderThread.isAlive())
+                {
+                    log.error("join timed out for socket %s to stop", _remoteSocketAddress);
+                    throw new SenderException(String.format("join timed out for socket %s
to stop", _remoteSocketAddress));
+                }
+            }
+            catch (InterruptedException e)
+            {
+                log.error("interrupted whilst waiting for sender thread for socket %s to
stop", _remoteSocketAddress);
+                throw new SenderException(e);
+            }
+        }
+    }
+
+    private void checkNotAlreadyClosed()
+    {
+        if (closed.get())
+        {
+            throw new SenderClosedException(String.format("sender for socket %s is closed",
_remoteSocketAddress), exception);
+        }
+    }
 }



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


Mime
View raw message