activemq-users mailing list archives

From Jose Martinez <jmarti...@opencrowd.com>
Subject Re: Embedded Broker/Listener becomes inactive and doesn't dequeue messages
Date Mon, 24 Dec 2012 18:01:32 GMT
Thanks!

I'll set it as follows:
    broker.getSystemUsage().getTempUsage().setLimit(1024L * 64L);

And see how it goes.
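
For reference, here is a minimal sketch of the byte arithmetic behind that limit. It uses no ActiveMQ classes. The class name `TempUsageLimits`, its helper methods, and the 64 MB example value are hypothetical and exist only for illustration. The 32 MB journal size is an assumption taken from the reply quoted below, not something verified here. Note that 1024L * 64L comes to 65,536 bytes (64 KB), which is still well under that 32 MB figure.

```java
// Sketch only: byte arithmetic for a tempUsage limit. No ActiveMQ classes are used.
final class TempUsageLimits {
    // Assumption from the quoted reply: PListStore's default journal size is 32 MB.
    static final long ASSUMED_JOURNAL_BYTES = 32L * 1024L * 1024L;

    // Convert kilobytes to bytes.
    static long kilobytes(long n) {
        return n * 1024L;
    }

    // Convert megabytes to bytes.
    static long megabytes(long n) {
        return n * 1024L * 1024L;
    }

    // True when the limit is at least as large as one assumed journal file.
    static boolean coversJournal(long limitBytes) {
        return limitBytes >= ASSUMED_JOURNAL_BYTES;
    }

    public static void main(String[] args) {
        long original = 2000L;            // value from the original servlet code
        long proposed = 1024L * 64L;      // value proposed in this message
        long illustrative = megabytes(64); // hypothetical larger value

        System.out.println(original + " bytes covers journal: " + coversJournal(original));
        System.out.println(proposed + " bytes covers journal: " + coversJournal(proposed));
        System.out.println(illustrative + " bytes covers journal: " + coversJournal(illustrative));
    }
}
```

If the 32 MB figure holds, a call such as broker.getSystemUsage().getTempUsage().setLimit(TempUsageLimits.megabytes(64)) would keep the limit above one journal file. The 64 MB value is only an example, not a recommendation.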



On Mon, Dec 24, 2012 at 12:03 PM, Raul Kripalani <raul@evosent.com> wrote:

> You are setting a tempUsage limit of 2 kilobytes. Maybe you expected 2000
> to mean megabytes, but the unit is bytes. See Javadoc [1].
>
> In a non-persistent broker, the role of tempUsage is to buffer up
> non-persistent messages when consumers can't keep up. The temp storage
> implementation (PListStore) uses a default journal size of 32 MB. So your
> system locks up as soon as it lazily initializes the PListStore for the
> first time, because tempUsage < default journal file size.
>
> My two cents. Please let us know if this helped.
>
> Thanks,
>
> [1] http://activemq.apache.org/maven/5.7.0/activemq-core/apidocs/org/apache/activemq/usage/Usage.html#setLimit(long)
>
> *Raúl Kripalani*
> Apache Camel Committer
> Enterprise Architect, Program Manager, Open Source Integration specialist
> http://about.me/raulkripalani | http://www.linkedin.com/in/raulkripalani
> http://blog.raulkr.net | twitter: @raulvk <http://twitter.com/raulvk>
>
> On Mon, Dec 24, 2012 at 4:50 PM, Jose Martinez <jmartinez@opencrowd.com> wrote:
>
> > Hello,
> >
> > I've implemented an embedded ActiveMQ service and a message listener
> > using ActiveMQ 5.7.0. It works fine for a few hours, but I've noticed that
> > after a day or two of inactivity messages stop getting consumed and I keep
> > getting the following message in the logs:
> >
> > 16:19:01,627 DEBUG [AbstractInactivityMonitor] WriteChecker 10000 ms elapsed since last write check.
> > 16:19:01,627 DEBUG [AbstractInactivityMonitor] Running WriteCheck[tcp://my.ip.addr:61616]
> > 16:19:11,624 DEBUG [AbstractInactivityMonitor] WriteChecker 10000 ms elapsed since last write check.
> > 16:19:11,625 DEBUG [AbstractInactivityMonitor] Running WriteCheck[tcp://my.ip.addr:61616]
> > 16:19:11,627 DEBUG [AbstractInactivityMonitor] WriteChecker 10000 ms elapsed since last write check.
> > 16:19:11,627 DEBUG [AbstractInactivityMonitor] Running WriteCheck[tcp://my.ip.addr:61616]
> >
> >
> > It looks like my connection becomes inactive after a while and messages
> > get stuck in the queue. I can see in netstat that several ports
> > remain occupied but messages are not dequeued.
> >
> > Here's what my embedded service looks like (implemented as a servlet in
> > tomcat):
> >
> > public void init(ServletConfig config) throws ServletException {
> >     super.init(config);
> >     logger.info("Initializing message queue listener");
> >     try {
> >         BrokerService broker = new BrokerService();
> >
> >         TransportConnector connector = new TransportConnector();
> >         connector.setUri(new URI("tcp://localhost:61616"));
> >         broker.addConnector(connector);
> >         broker.setPersistent(false);
> >         broker.getSystemUsage().getTempUsage().setLimit(2000);
> >         broker.start();
> >         EntityXMLConsumer.startInstance(); // this is the listener
> >     } catch (Exception e) {
> >         logger.error(e);
> >     }
> > }
> >
> >
> > And here's my listener.
> >
> > public class EntityXMLConsumer implements MessageListener, ExceptionListener {
> >     private static Logger logger = Logger.getLogger(EntityXMLConsumer.class);
> >     private static ConfigManager configManager = ConfigManager.getInstance();
> >     private static String url =
> >         configManager.getProperty("popworkflowsvc.activemq.url");
> >     //private static String url = ActiveMQConnection.DEFAULT_BROKER_URL;
> >     private static String subject = "WORKFLOW";
> >     private static Session session;
> >     private MessageConsumer consumer;
> >     private static Connection connection;
> >     private static EntityXMLConsumer instance = null;
> >
> >     private EntityXMLConsumer() throws JMSException {
> >         logger.info("Initializing message listener on url :" + url);
> >         ActiveMQConnectionFactory connectionFactory =
> >             new ActiveMQConnectionFactory(url);
> >         connection = connectionFactory.createConnection();
> >         // to use transactions you should set the first parameter to 'true'
> >         session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
> >         Destination destination = session.createQueue(subject);
> >         consumer = session.createConsumer(destination);
> >
> >         consumer.setMessageListener(this);
> >         connection.start();
> >     }
> >
> >     public static EntityXMLConsumer startInstance() throws JMSException {
> >         if (instance == null) {
> >             instance = new EntityXMLConsumer();
> >         }
> >         return instance;
> >     }
> >
> >     public static void closeConnection() {
> >         try {
> >             logger.info("closing connection");
> >             connection.close();
> >         } catch (JMSException e) {
> >             logger.error(e);
> >         }
> >     }
> >
> >     public void onException(JMSException ex) {
> >         logger.error(ex);
> >         closeConnection();
> >     }
> >
> >     public void onMessage(Message message) {
> >         final String MSG_ID = "ENTITY_DATA";
> >         PopWorkflowServiceSEI service = new PopWorkflowServiceImpl();
> >
> >         try {
> >             logger.info("message received: " + message);
> >             if (message instanceof MapMessage) {
> >                 MapMessage mapMessage = (MapMessage) message;
> >                 Map payloadMap = (Map) mapMessage.getObject(MSG_ID);
> >                 for (Object messageKey : payloadMap.keySet()) {
> >                     logger.info("Module ID: " + messageKey);
> >                     Object[] entityXML = ((List) (payloadMap.get(messageKey))).toArray();
> >                     String[] strEntityXML = Arrays.copyOf(entityXML, entityXML.length,
> >                         String[].class);
> >                     service.initWorkflow(strEntityXML, (String) messageKey);
> >                     logger.debug("Entity XML " + payloadMap.get(messageKey));
> >                 }
> >             }
> >         } catch (JMSException e) {
> >             logger.error(e);
> >             closeConnection();
> >         }
> >     }
> > }
> >
> >
> > So it looks like either the listener or the broker stops responding and
> > messages get stuck in the queue. Are there any additional parameters I
> > need to configure? Any pointers appreciated.
> >
> >
> > Thanks for your help.
> >
>
