Return-Path: Delivered-To: apmail-activemq-users-archive@www.apache.org Received: (qmail 9945 invoked from network); 22 Jan 2008 17:34:42 -0000 Received: from hermes.apache.org (HELO mail.apache.org) (140.211.11.2) by minotaur.apache.org with SMTP; 22 Jan 2008 17:34:42 -0000 Received: (qmail 68650 invoked by uid 500); 22 Jan 2008 17:34:27 -0000 Delivered-To: apmail-activemq-users-archive@activemq.apache.org Received: (qmail 68628 invoked by uid 500); 22 Jan 2008 17:34:27 -0000 Mailing-List: contact users-help@activemq.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: users@activemq.apache.org Delivered-To: mailing list users@activemq.apache.org Received: (qmail 68615 invoked by uid 99); 22 Jan 2008 17:34:26 -0000 Received: from nike.apache.org (HELO nike.apache.org) (192.87.106.230) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 22 Jan 2008 09:34:26 -0800 X-ASF-Spam-Status: No, hits=2.6 required=10.0 tests=DNS_FROM_OPENWHOIS,SPF_HELO_PASS,SPF_PASS,WHOIS_MYPRIVREG X-Spam-Check-By: apache.org Received-SPF: pass (nike.apache.org: domain of lists@nabble.com designates 216.139.236.158 as permitted sender) Received: from [216.139.236.158] (HELO kuber.nabble.com) (216.139.236.158) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 22 Jan 2008 17:34:12 +0000 Received: from isper.nabble.com ([192.168.236.156]) by kuber.nabble.com with esmtp (Exim 4.63) (envelope-from ) id 1JHN0f-00052K-0j for users@activemq.apache.org; Tue, 22 Jan 2008 09:34:05 -0800 Message-ID: <15023988.post@talk.nabble.com> Date: Tue, 22 Jan 2008 09:34:05 -0800 (PST) From: activemqnewbie To: users@activemq.apache.org Subject: Re: Consumer is closed Exception In-Reply-To: <15022223.post@talk.nabble.com> MIME-Version: 1.0 Content-Type: text/plain; charset=us-ascii Content-Transfer-Encoding: 7bit X-Nabble-From: acquaj29@yahoo.co.in References: <15022223.post@talk.nabble.com> X-Virus-Checked: Checked by ClamAV on apache.org Below is the source for MessageConsumer.java package ss.integration.util; import javax.jms.TopicConnectionFactory; import javax.jms.TopicConnection; import javax.jms.Topic; import javax.jms.TopicSession; import javax.jms.TopicSubscriber; import javax.jms.MessageListener; import javax.jms.JMSException; import ss.util.logging.Logger; import ss.util.SystemException; import ss.util.JNDILookup; import ss.integration.util.ExListener; public class MessageConsumer { private TopicConnectionFactory _topicConnectionFactory; private TopicConnection _topicConnection; private Topic _topic; private TopicSession _session; private TopicSubscriber _subscriber; private String _strClientID; private String _strTopicConnectionFactory; private String _strTopic; private String _strSelector; private String _strContext; private String _strURL; private int _reconnectRetry; private long _reconnectTime; private boolean _firstTimeConnection; // flag to indicated that a shutdown has been requested private boolean _shutdownRequested; private static ExListener _exListener; private static MessageListener _msgListener; public MessageConsumer(String clientID_, String topicConnectionFactory_, String topic_, String selector_, String context_, String url_) throws SystemException { _strClientID = clientID_; _strTopicConnectionFactory = topicConnectionFactory_; _strTopic = topic_; _strSelector = selector_; _strContext = context_; _strURL = url_; _exListener = null; _msgListener = null; _reconnectRetry = 0; // reconnect is currently not used. _reconnectTime = 1 * 1000; _firstTimeConnection = true; _shutdownRequested = false; } public void init() throws SystemException { try { Logger.trace("MessageConsumer, setting the topicConnection, client: " + _strClientID); _topicConnectionFactory = (TopicConnectionFactory)JNDILookup.lookup(_strTopicConnectionFactory, _strContext, _strURL); _topicConnection = _topicConnectionFactory.createTopicConnection(); _topicConnection.setClientID(_strClientID); _topic = (Topic)JNDILookup.lookup(_strTopic, _strContext, _strURL); _session = _topicConnection.createTopicSession(true, 0); _subscriber = _session.createDurableSubscriber(_topic, _strClientID, _strSelector, true); Logger.trace("_topic=" + _topic + " _strClientID=" + _strClientID + " _strSelector = " + _strSelector); Logger.trace("_subscriber=" + _subscriber); if (_exListener == null) { _exListener = new ExListener(this); } } catch (JMSException je) { Logger.trace("MessageConsumer init() failed: JMSException"); throw new SystemException(je); } catch (SystemException se) { Logger.trace("MessageConsumer init() failed: SystemException"); throw new SystemException(se); } catch (Exception e) { Logger.trace("MessageConsumer init() failed: Exception"); throw new SystemException(e); } } public void start() throws SystemException { synchronized (this) { while (!_shutdownRequested) { try { startConnection(); Logger.trace("Wait()..." ); wait(); Logger.trace("Waiting thread awaken" ); } catch (Exception e) { Logger.error("Error in MessageConsumer.start: " + e.getMessage()); throw new SystemException(e); } finally { close(); } } } } private void signalShutdown() { Logger.trace("Shutting down..."); _shutdownRequested = true; notifyWaitingThread(); } // to signal a waiting thread (waiting inside start() method) public synchronized void notifyWaitingThread() { unregisterListener(); Logger.trace("Notifying/signaling a waiting thread..."); notify(); } public void setListener (MessageListener listener_) { _msgListener = listener_; } private void registerMsgListener() throws SystemException { if (_msgListener != null) { try { Logger.trace("Registering Message Listener..."); // install asynchronous listener _subscriber.setMessageListener(_msgListener); } catch (JMSException je) { throw new SystemException(je); } catch (Exception e) { throw new SystemException(e); } } else { Logger.warning("MessageListener is null"); } } private void unregisterListener() { try { Logger.trace("Unregistering Message Listener..."); _subscriber.setMessageListener(null); } catch (Exception e) { Logger.error(e); } } private void registerExceptionListener() throws SystemException { if (_exListener != null) { try { Logger.trace("Registering JMS Exception Listener..."); // install asynchronous listener _topicConnection.setExceptionListener(_exListener); } catch (JMSException je) { throw new SystemException(je); } catch (Exception e) { throw new SystemException(e); } } else { Logger.warning("ExceptionListener is null"); } } public void doCommit () throws SystemException { try { _session.commit(); } catch (JMSException je) { throw new SystemException(je); } catch (Exception e) { throw new SystemException(e); } } public void doRollback () throws SystemException { try { Logger.trace("doRollback() Rolling-back message..."); _session.rollback(); // Be careful when using this. Sometimes the rollback doesn't work. Logger.trace("doRollback() Message rolled-back"); signalShutdown(); } catch (JMSException je) { Logger.trace("doRollback() JMS Exception: " + je.getMessage()); throw new SystemException(je); } catch (Exception e) { Logger.trace("doRollback() Exception :" + e.getMessage()); throw new SystemException(e); } } public void close() throws SystemException { try { if (_subscriber != null) { Logger.trace("closing subscriber..."); _subscriber.close(); _subscriber = null; Logger.trace("subscriber closed"); } if (_session != null) { Logger.trace("closing session..."); _session.close(); _session = null; Logger.trace("session closed"); } if (_topicConnection != null) { Logger.trace("closing topic connection..."); _topicConnection.close(); _topicConnection = null; Logger.trace("topicConnection closed"); } _topicConnectionFactory = null; _topic = null; } catch (Exception e) { Logger.trace("Exception in close()"); } } private void startConnection() throws SystemException { boolean success = false; for (int i=0; i<_reconnectRetry || _firstTimeConnection; i++) { _firstTimeConnection = false; try { Logger.trace("Closing all connections... "); close(); if (_exListener != null) { wait(_reconnectTime); } Logger.trace("Starting all connections... (" + (i+1) + "/" + _reconnectRetry + ")"); init(); Logger.trace("Init successfull"); registerMsgListener(); Logger.trace("RegisterMsgListener successfull"); registerExceptionListener(); Logger.trace("RegisterExceptionListener successfull"); Logger.trace("Starting TopicConnection..."); _topicConnection.start(); Logger.trace("TopicConnection started..."); success = true; break; } catch (Exception e) { Logger.error("startConnection() failed"); Logger.error(e); } } if (!success) { close(); Logger.error("Unable to connect to the JMS Server"); throw new SystemException("Unable to connect to the JMS Server"); } } } activemqnewbie wrote: > > Subscriber closes with an exception(below)after consuming certain > messages.This happens at different intervals. > Appreciate if someone could let me know the reason and solution for this. > Am using activemq 5.0,java 5. > Is there any property/setting that i need to do ? > Below is the stack trace. > <2008/01/10 10:43:23.127><21621663><> ExceptionListener receives a > JMS exception <2008/01/10 10:43:23.127><21621663><> Unregistering > Message Listener... <2008/01/10 10:43:23.127><21621663><> > Notifying/signaling a waiting thread... <2008/01/10 > 10:43:23.144><21621663><> ExceptionListener receives a JMS exception > <2008/01/10 10:43:23.144><21621663><> Unregistering Message > Listener... <2008/01/10 10:43:23.153><21621663><> > javax.jms.IllegalStateException: The Consumer is closed at > org.apache.activemq.ActiveMQMessageConsumer.checkClosed(ActiveMQMessageConsumer.java:679) > at > org.apache.activemq.ActiveMQMessageConsumer.setMessageListener(ActiveMQMessageConsumer.java:352) > > > Thanks > > -- View this message in context: http://www.nabble.com/Consumer-is-closed-Exception-tp15022223s2354p15023988.html Sent from the ActiveMQ - User mailing list archive at Nabble.com.