activemq-users mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Jose Martinez <jmarti...@opencrowd.com>
Subject Re: Embedded Broker/Listener becomes inactive and doesn't dequeue messages
Date Tue, 25 Dec 2012 20:02:41 GMT
Thanks. I'll have to look into how to get the JConsole screenshot as I've
never done it before. Do you think there might be something wrong with the
my listener? I suspect the broker is working fine but the listener is the
one that is dying. I'll try to debug with the JConsole. Thanks!


On Tue, Dec 25, 2012 at 2:49 PM, Raul Kripalani <raul@evosent.com> wrote:

> Can you attach a JConsole screenshot of the Broker MBean attributes?
>
> Does this happen with just one destination or with all?
>
> The Inactivity Monitor WriteChecker is doing exactly what you pursue:
> keeping the connection alive by sending pings back and forth.
>
> It doesn't report errors, so it looks like the connections are okay. The
> Broker JMX screenshot may tell us what's going on.
>
> Raúl.
> On 25 Dec 2012 19:33, "Jose Martinez" <jmartinez@opencrowd.com> wrote:
>
> > Seems like i keep having the issue even after increasing the limit. I
> > wonder what does the 'WriteChecker" message really means. And why can't
> the
> > connection remain stable? Is there a way to automatically recycle the
> > connection to keep it "fresh"?
> >
> > Any suggestions appreciated.
> >
> >
> > On Mon, Dec 24, 2012 at 1:01 PM, Jose Martinez <jmartinez@opencrowd.com
> > >wrote:
> >
> > > Thanks!
> > >
> > > I'll set it as follows:
> > >     broker.getSystemUsage().getTempUsage().setLimit(1024l*64l);
> > >
> > > And see how it goes.
> > >
> > >
> > >
> > > On Mon, Dec 24, 2012 at 12:03 PM, Raul Kripalani <raul@evosent.com>
> > wrote:
> > >
> > >> You are setting a tempUsage limit of 2 kilobytes. Maybe you expected
> > 2000
> > >> to mean megabytes, but the unit is bytes. See Javadoc [1].
> > >>
> > >> In a non-persistent broker, the role of tempUsage is to buffer up
> > >> non-persistent messages when consumers can't keep up. The temp storage
> > >> implementation (PListStore) uses a default journal size of 32mb. So
> your
> > >> system locks up as soon as it lazy inits the PListStore for the first
> > >> time,
> > >> because tempUsage < default journal file size.
> > >>
> > >> My two cents. Please let us know if this helped.
> > >>
> > >> Thanks,
> > >>
> > >> [1]
> > >>
> > >>
> >
> http://activemq.apache.org/maven/5.7.0/activemq-core/apidocs/org/apache/activemq/usage/Usage.html#setLimit(long)
> > >> .
> > >>
> > >> *Raúl Kripalani*
> > >> Apache Camel Committer
> > >> Enterprise Architect, Program Manager, Open Source Integration
> > specialist
> > >> http://about.me/raulkripalani |
> > http://www.linkedin.com/in/raulkripalani
> > >> http://blog.raulkr.net | twitter: @raulvk <http://twitter.com/raulvk>
> > >>
> > >> On Mon, Dec 24, 2012 at 4:50 PM, Jose Martinez <
> jmartinez@opencrowd.com
> > >> >wrote:
> > >>
> > >> > Hello,
> > >> >
> > >> > I've implemented an embedded activemq service and a message listener
> > >> using
> > >> > in activemq 5.7.0. It works fine for a few hours but I've noticed
> that
> > >> > after a day or two of inactivity messages stop getting consumed and
> I
> > >> keep
> > >> > getting the following message in the logs:
> > >> >
> > >> > 16:19:01,627 DEBUG [AbstractInactivityMonitor] WriteChecker 10000
ms
> > >> > elapsed since last write check.
> > >> > 16:19:01,627 DEBUG [AbstractInactivityMonitor] Running
> > >> > WriteCheck[tcp://my.ip.addr:61616]
> > >> > 16:19:11,624 DEBUG [AbstractInactivityMonitor] WriteChecker 10000
ms
> > >> > elapsed since last write check.
> > >> > 16:19:11,625 DEBUG [AbstractInactivityMonitor] Running
> > >> > WriteCheck[tcp://my.ip.addr:61616]
> > >> > 16:19:11,627 DEBUG [AbstractInactivityMonitor] WriteChecker 10000
ms
> > >> > elapsed since last write check.
> > >> > 16:19:11,627 DEBUG [AbstractInactivityMonitor] Running
> > >> > WriteCheck[tcp://my.ip.addr:61616]
> > >> >
> > >> >
> > >> > It looks like my connection becomes inactive after awhile and
> messages
> > >> get
> > >> > stuck in the queue. I can see in netstat the several ports
> > >> > remain occupied but messages are not dequeued.
> > >> >
> > >> > Here's what my embedded service looks like (implemented as a servlet
> > in
> > >> > tomcat):
> > >> >
> > >> > public void init(ServletConfig config) throws ServletException {
> > >> > super.init(config);
> > >> >  logger.info("Initializing message queue listener");
> > >> >  try {
> > >> > BrokerService broker = new BrokerService();
> > >> >
> > >> > TransportConnector connector = new TransportConnector();
> > >> > connector.setUri(new URI("tcp://localhost:61616"));
> > >> > broker.addConnector(connector);
> > >> > broker.setPersistent(false);
> > >> > broker.getSystemUsage().getTempUsage().setLimit(2000);
> > >> > broker.start();
> > >> > EntityXMLConsumer.startInstance(); /this is the listener
> > >> > } catch (Exception e) {
> > >> > logger.error(e);
> > >> > }
> > >> > }
> > >> >
> > >> >
> > >> > And here's my listener.
> > >> >
> > >> > public class EntityXMLConsumer implements MessageListener,
> > >> > ExceptionListener{
> > >> >  private static Logger logger =
> > >> Logger.getLogger(EntityXMLConsumer.class);
> > >> >  private static ConfigManager configManager =
> > >> ConfigManager.getInstance();
> > >> > private static String url =
> > >> > configManager.getProperty("popworkflowsvc.activemq.url");
> > >> > //private static String url = ActiveMQConnection.DEFAULT_BROKER_URL;
> > >> >     private static String subject = "WORKFLOW";
> > >> >     private static Session session;
> > >> >     private MessageConsumer consumer;
> > >> >     private static Connection connection;
> > >> >     private static EntityXMLConsumer instance = null;
> > >> >
> > >> > private EntityXMLConsumer() throws JMSException{
> > >> > logger.info("Initializing message listener on url :" + url);
> > >> > ActiveMQConnectionFactory connectionFactory = new
> > >> > ActiveMQConnectionFactory(url);
> > >> >                 connection = connectionFactory.createConnection();
> > >> >                // to use transactions you should set the first
> > >> parameter to
> > >> > 'true'
> > >> >                session = connection.createSession(false,
> > >> > Session.AUTO_ACKNOWLEDGE);
> > >> >                Destination destination =
> session.createQueue(subject);
> > >> >                consumer = session.createConsumer(destination);
> > >> >
> > >> >         consumer.setMessageListener(this);
> > >> >             connection.start();
> > >> > }
> > >> >  public static EntityXMLConsumer startInstance() throws
> JMSException {
> > >> >  if (instance == null){
> > >> > instance = new EntityXMLConsumer();
> > >> > }
> > >> >  return instance;
> > >> > }
> > >> >  public static void closeConnection() {
> > >> >         try {
> > >> >          logger.info("closing connection");
> > >> > connection.close();
> > >> > } catch (JMSException e) {
> > >> > logger.error(e);
> > >> > }
> > >> > }
> > >> >
> > >> >
> > >> > public void onException(JMSException ex) {
> > >> > logger.error(ex);
> > >> > closeConnection();
> > >> > }
> > >> >
> > >> >
> > >> > public void onMessage(Message message) {
> > >> >         final String MSG_ID = "ENTITY_DATA";
> > >> >         PopWorkflowServiceSEI service = new
> PopWorkflowServiceImpl();
> > >> >
> > >> >     try {
> > >> > logger.info("message received: " + message );
> > >> > if (message instanceof MapMessage){
> > >> > MapMessage mapMessage = (MapMessage) message;
> > >> > Map payloadMap = (Map)mapMessage.getObject(MSG_ID);
> > >> > for (Object messageKey: payloadMap.keySet()){
> > >> > logger.info("Module ID: " + messageKey);
> > >> > Object[] entityXML = ((List)(payloadMap.get(messageKey))).toArray();
> > >> > String[] strEntityXML = Arrays.copyOf(entityXML, entityXML.length,
> > >> > String[].class);
> > >> > service.initWorkflow(strEntityXML , (String)messageKey);
> > >> > logger.debug("Entity XML " + payloadMap.get(messageKey));
> > >> > }
> > >> > }
> > >> >      } catch (JMSException e) {
> > >> >      logger.error(e);
> > >> >      closeConnection();
> > >> > }
> > >> > }
> > >> > }
> > >> >
> > >> >
> > >> > So it looks like either the listener or the broker stop responding
> and
> > >> > messages get stuck in the queue. Are there any additional
> parameters I
> > >> need
> > >> > to configure? Any pointers appreciated.
> > >> >
> > >> >
> > >> > Thanks for your help.
> > >> >
> > >>
> > >
> > >
> >
>

Mime
  • Unnamed multipart/alternative (inline, None, 0 bytes)
View raw message