activemq-users mailing list archives

From Jose Martinez <jmarti...@opencrowd.com>
Subject Embedded Broker/Listener becomes inactive and doesn't dequeue messages
Date Mon, 24 Dec 2012 16:50:24 GMT
Hello,

I've implemented an embedded ActiveMQ broker and a message listener using
ActiveMQ 5.7.0. It works fine for a few hours, but I've noticed that
after a day or two of inactivity messages stop being consumed and I keep
getting the following messages in the logs:

16:19:01,627 DEBUG [AbstractInactivityMonitor] WriteChecker 10000 ms elapsed since last write check.
16:19:01,627 DEBUG [AbstractInactivityMonitor] Running WriteCheck[tcp://my.ip.addr:61616]
16:19:11,624 DEBUG [AbstractInactivityMonitor] WriteChecker 10000 ms elapsed since last write check.
16:19:11,625 DEBUG [AbstractInactivityMonitor] Running WriteCheck[tcp://my.ip.addr:61616]
16:19:11,627 DEBUG [AbstractInactivityMonitor] WriteChecker 10000 ms elapsed since last write check.
16:19:11,627 DEBUG [AbstractInactivityMonitor] Running WriteCheck[tcp://my.ip.addr:61616]


It looks like my connection becomes inactive after a while and messages get
stuck in the queue. In netstat I can see that several ports remain
occupied, but messages are not dequeued.

Here's what my embedded service looks like (implemented as a servlet in
tomcat):

public void init(ServletConfig config) throws ServletException {
    super.init(config);
    logger.info("Initializing message queue listener");
    try {
        BrokerService broker = new BrokerService();

        TransportConnector connector = new TransportConnector();
        connector.setUri(new URI("tcp://localhost:61616"));
        broker.addConnector(connector);
        broker.setPersistent(false);
        broker.getSystemUsage().getTempUsage().setLimit(2000);
        broker.start();
        EntityXMLConsumer.startInstance(); // this is the listener
    } catch (Exception e) {
        logger.error(e);
    }
}


And here's my listener.

public class EntityXMLConsumer implements MessageListener, ExceptionListener {
    private static Logger logger = Logger.getLogger(EntityXMLConsumer.class);
    private static ConfigManager configManager = ConfigManager.getInstance();
    private static String url = configManager.getProperty("popworkflowsvc.activemq.url");
    //private static String url = ActiveMQConnection.DEFAULT_BROKER_URL;
    private static String subject = "WORKFLOW";
    private static Session session;
    private MessageConsumer consumer;
    private static Connection connection;
    private static EntityXMLConsumer instance = null;

    private EntityXMLConsumer() throws JMSException {
        logger.info("Initializing message listener on url :" + url);
        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url);
        connection = connectionFactory.createConnection();
        // to use transactions you should set the first parameter to 'true'
        session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        Destination destination = session.createQueue(subject);
        consumer = session.createConsumer(destination);

        consumer.setMessageListener(this);
        connection.start();
    }

    public static EntityXMLConsumer startInstance() throws JMSException {
        if (instance == null) {
            instance = new EntityXMLConsumer();
        }
        return instance;
    }

    public static void closeConnection() {
        try {
            logger.info("closing connection");
            connection.close();
        } catch (JMSException e) {
            logger.error(e);
        }
    }

    public void onException(JMSException ex) {
        logger.error(ex);
        closeConnection();
    }

    public void onMessage(Message message) {
        final String MSG_ID = "ENTITY_DATA";
        PopWorkflowServiceSEI service = new PopWorkflowServiceImpl();

        try {
            logger.info("message received: " + message);
            if (message instanceof MapMessage) {
                MapMessage mapMessage = (MapMessage) message;
                Map payloadMap = (Map) mapMessage.getObject(MSG_ID);
                for (Object messageKey : payloadMap.keySet()) {
                    logger.info("Module ID: " + messageKey);
                    Object[] entityXML = ((List) (payloadMap.get(messageKey))).toArray();
                    String[] strEntityXML = Arrays.copyOf(entityXML, entityXML.length, String[].class);
                    service.initWorkflow(strEntityXML, (String) messageKey);
                    logger.debug("Entity XML " + payloadMap.get(messageKey));
                }
            }
        } catch (JMSException e) {
            logger.error(e);
            closeConnection();
        }
    }
}


So it looks like either the listener or the broker stops responding and
messages get stuck in the queue. Are there any additional parameters I need
to configure? Any pointers would be appreciated.
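
To make the kind of parameters in question concrete, here is a minimal
sketch of a client URL that wraps the tcp:// address in the failover:
transport and sets wireFormat.maxInactivityDuration on the nested
transport. Both option names are taken from the ActiveMQ transport
documentation as I understand it. The helper failoverUrl and the class
BrokerUrlSketch are hypothetical names used only for illustration, and
whether either setting has any bearing on the stall described above is
untested.

```java
import java.net.URI;

class BrokerUrlSketch {
    // Hypothetical helper (not part of ActiveMQ): builds a failover: URL around a
    // single tcp:// address and sets the inactivity timeout on that nested transport.
    static String failoverUrl(String host, int port, long maxInactivityMillis) {
        String tcp = "tcp://" + host + ":" + port
                + "?wireFormat.maxInactivityDuration=" + maxInactivityMillis;
        return "failover:(" + tcp + ")";
    }

    public static void main(String[] args) {
        // Assumed values: the same host and port as the embedded broker above.
        String url = failoverUrl("localhost", 61616, 30000L);
        URI.create(url); // throws IllegalArgumentException if the string is not a valid URI
        System.out.println(url);
    }
}
```

The resulting string would be the value of the
popworkflowsvc.activemq.url property that the listener passes to
ActiveMQConnectionFactory.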


Thanks for your help.
