Return-Path: Delivered-To: apmail-activemq-commits-archive@www.apache.org Received: (qmail 46810 invoked from network); 2 Apr 2009 14:04:16 -0000 Received: from hermes.apache.org (HELO mail.apache.org) (140.211.11.3) by minotaur.apache.org with SMTP; 2 Apr 2009 14:04:16 -0000 Received: (qmail 16723 invoked by uid 500); 2 Apr 2009 14:04:16 -0000 Delivered-To: apmail-activemq-commits-archive@activemq.apache.org Received: (qmail 16678 invoked by uid 500); 2 Apr 2009 14:04:16 -0000 Mailing-List: contact commits-help@activemq.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@activemq.apache.org Delivered-To: mailing list commits@activemq.apache.org Received: (qmail 16669 invoked by uid 99); 2 Apr 2009 14:04:16 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 02 Apr 2009 14:04:16 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=10.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 02 Apr 2009 14:04:15 +0000 Received: by eris.apache.org (Postfix, from userid 65534) id 82847238896D; Thu, 2 Apr 2009 14:03:55 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r761301 - in /activemq/trunk/activemq-core/src/main/java/org/apache/activemq: ActiveMQConnection.java ActiveMQMessageConsumer.java ActiveMQSession.java Date: Thu, 02 Apr 2009 14:03:55 -0000 To: commits@activemq.apache.org From: dejanb@apache.org X-Mailer: svnmailer-1.0.8 Message-Id: <20090402140355.82847238896D@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: dejanb Date: Thu Apr 2 14:03:54 2009 New Revision: 761301 URL: http://svn.apache.org/viewvc?rev=761301&view=rev Log: https://issues.apache.org/activemq/browse/AMQ-2195 - receive should throw an exception if the connection is 
lost Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java?rev=761301&r1=761300&r2=761301&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java Thu Apr 2 14:03:54 2009 @@ -1411,7 +1411,7 @@ */ public void cleanup() throws JMSException { - if (advisoryConsumer != null) { + if (advisoryConsumer != null && !isTransportFailed()) { advisoryConsumer.dispose(); advisoryConsumer = null; } @@ -1805,7 +1805,11 @@ transportFailed(error); ServiceSupport.dispose(ActiveMQConnection.this.transport); brokerInfoReceived.countDown(); - + try { + cleanup(); + } catch (JMSException e) { + LOG.warn("Exception during connection cleanup, " + e, e); + } for (Iterator iter = transportListeners .iterator(); iter.hasNext();) { TransportListener listener = iter.next(); @@ -2215,4 +2219,8 @@ protected void rollbackDuplicate(ActiveMQDispatcher dispatcher, Message message) { connectionAudit.rollbackDuplicate(dispatcher, message); } + + public IOException getFirstFailureError() { + return firstFailureError; + } } Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java?rev=761301&r1=761300&r2=761301&view=diff 
============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java Thu Apr 2 14:03:54 2009 @@ -16,6 +16,7 @@ */ package org.apache.activemq; +import java.io.IOException; import java.util.ArrayList; import java.util.HashMap; import java.util.Iterator; @@ -32,6 +33,7 @@ import javax.jms.InvalidDestinationException; import javax.jms.JMSException; import javax.jms.Message; +import javax.jms.MessageConsumer; import javax.jms.MessageListener; import org.apache.activemq.command.ActiveMQDestination; @@ -129,6 +131,8 @@ private MessageAck pendingAck; private long lastDeliveredSequenceId; + + private IOException failureError; /** * Create a MessageConsumer @@ -417,7 +421,11 @@ if (timeout > 0 && !unconsumedMessages.isClosed()) { timeout = Math.max(deadline - System.currentTimeMillis(), 0); } else { - return null; + if (failureError != null) { + throw JMSExceptionSupport.create(failureError); + } else { + return null; + } } } else if (md.getMessage() == null) { return null; @@ -1136,4 +1144,12 @@ return lastDeliveredSequenceId; } + public IOException getFailureError() { + return failureError; + } + + public void setFailureError(IOException failureError) { + this.failureError = failureError; + } + } Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java?rev=761301&r1=761300&r2=761301&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java Thu Apr 2 14:03:54 2009 @@ -609,6 +609,7 @@ for (Iterator 
<ActiveMQMessageConsumer> iter = consumers.iterator(); iter.hasNext();) { ActiveMQMessageConsumer consumer = iter.next(); + consumer.setFailureError(connection.getFirstFailureError()); consumer.dispose(); lastDeliveredSequenceId = Math.max(lastDeliveredSequenceId, consumer.getLastDeliveredSequenceId()); }