activemq-users mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Kai Hackemesser <kai.hackemes...@gmail.com>
Subject ActiveMQ swallows messages in our environment, I need some suggestions
Date Mon, 12 Mar 2012 21:57:03 GMT
Hello,

- Given a network of V5.5.1 brokers named "brokerA" and "brokerB", and
- application "serviceA" is connected to "brokerA" and subscribed to a
"request" topic and ready to produce on a "response" topic, and
- client application "clientB" is connected to "brokerB", and
- named client is first creating a temporary consumer subscription against
the "response" topic on "brokerB", then producing a request message for the
"request" topic, and
- "serviceA" received the request message, produced the response message in
the response topic on "brokerA",
- then the message is not always received by the temporary consumer on
broker B (receive times out).

This happens often enough to make it an annoyance in our production
environment. We have proven that the service responds, but can't understand
why the subscriber starves. I currently have the theory that when serviceA
is responding on brokerA, brokerA is not yet aware of the new
consumer/subscriber on the response topic (maybe the network of brokers
advisory channels haven't been processed fast enough) and without
subscribers the message goes to Nirvana. Can this theoretically be the
case? We use Spring JmsTemplate on both ends of the topics and the
following class construction to do the synchronous request/response:

import java.io.Serializable;
import java.util.UUID;
import javax.annotation.Resource;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.ObjectMessage;
import javax.jms.Session;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.SessionCallback;
import org.springframework.jms.support.JmsUtils;
import org.springframework.stereotype.Component;

/*
 * based on
http://codedependents.com/2010/03/04/synchronous-request-response-with-activemq-and-spring/
 */
@Component
public class SynchronousJmsRequestor {

    private static final int TIMEOUT = 5000;
    private final Destination requestDestination;
    private final Destination responseDestination;
    private long timeout = TIMEOUT;

    private final String clientUid;

    @Resource
    private JmsTemplate jmsTemplate;

    public SynchronousJmsRequestor(

            final Destination requestDestination,
            final Destination responseDestination,
            final String clientUid) {

        this.responseDestination = responseDestination;
        this.requestDestination = requestDestination;
        this.clientUid = clientUid;
    }



    public Serializable request(final String serverUid, final Serializable
payload) throws JMSException {

        JmsDelegate jmsDelegate = new JmsDelegate(payload,
requestDestination, responseDestination, serverUid, clientUid,
            timeout);

        ObjectMessage response = (ObjectMessage)
jmsTemplate.execute(jmsDelegate, true);

        return response == null ? null : response.getObject();
    }


    public long getTimeout() {
        return timeout;
    }


    public void setTimeout(long timeout) {
        this.timeout = timeout;
    }
}

class JmsDelegate implements SessionCallback<Message> {

    private Serializable payload;
    private final Destination requestDestination;
    private final Destination responseDestination;
    private String serverUid;
    private final String clientUid;
    private long timeout;

    public JmsDelegate(Serializable payload, Destination
requestDestination, Destination responseDestination,
                       String serverUid, String clientUid, long timeout) {
        super();
        this.payload = payload;
        this.requestDestination = requestDestination;
        this.responseDestination = responseDestination;
        this.serverUid = serverUid;
        this.clientUid = clientUid;
        this.timeout = timeout;
    }

    @Override
    public Message doInJms(final Session session) throws JMSException {
        MessageConsumer consumer = null;
        MessageProducer producer = null;

        try {
            final String correlationId = UUID.randomUUID().toString();

           * consumer = session.createConsumer(responseDestination,
"JMSCorrelationID = '" + correlationId + "'");*

            final ObjectMessage message =
session.createObjectMessage(payload);
            message.setJMSCorrelationID(correlationId);
            message.setStringProperty("CLIENT_ID", clientUid);
            message.setStringProperty("SERVER_ID", serverUid);
            producer = session.createProducer(requestDestination);
            *producer.send(message);*

            *return consumer.receive(timeout);*
        } finally {
            // Don't forget to close your resources
            JmsUtils.closeMessageConsumer(consumer);

            JmsUtils.closeMessageProducer(producer);
        }
    }
}

Cheers,
Kai

Mime
  • Unnamed multipart/alternative (inline, None, 0 bytes)
View raw message