activemq-users mailing list archives

From Tim Bain <tb...@alumni.duke.edu>
Subject Re: Consumer not able to consumer messages from queue
Date Tue, 13 Jan 2015 14:22:22 GMT
I can't speak to the question about closing the session (I use Camel to
interact with ActiveMQ, so I don't directly deal with the session), but
even if that explains the increasing msgsHeld value, it doesn't explain the
producer slowing down and eventually stopping its sends.  So there might be
multiple problems here.

What code calls your publish() method periodically?  And can you find out
(e.g. by taking the thread dump Art referenced) whether you're blocked on
the send() call or somewhere else when it all comes to a halt?
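
For the thread dump, running `jstack <pid>` against the JVM is the usual route. If it's easier to capture one from inside the application, a minimal sketch using the standard java.lang.management API might look like the following. The class and method names here (ThreadDumpSketch, dumpAllThreads) are placeholders of mine, not anything from your code or from ActiveMQ.

```java
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadInfo;
import java.lang.management.ThreadMXBean;

// Hypothetical helper: the class name is illustrative only.
public class ThreadDumpSketch {

    /** Returns the name, state, and full stack trace of every live thread as text. */
    public static String dumpAllThreads() {
        ThreadMXBean threads = ManagementFactory.getThreadMXBean();
        StringBuilder out = new StringBuilder();
        // Only request lock details when the JVM supports reporting them.
        ThreadInfo[] infos = threads.dumpAllThreads(
                threads.isObjectMonitorUsageSupported(),
                threads.isSynchronizerUsageSupported());
        for (ThreadInfo info : infos) {
            out.append('"').append(info.getThreadName()).append("\" ")
               .append(info.getThreadState()).append('\n');
            for (StackTraceElement frame : info.getStackTrace()) {
                out.append("    at ").append(frame).append('\n');
            }
            out.append('\n');
        }
        return out.toString();
    }

    public static void main(String[] args) {
        System.out.println(dumpAllThreads());
    }
}
```

In the output, find the thread that runs publish() and look at its topmost frames: that shows whether it is sitting in the send() call, in receive(), or somewhere else entirely.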

Also, I was under the impression that it wasn't possible to set message TTL
on the broker and that it could only be set by the producer; can you
provide a reference to the documentation for the setting you're using?

On Jan 13, 2015 1:03 AM, "xabhi" <xabhi91@gmail.com> wrote:

> Thanks for the reply.
>
> I have disabled producer flow control on both topics and queues in my
> broker configuration, and I have message TTL specified on the broker side.
>
> The destination on which heartbeats are sent is a queue with 5 concurrent
> consumers listening on it, and the reply-to destination is a topic with
> multiple subscribers.
>
> When all this went down, the enqueue count of the topic destination was
> not increasing.
>
> I am pasting the code for consumer and producer:
>
> *Producer:*
>
> public boolean publish()
> {
>     String message = "Heartbeat message";
>     boolean responseReceived = false;
>
>     Connection connection = null;
>     Session session = null;
>
>     try
>     {
>       connection = myJmsTemplate.getConnectionFactory().createConnection();
>       session    = connection.createSession(transacted, ackMode);
>
>       String correlationId = null;
>       Long   timeStamp     = System.currentTimeMillis();
>       Random random        = new Random(timeStamp);
>
>       Integer randomPart = random.nextInt(Integer.MAX_VALUE);
>       Long    threadId   = Thread.currentThread().getId();
>       correlationId      = threadId + "_" + timeStamp + "_" + randomPart;
>
>       String messageSelector = "JMSCorrelationID='" + correlationId + "'";
>       MessageConsumer responseConsumer =
>           session.createConsumer(receiveDestination, messageSelector);
>       connection.start();
>
>       // send a text message to broker
>       myJmsTemplate.send(sendDestination,
>           new SimpleTextMessageCreator(message, receiveDestination, correlationId));
>
>       LOG.debug("Waiting for message with " + messageSelector + " for " +
>           DEFAULT_TIMEOUT + " ms");
>
>       // check for response from broker, DEFAULT_TIMEOUT is 60s.
>      TextMessage responseMessage =
>          (TextMessage) responseConsumer.receive(DEFAULT_TIMEOUT);
>      if (responseMessage != null)
>      {
>          if (!responseMessage.getJMSCorrelationID().equals(correlationId)) {
>              String errorMsg =
>                  "Invalid correlation id in response message!!! " +
>                  "Expected : " + correlationId +
>                  " but received : " + responseMessage.getJMSCorrelationID();
>
>              LOG.error(errorMsg);
>              responseReceived = false;
>          }
>          else {
>              LOG.debug("Received the response back: " +
>                  responseMessage.getText());
>              LOG.debug("Correlation id of response message : " +
>                  responseMessage.getJMSCorrelationID());
>              responseReceived = true;
>          }
>      }
>     }
>     catch (JMSException e)
>     {
>      LOG.error("Error interacting with broker", e);
>     }
>     catch (Throwable t) {
>      LOG.warn("Publish encountered unknown exception.", t);
>     }
>     finally {
>      JmsUtil.closeConnection(connection, session, this.getClass().getName());
>     }
>     return responseReceived;
> }
>
>
> *Listener/Consumer:*
>
> public class HeartBeatListener implements SessionAwareMessageListener
> {
>
>      private final Log LOG = LogFactory.getLog(this.getClass());
>
>      @Override
>      public void onMessage(Message message, Session session)
>              throws JMSException
>      {
>          if (!(message instanceof TextMessage)) {
>              throw new IllegalArgumentException(
>                  "Message must be of type TextMessage: " + message);
>          }
>
>          String replyTextMessage = "Heartbeat Ack.";
>
>          try
>          {
>              TextMessage textMessage = (TextMessage) message;
>              String msg = textMessage.getText();
>
>              LOG.debug("Received heart beat message : " + msg);
>
>              // Send the response to the destination specified by the
>              // 'JMSReplyTo' field of the received message.
>              Destination responseDest = message.getJMSReplyTo();
>              if (responseDest != null)
>              {
>                  LOG.debug("Sending response to destination: " +
>                      responseDest.toString());
>
>                  // Setup a message producer for the above destination
>                  MessageProducer producer =
>                      session.createProducer(responseDest);
>                  TextMessage responseMessage =
>                      session.createTextMessage(replyTextMessage);
>
>                  responseMessage.setJMSCorrelationID(message.getJMSCorrelationID());
>
>                  // Send the response back
>                  producer.send(responseMessage);
>                  LOG.debug("Heart Beat Response Sent: " + responseMessage);
>              }
>          }
>          catch (JMSException e)
>          {
>              LOG.error("Error while processing the message " + message, e);
>          }
>      }
> }
>
> This listener is used in a DMLC (Spring's DefaultMessageListenerContainer).
>
> Hi Art,
> Here the session is created on the producer side (in the publish()
> function), and on the listener side a session-aware listener is used. My
> question is: what happens if the session on the publisher side is closed
> before the session-aware listener is even called, or while it is still
> executing onMessage()?
>
> Please tell me if I am doing anything wrong here.
>
> Thanks for all the help.
> -Abhi
>
>
>
>
> --
> View this message in context:
> http://activemq.2283324.n4.nabble.com/Consumer-not-able-to-consume-messages-from-queue-tp4689594p4689835.html
> Sent from the ActiveMQ - User mailing list archive at Nabble.com.
>
