activemq-users mailing list archives

From Jose Martinez <jmarti...@opencrowd.com>
Subject Re: Embedded Broker/Listener becomes inactive and doesn't dequeue messages
Date Tue, 25 Dec 2012 19:33:25 GMT
It seems I keep having the issue even after increasing the limit. I
wonder what the "WriteChecker" message really means, and why the
connection can't remain stable. Is there a way to automatically recycle
the connection to keep it "fresh"?

Any suggestions appreciated.
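
A quick check of the arithmetic may help here. The new limit of 1024 * 64 is 65,536 bytes (64 KiB), which is still far below the 32 MB default journal size mentioned in Raul's reply, so the condition he described (tempUsage < journal file size) would still hold. The sketch below only does that comparison; the class name `TempUsageLimits` and the 64 MiB candidate value are illustrative assumptions, and the 32 MB figure is taken from the reply rather than verified against the ActiveMQ source.

```java
public class TempUsageLimits {
    // Default PListStore journal size as stated in Raul's reply (32 MB).
    // Assumption: taken from the thread, not checked against ActiveMQ itself.
    public static final long QUOTED_JOURNAL_BYTES = 32L * 1024L * 1024L;

    /** Converts mebibytes to bytes, the unit that setLimit(long) expects. */
    public static long mebibytes(long n) {
        return n * 1024L * 1024L;
    }

    /** True when the limit is at least as large as the quoted journal size. */
    public static boolean coversJournal(long limitBytes) {
        return limitBytes >= QUOTED_JOURNAL_BYTES;
    }

    public static void main(String[] args) {
        long attempted = 1024L * 64L;   // value from the quoted message: 65,536 bytes
        long candidate = mebibytes(64); // hypothetical replacement: 67,108,864 bytes

        System.out.println(attempted + " covers journal: " + coversJournal(attempted));
        System.out.println(candidate + " covers journal: " + coversJournal(candidate));

        // The candidate would then go into the call already used in this thread:
        // broker.getSystemUsage().getTempUsage().setLimit(candidate);
    }
}
```

Whether a larger limit resolves the stall is not established by this thread; the sketch only shows that the 64 KiB value does not clear the threshold Raul described.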


On Mon, Dec 24, 2012 at 1:01 PM, Jose Martinez <jmartinez@opencrowd.com> wrote:

> Thanks!
>
> I'll set it as follows:
>     broker.getSystemUsage().getTempUsage().setLimit(1024L * 64L);
>
> And see how it goes.
>
>
>
> On Mon, Dec 24, 2012 at 12:03 PM, Raul Kripalani <raul@evosent.com> wrote:
>
>> You are setting a tempUsage limit of 2 kilobytes. Maybe you expected 2000
>> to mean megabytes, but the unit is bytes. See Javadoc [1].
>>
>> In a non-persistent broker, the role of tempUsage is to buffer up
>> non-persistent messages when consumers can't keep up. The temp storage
>> implementation (PListStore) uses a default journal size of 32 MB. So your
>> system locks up as soon as it lazily initializes the PListStore for the
>> first time, because tempUsage < default journal file size.
>>
>> My two cents. Please let us know if this helped.
>>
>> Thanks,
>>
>> [1] http://activemq.apache.org/maven/5.7.0/activemq-core/apidocs/org/apache/activemq/usage/Usage.html#setLimit(long)
>>
>> *Raúl Kripalani*
>> Apache Camel Committer
>> Enterprise Architect, Program Manager, Open Source Integration specialist
>> http://about.me/raulkripalani | http://www.linkedin.com/in/raulkripalani
>> http://blog.raulkr.net | twitter: @raulvk <http://twitter.com/raulvk>
>>
>> On Mon, Dec 24, 2012 at 4:50 PM, Jose Martinez <jmartinez@opencrowd.com> wrote:
>>
>> > Hello,
>> >
>> > I've implemented an embedded ActiveMQ service and a message listener
>> > using ActiveMQ 5.7.0. It works fine for a few hours, but I've noticed
>> > that after a day or two of inactivity messages stop being consumed and
>> > I keep getting the following message in the logs:
>> >
>> > 16:19:01,627 DEBUG [AbstractInactivityMonitor] WriteChecker 10000 ms elapsed since last write check.
>> > 16:19:01,627 DEBUG [AbstractInactivityMonitor] Running WriteCheck[tcp://my.ip.addr:61616]
>> > 16:19:11,624 DEBUG [AbstractInactivityMonitor] WriteChecker 10000 ms elapsed since last write check.
>> > 16:19:11,625 DEBUG [AbstractInactivityMonitor] Running WriteCheck[tcp://my.ip.addr:61616]
>> > 16:19:11,627 DEBUG [AbstractInactivityMonitor] WriteChecker 10000 ms elapsed since last write check.
>> > 16:19:11,627 DEBUG [AbstractInactivityMonitor] Running WriteCheck[tcp://my.ip.addr:61616]
>> >
>> >
>> > It looks like my connection becomes inactive after a while and messages
>> > get stuck in the queue. I can see in netstat that several ports remain
>> > occupied, but messages are not dequeued.
>> >
>> > Here's what my embedded service looks like (implemented as a servlet in
>> > tomcat):
>> >
>> > public void init(ServletConfig config) throws ServletException {
>> >     super.init(config);
>> >     logger.info("Initializing message queue listener");
>> >     try {
>> >         BrokerService broker = new BrokerService();
>> >
>> >         TransportConnector connector = new TransportConnector();
>> >         connector.setUri(new URI("tcp://localhost:61616"));
>> >         broker.addConnector(connector);
>> >         broker.setPersistent(false);
>> >         broker.getSystemUsage().getTempUsage().setLimit(2000);
>> >         broker.start();
>> >         EntityXMLConsumer.startInstance(); // this is the listener
>> >     } catch (Exception e) {
>> >         logger.error(e);
>> >     }
>> > }
>> >
>> >
>> > And here's my listener.
>> >
>> > public class EntityXMLConsumer implements MessageListener, ExceptionListener {
>> >     private static Logger logger = Logger.getLogger(EntityXMLConsumer.class);
>> >     private static ConfigManager configManager = ConfigManager.getInstance();
>> >     private static String url = configManager.getProperty("popworkflowsvc.activemq.url");
>> >     //private static String url = ActiveMQConnection.DEFAULT_BROKER_URL;
>> >     private static String subject = "WORKFLOW";
>> >     private static Session session;
>> >     private MessageConsumer consumer;
>> >     private static Connection connection;
>> >     private static EntityXMLConsumer instance = null;
>> >
>> >     private EntityXMLConsumer() throws JMSException {
>> >         logger.info("Initializing message listener on url :" + url);
>> >         ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url);
>> >         connection = connectionFactory.createConnection();
>> >         // to use transactions you should set the first parameter to 'true'
>> >         session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
>> >         Destination destination = session.createQueue(subject);
>> >         consumer = session.createConsumer(destination);
>> >
>> >         consumer.setMessageListener(this);
>> >         connection.start();
>> >     }
>> >
>> >     public static EntityXMLConsumer startInstance() throws JMSException {
>> >         if (instance == null) {
>> >             instance = new EntityXMLConsumer();
>> >         }
>> >         return instance;
>> >     }
>> >
>> >     public static void closeConnection() {
>> >         try {
>> >             logger.info("closing connection");
>> >             connection.close();
>> >         } catch (JMSException e) {
>> >             logger.error(e);
>> >         }
>> >     }
>> >
>> >     public void onException(JMSException ex) {
>> >         logger.error(ex);
>> >         closeConnection();
>> >     }
>> >
>> >     public void onMessage(Message message) {
>> >         final String MSG_ID = "ENTITY_DATA";
>> >         PopWorkflowServiceSEI service = new PopWorkflowServiceImpl();
>> >
>> >         try {
>> >             logger.info("message received: " + message);
>> >             if (message instanceof MapMessage) {
>> >                 MapMessage mapMessage = (MapMessage) message;
>> >                 Map payloadMap = (Map) mapMessage.getObject(MSG_ID);
>> >                 for (Object messageKey : payloadMap.keySet()) {
>> >                     logger.info("Module ID: " + messageKey);
>> >                     Object[] entityXML = ((List) (payloadMap.get(messageKey))).toArray();
>> >                     String[] strEntityXML = Arrays.copyOf(entityXML, entityXML.length, String[].class);
>> >                     service.initWorkflow(strEntityXML, (String) messageKey);
>> >                     logger.debug("Entity XML " + payloadMap.get(messageKey));
>> >                 }
>> >             }
>> >         } catch (JMSException e) {
>> >             logger.error(e);
>> >             closeConnection();
>> >         }
>> >     }
>> > }
>> >
>> >
>> > So it looks like either the listener or the broker stops responding and
>> > messages get stuck in the queue. Are there any additional parameters I
>> > need to configure? Any pointers appreciated.
>> >
>> >
>> > Thanks for your help.
>> >
>>
>
>
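
On the question of keeping the connection "fresh": in the listener posted in this thread, both onException() and the JMSException handler in onMessage() close the connection, but nothing reopens it, and the static `instance` field is never cleared, so a later startInstance() call would return the same listener with its closed connection. The WriteChecker lines are most likely just the inactivity monitor's periodic keep-alive check logged at DEBUG level, which would also appear on a healthy idle connection. One option that may help is ActiveMQ's failover transport, which wraps a broker URI so the client attempts to reconnect on its own. The sketch below only builds such a URI; the class name `FailoverUrls` and the 1000 ms delay are illustrative assumptions, not settings taken from this thread.

```java
public class FailoverUrls {
    /**
     * Wraps a plain broker URI in the failover: scheme and sets the
     * initialReconnectDelay option (milliseconds). A URI that already
     * uses failover: is returned unchanged.
     */
    public static String wrap(String brokerUri, long initialReconnectDelayMs) {
        if (brokerUri.startsWith("failover:")) {
            return brokerUri;
        }
        return "failover:(" + brokerUri + ")?initialReconnectDelay=" + initialReconnectDelayMs;
    }

    public static void main(String[] args) {
        // Hypothetical value; the thread reads the real URL from the
        // popworkflowsvc.activemq.url property.
        String url = wrap("tcp://localhost:61616", 1000);
        System.out.println(url);

        // The result would be handed to the factory the listener already uses:
        // new ActiveMQConnectionFactory(url)
    }
}
```

With a failover URI the client library handles reconnection itself, so closing the connection inside onException() would probably need revisiting as well; that part is left out of the sketch.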
