Return-Path: X-Original-To: apmail-qpid-commits-archive@www.apache.org Delivered-To: apmail-qpid-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id E7666101D5 for ; Wed, 20 Nov 2013 07:00:30 +0000 (UTC) Received: (qmail 68881 invoked by uid 500); 20 Nov 2013 07:00:26 -0000 Delivered-To: apmail-qpid-commits-archive@qpid.apache.org Received: (qmail 68764 invoked by uid 500); 20 Nov 2013 07:00:14 -0000 Mailing-List: contact commits-help@qpid.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@qpid.apache.org Delivered-To: mailing list commits@qpid.apache.org Received: (qmail 68592 invoked by uid 99); 20 Nov 2013 07:00:06 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 20 Nov 2013 07:00:06 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 20 Nov 2013 07:00:05 +0000 Received: from eris.apache.org (localhost [127.0.0.1]) by eris.apache.org (Postfix) with ESMTP id 619462388A3D; Wed, 20 Nov 2013 06:59:45 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1543721 - /qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java Date: Wed, 20 Nov 2013 06:59:45 -0000 To: commits@qpid.apache.org From: kwall@apache.org X-Mailer: svnmailer-1.0.9 Message-Id: <20131120065945.619462388A3D@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: kwall Date: Wed Nov 20 06:59:44 2013 New Revision: 1543721 URL: http://svn.apache.org/r1543721 Log: QPID-5282: Change IoSender to cause the socket to be closed after a sender timeout IoSender#send now causes the socket to be closed in 
response to a sender timeout (in addition to the SenderException). Note that this code path avoids the close causing the sender thread join (as this would most likely time out too). Also improved log/exception messages to include the remote socket address (to aid problem diagnosis). Modified: qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java Modified: qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java?rev=1543721&r1=1543720&r2=1543721&view=diff ============================================================================== --- qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java (original) +++ qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java Wed Nov 20 06:59:44 2013 @@ -60,6 +60,7 @@ public final class IoSender implements R private final AtomicBoolean closed = new AtomicBoolean(false); private final Thread senderThread; private final List _listeners = new ArrayList(); + private final String _remoteSocketAddress; private volatile Throwable exception = null; @@ -68,6 +69,7 @@ public final class IoSender implements R this.socket = socket; this.buffer = new byte[pof2(bufferSize)]; // buffer size must be a power of 2 this.timeout = timeout; + _remoteSocketAddress = socket.getRemoteSocketAddress().toString(); try { @@ -89,7 +91,7 @@ public final class IoSender implements R } senderThread.setDaemon(true); - senderThread.setName(String.format("IoSender - %s", socket.getRemoteSocketAddress())); + senderThread.setName(String.format("IoSender - %s", _remoteSocketAddress)); } public void initiate() @@ -109,13 +111,11 @@ public final class IoSender implements R public void send(ByteBuffer buf) { - if (closed.get()) - { - throw new SenderClosedException("sender is closed", exception); - } + 
checkNotAlreadyClosed(); + if(!senderThread.isAlive()) { - throw new SenderException("sender thread not alive"); + throw new SenderException(String.format("sender thread for socket %s is not alive", _remoteSocketAddress)); } final int size = buffer.length; @@ -131,7 +131,7 @@ public final class IoSender implements R flush(); synchronized (notFull) { - long start = System.currentTimeMillis(); + final long start = System.currentTimeMillis(); long elapsed = 0; while (!closed.get() && head - tail >= size && elapsed < timeout) { @@ -146,14 +146,19 @@ public final class IoSender implements R elapsed += System.currentTimeMillis() - start; } - if (closed.get()) - { - throw new SenderClosedException("sender is closed", exception); - } + checkNotAlreadyClosed(); if (head - tail >= size) { - throw new SenderException(String.format("write timed out: %s, %s", head, tail)); + try + { + log.error("write timed out for socket %s: head %d, tail %d", _remoteSocketAddress, head, tail); + throw new SenderException(String.format("write timed out for socket %s: head %d, tail %d", _remoteSocketAddress, head, tail)); + } + finally + { + close(false, false); + } } } continue; @@ -191,10 +196,10 @@ public final class IoSender implements R public void close() { - close(true); + close(true, true); } - void close(boolean reportException) + private void close(boolean awaitSenderBeforeClose, boolean reportException) { if (!closed.getAndSet(true)) { @@ -210,21 +215,11 @@ public final class IoSender implements R try { - if (Thread.currentThread() != senderThread) + if (awaitSenderBeforeClose) { - senderThread.join(timeout); - if (senderThread.isAlive()) - { - log.error("join timed out"); - throw new SenderException("join timed out"); - } + awaitSenderThreadShutdown(); } } - catch (InterruptedException e) - { - log.error("interrupted whilst waiting for sender thread to stop"); - throw new SenderException(e); - } finally { closeListeners(); @@ -247,7 +242,7 @@ public final class IoSender implements R 
} catch(Exception e) { - log.error("Exception closing listener: " + e.getMessage()); + log.error(e, "Exception closing listener for socket %s", _remoteSocketAddress); ex = e; } } @@ -316,7 +311,7 @@ public final class IoSender implements R { log.error(e, "error in write thread"); exception = e; - close(false); + close(false, false); break; } tail += length; @@ -346,4 +341,33 @@ public final class IoSender implements R { _listeners.add(listener); } + + private void awaitSenderThreadShutdown() + { + if (Thread.currentThread() != senderThread) + { + try + { + senderThread.join(timeout); + if (senderThread.isAlive()) + { + log.error("join timed out for socket %s to stop", _remoteSocketAddress); + throw new SenderException(String.format("join timed out for socket %s to stop", _remoteSocketAddress)); + } + } + catch (InterruptedException e) + { + log.error("interrupted whilst waiting for sender thread for socket %s to stop", _remoteSocketAddress); + throw new SenderException(e); + } + } + } + + private void checkNotAlreadyClosed() + { + if (closed.get()) + { + throw new SenderClosedException(String.format("sender for socket %s is closed", _remoteSocketAddress), exception); + } + } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org For additional commands, e-mail: commits-help@qpid.apache.org