activemq-users mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Gary Tully <gary.tu...@gmail.com>
Subject Re: ActiveMQ swallows messages in our environment, I need some suggestions
Date Mon, 12 Mar 2012 22:21:21 GMT
I think that is a reasonable theory, depending on load, the order or
reaction to advisories may not be totally reliable.

One way to validate your theory is to enable the
sendAdvisoryIfNoConsumers option and subscribe to that advisory to see
if your messages end up there. The message will be the payload of the
advisory
see: http://activemq.apache.org/advisory-message.html

Is your network bridge duplex? I came across an issue[1] where the
symptom was dropped advisories but it was a problem with unacked
messages in the duplex case. The workaround is to use two network
connectors in place of duplex.
[1] https://issues.apache.org/jira/browse/AMQ-3694


On 12 March 2012 21:57, Kai Hackemesser <kai.hackemesser@gmail.com> wrote:
> Hello,
>
> - Given a network of V5.5.1 brokers named "brokerA" and "brokerB", and
> - application "serviceA" is connected to "brokerA" and subscribed to a
> "request" topic and ready to produce on a "response" topic, and
> - client application "clientB" is connected to "brokerB", and
> - named client is first creating a temporary consumer subscription against
> the "response" topic on "brokerB", then producing a request message for the
> "request" topic, and
> - "serviceA" received the request message, produced the response message in
> the response topic on "brokerA",
> - then the message is not always received by the temporary consumer on
> broker B (receive times out).
>
> This happens often enough to make it an annoyance in our production
> environment. We have proven that the service responds, but can't understand
> why the subscriber starves. I currently have the theory that when serviceA
> is responding on brokerA, brokerA is not yet aware of the new
> consumer/subscriber on the response topic (maybe the network of brokers
> advisory channels haven't been processed fast enough) and without
> subscribers the message goes to Nirvana. Can this theoretically be the
> case? We use Spring JmsTemplate on both ends of the topics and the
> following class construction to do the synchronous request/response:
>
> import java.io.Serializable;
> import java.util.UUID;
> import javax.annotation.Resource;
> import javax.jms.Destination;
> import javax.jms.JMSException;
> import javax.jms.Message;
> import javax.jms.MessageConsumer;
> import javax.jms.MessageProducer;
> import javax.jms.ObjectMessage;
> import javax.jms.Session;
> import org.springframework.jms.core.JmsTemplate;
> import org.springframework.jms.core.SessionCallback;
> import org.springframework.jms.support.JmsUtils;
> import org.springframework.stereotype.Component;
>
> /*
>  * based on
> http://codedependents.com/2010/03/04/synchronous-request-response-with-activemq-and-spring/
>  */
> @Component
> public class SynchronousJmsRequestor {
>
>    private static final int TIMEOUT = 5000;
>    private final Destination requestDestination;
>    private final Destination responseDestination;
>    private long timeout = TIMEOUT;
>
>    private final String clientUid;
>
>    @Resource
>    private JmsTemplate jmsTemplate;
>
>    public SynchronousJmsRequestor(
>
>            final Destination requestDestination,
>            final Destination responseDestination,
>            final String clientUid) {
>
>        this.responseDestination = responseDestination;
>        this.requestDestination = requestDestination;
>        this.clientUid = clientUid;
>    }
>
>
>
>    public Serializable request(final String serverUid, final Serializable
> payload) throws JMSException {
>
>        JmsDelegate jmsDelegate = new JmsDelegate(payload,
> requestDestination, responseDestination, serverUid, clientUid,
>            timeout);
>
>        ObjectMessage response = (ObjectMessage)
> jmsTemplate.execute(jmsDelegate, true);
>
>        return response == null ? null : response.getObject();
>    }
>
>
>    public long getTimeout() {
>        return timeout;
>    }
>
>
>    public void setTimeout(long timeout) {
>        this.timeout = timeout;
>    }
> }
>
> class JmsDelegate implements SessionCallback<Message> {
>
>    private Serializable payload;
>    private final Destination requestDestination;
>    private final Destination responseDestination;
>    private String serverUid;
>    private final String clientUid;
>    private long timeout;
>
>    public JmsDelegate(Serializable payload, Destination
> requestDestination, Destination responseDestination,
>                       String serverUid, String clientUid, long timeout) {
>        super();
>        this.payload = payload;
>        this.requestDestination = requestDestination;
>        this.responseDestination = responseDestination;
>        this.serverUid = serverUid;
>        this.clientUid = clientUid;
>        this.timeout = timeout;
>    }
>
>    @Override
>    public Message doInJms(final Session session) throws JMSException {
>        MessageConsumer consumer = null;
>        MessageProducer producer = null;
>
>        try {
>            final String correlationId = UUID.randomUUID().toString();
>
>           * consumer = session.createConsumer(responseDestination,
> "JMSCorrelationID = '" + correlationId + "'");*
>
>            final ObjectMessage message =
> session.createObjectMessage(payload);
>            message.setJMSCorrelationID(correlationId);
>            message.setStringProperty("CLIENT_ID", clientUid);
>            message.setStringProperty("SERVER_ID", serverUid);
>            producer = session.createProducer(requestDestination);
>            *producer.send(message);*
>
>            *return consumer.receive(timeout);*
>        } finally {
>            // Don't forget to close your resources
>            JmsUtils.closeMessageConsumer(consumer);
>
>            JmsUtils.closeMessageProducer(producer);
>        }
>    }
> }
>
> Cheers,
> Kai



-- 
http://fusesource.com
http://blog.garytully.com

Mime
View raw message