activemq-users mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Raul Kripalani <r...@evosent.com>
Subject Re: Embedded Broker/Listener becomes inactive and doesn't dequeue messages
Date Mon, 24 Dec 2012 17:03:36 GMT
You are setting a tempUsage limit of 2 kilobytes. Maybe you expected 2000
to mean megabytes, but the unit is bytes. See Javadoc [1].

In a non-persistent broker, the role of tempUsage is to buffer up
non-persistent messages when consumers can't keep up. The temp storage
implementation (PListStore) uses a default journal size of 32mb. So your
system locks up as soon as it lazy inits the PListStore for the first time,
because tempUsage < default journal file size.

My two cents. Please let us know if this helped.

Thanks,

[1]
http://activemq.apache.org/maven/5.7.0/activemq-core/apidocs/org/apache/activemq/usage/Usage.html#setLimit(long)
.

*Raúl Kripalani*
Apache Camel Committer
Enterprise Architect, Program Manager, Open Source Integration specialist
http://about.me/raulkripalani | http://www.linkedin.com/in/raulkripalani
http://blog.raulkr.net | twitter: @raulvk <http://twitter.com/raulvk>

On Mon, Dec 24, 2012 at 4:50 PM, Jose Martinez <jmartinez@opencrowd.com>wrote:

> Hello,
>
> I've implemented an embedded activemq service and a message listener using
> in activemq 5.7.0. It works fine for a few hours but I've noticed that
> after a day or two of inactivity messages stop getting consumed and I keep
> getting the following message in the logs:
>
> 16:19:01,627 DEBUG [AbstractInactivityMonitor] WriteChecker 10000 ms
> elapsed since last write check.
> 16:19:01,627 DEBUG [AbstractInactivityMonitor] Running
> WriteCheck[tcp://my.ip.addr:61616]
> 16:19:11,624 DEBUG [AbstractInactivityMonitor] WriteChecker 10000 ms
> elapsed since last write check.
> 16:19:11,625 DEBUG [AbstractInactivityMonitor] Running
> WriteCheck[tcp://my.ip.addr:61616]
> 16:19:11,627 DEBUG [AbstractInactivityMonitor] WriteChecker 10000 ms
> elapsed since last write check.
> 16:19:11,627 DEBUG [AbstractInactivityMonitor] Running
> WriteCheck[tcp://my.ip.addr:61616]
>
>
> It looks like my connection becomes inactive after awhile and messages get
> stuck in the queue. I can see in netstat the several ports
> remain occupied but messages are not dequeued.
>
> Here's what my embedded service looks like (implemented as a servlet in
> tomcat):
>
> public void init(ServletConfig config) throws ServletException {
> super.init(config);
>  logger.info("Initializing message queue listener");
>  try {
> BrokerService broker = new BrokerService();
>
> TransportConnector connector = new TransportConnector();
> connector.setUri(new URI("tcp://localhost:61616"));
> broker.addConnector(connector);
> broker.setPersistent(false);
> broker.getSystemUsage().getTempUsage().setLimit(2000);
> broker.start();
> EntityXMLConsumer.startInstance(); /this is the listener
> } catch (Exception e) {
> logger.error(e);
> }
> }
>
>
> And here's my listener.
>
> public class EntityXMLConsumer implements MessageListener,
> ExceptionListener{
>  private static Logger logger = Logger.getLogger(EntityXMLConsumer.class);
>  private static ConfigManager configManager = ConfigManager.getInstance();
> private static String url =
> configManager.getProperty("popworkflowsvc.activemq.url");
> //private static String url = ActiveMQConnection.DEFAULT_BROKER_URL;
>     private static String subject = "WORKFLOW";
>     private static Session session;
>     private MessageConsumer consumer;
>     private static Connection connection;
>     private static EntityXMLConsumer instance = null;
>
> private EntityXMLConsumer() throws JMSException{
> logger.info("Initializing message listener on url :" + url);
> ActiveMQConnectionFactory connectionFactory = new
> ActiveMQConnectionFactory(url);
>                 connection = connectionFactory.createConnection();
>                // to use transactions you should set the first parameter to
> 'true'
>                session = connection.createSession(false,
> Session.AUTO_ACKNOWLEDGE);
>                Destination destination = session.createQueue(subject);
>                consumer = session.createConsumer(destination);
>
>         consumer.setMessageListener(this);
>             connection.start();
> }
>  public static EntityXMLConsumer startInstance() throws JMSException {
>  if (instance == null){
> instance = new EntityXMLConsumer();
> }
>  return instance;
> }
>  public static void closeConnection() {
>         try {
>          logger.info("closing connection");
> connection.close();
> } catch (JMSException e) {
> logger.error(e);
> }
> }
>
>
> public void onException(JMSException ex) {
> logger.error(ex);
> closeConnection();
> }
>
>
> public void onMessage(Message message) {
>         final String MSG_ID = "ENTITY_DATA";
>         PopWorkflowServiceSEI service = new PopWorkflowServiceImpl();
>
>     try {
> logger.info("message received: " + message );
> if (message instanceof MapMessage){
> MapMessage mapMessage = (MapMessage) message;
> Map payloadMap = (Map)mapMessage.getObject(MSG_ID);
> for (Object messageKey: payloadMap.keySet()){
> logger.info("Module ID: " + messageKey);
> Object[] entityXML = ((List)(payloadMap.get(messageKey))).toArray();
> String[] strEntityXML = Arrays.copyOf(entityXML, entityXML.length,
> String[].class);
> service.initWorkflow(strEntityXML , (String)messageKey);
> logger.debug("Entity XML " + payloadMap.get(messageKey));
> }
> }
>      } catch (JMSException e) {
>      logger.error(e);
>      closeConnection();
> }
> }
> }
>
>
> So it looks like either the listener or the broker stop responding and
> messages get stuck in the queue. Are there any additional parameters I need
> to configure? Any pointers appreciated.
>
>
> Thanks for your help.
>

Mime
  • Unnamed multipart/alternative (inline, None, 0 bytes)
View raw message