Return-Path: Delivered-To: apmail-activemq-commits-archive@www.apache.org Received: (qmail 15493 invoked from network); 15 Dec 2008 19:51:37 -0000 Received: from hermes.apache.org (HELO mail.apache.org) (140.211.11.2) by minotaur.apache.org with SMTP; 15 Dec 2008 19:51:37 -0000 Received: (qmail 84127 invoked by uid 500); 15 Dec 2008 19:51:49 -0000 Delivered-To: apmail-activemq-commits-archive@activemq.apache.org Received: (qmail 84106 invoked by uid 500); 15 Dec 2008 19:51:49 -0000 Mailing-List: contact commits-help@activemq.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@activemq.apache.org Delivered-To: mailing list commits@activemq.apache.org Received: (qmail 84097 invoked by uid 99); 15 Dec 2008 19:51:49 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 15 Dec 2008 11:51:49 -0800 X-ASF-Spam-Status: No, hits=-1999.3 required=10.0 tests=ALL_TRUSTED,FRT_LEVITRA X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 15 Dec 2008 19:51:36 +0000 Received: by eris.apache.org (Postfix, from userid 65534) id 6FE6723888A4; Mon, 15 Dec 2008 11:51:16 -0800 (PST) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r726785 - /activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/WriteTimeoutFilter.java Date: Mon, 15 Dec 2008 19:51:16 -0000 To: commits@activemq.apache.org From: gtully@apache.org X-Mailer: svnmailer-1.0.8 Message-Id: <20081215195116.6FE6723888A4@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: gtully Date: Mon Dec 15 11:51:16 2008 New Revision: 726785 URL: http://svn.apache.org/viewvc?rev=726785&view=rev Log: resolve AMQ-2006 - patch applied with thanks Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/WriteTimeoutFilter.java Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/WriteTimeoutFilter.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/WriteTimeoutFilter.java?rev=726785&r1=726784&r2=726785&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/WriteTimeoutFilter.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/WriteTimeoutFilter.java Mon Dec 15 11:51:16 2008 @@ -18,6 +18,7 @@ package org.apache.activemq.transport; import java.io.IOException; +import java.net.Socket; import java.util.Iterator; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.atomic.AtomicInteger; @@ -25,7 +26,8 @@ import java.util.concurrent.locks.ReentrantLock; import org.apache.activemq.transport.tcp.TcpBufferedOutputStream; -import org.apache.activemq.transport.tcp.TcpTransport; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; /** * This filter implements write timeouts for socket write operations. @@ -52,13 +54,16 @@ * */ public class WriteTimeoutFilter extends TransportFilter { - + + private static final Log LOG = LogFactory.getLog(WriteTimeoutFilter.class); protected static ConcurrentLinkedQueue writers = new ConcurrentLinkedQueue(); protected static AtomicInteger messageCounter = new AtomicInteger(0); protected static TimeoutThread timeoutThread = new TimeoutThread(); - protected long writeTimeout = -1; - + protected static long sleep = 5000l; + + protected long writeTimeout = -1; + public WriteTimeoutFilter(Transport next) { super(next); } @@ -69,35 +74,56 @@ registerWrite(this); super.oneway(command); } catch (IOException x) { - deRegisterWrite(this,true,x); throw x; } finally { deRegisterWrite(this,false,null); } } - public long getWriteTimeout() { - return writeTimeout; - } - - public void setWriteTimeout(long writeTimeout) { - this.writeTimeout = writeTimeout; - } - - protected TcpBufferedOutputStream getWriter() { - return next.narrow(TcpBufferedOutputStream.class); - } - - protected static void registerWrite(WriteTimeoutFilter filter) { - writers.add(filter); - } - + public long getWriteTimeout() { + return writeTimeout; + } + + public void setWriteTimeout(long writeTimeout) { + this.writeTimeout = writeTimeout; + } + + public static long getSleep() { + return sleep; + } + + public static void setSleep(long sleep) { + WriteTimeoutFilter.sleep = sleep; + } + + + protected TcpBufferedOutputStream getWriter() { + return next.narrow(TcpBufferedOutputStream.class); + } + + protected Socket getSocket() { + return next.narrow(Socket.class); + } + + protected static void registerWrite(WriteTimeoutFilter filter) { + writers.add(filter); + } + protected static boolean deRegisterWrite(WriteTimeoutFilter filter, boolean fail, IOException iox) { boolean result = writers.remove(filter); if (result) { if (fail) { - IOException ex = (iox!=null)?iox:new IOException("Forced write timeout for:"+filter.getNext().getRemoteAddress()); - filter.getTransportListener().onException(ex); + String message = "Forced write timeout for:"+filter.getNext().getRemoteAddress(); + LOG.warn(message); + Socket sock = filter.getSocket(); + if (sock==null) { + LOG.error("Destination socket is null, unable to close socket.("+message+")"); + } else { + try { + sock.close(); + }catch (IOException ignore) { + } + } } } return result; @@ -126,22 +152,31 @@ public void run() { while (run) { - if (!interrupted()) { - Iterator filters = writers.iterator(); - while (run && filters.hasNext()) { - WriteTimeoutFilter filter = filters.next(); - if (filter.getWriteTimeout()<=0) continue; //no timeout set - long writeStart = filter.getWriter().getWriteTimestamp(); - long delta = (filter.getWriter().isWriting() && writeStart>0)?System.currentTimeMillis() - writeStart:-1; - if (delta>filter.getWriteTimeout()) { - WriteTimeoutFilter.deRegisterWrite(filter, true,null); - }//if timeout - }//while - }//if interrupted + boolean error = false; try { - Thread.sleep(5000); - } catch (InterruptedException x) { - //do nothing + if (!interrupted()) { + Iterator filters = writers.iterator(); + while (run && filters.hasNext()) { + WriteTimeoutFilter filter = filters.next(); + if (filter.getWriteTimeout()<=0) continue; //no timeout set + long writeStart = filter.getWriter().getWriteTimestamp(); + long delta = (filter.getWriter().isWriting() && writeStart>0)?System.currentTimeMillis() - writeStart:-1; + if (delta>filter.getWriteTimeout()) { + WriteTimeoutFilter.deRegisterWrite(filter, true,null); + }//if timeout + }//while + }//if interrupted + try { + Thread.sleep(getSleep()); + error = false; + } catch (InterruptedException x) { + //do nothing + } + }catch (Throwable t) { //make sure this thread never dies + if (!error) { //use error flag to avoid filling up the logs + LOG.error("WriteTimeout thread unable validate existing sockets.",t); + error = true; + } } } }