activemq-users mailing list archives

From Gary Tully <gary.tu...@gmail.com>
Subject Re: ActiveMQ swallows messages in our environment, I need some suggestions
Date Mon, 12 Mar 2012 23:21:42 GMT
One other thought: assuming the bridge is not duplex, if you set
dispatchAsync=false for your network connector, advisories should
always reach the other broker before messages.
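
A minimal sketch of that setting on an embedded broker, assuming the brokers are configured in Java rather than in activemq.xml. The host names, ports and connector name are placeholders, not values from this thread, and persistence is switched off only to keep the sketch short:

```java
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.network.NetworkConnector;

public class BrokerANetworkConfig {

    public static void main(String[] args) throws Exception {
        BrokerService broker = new BrokerService();
        broker.setBrokerName("brokerA");
        // Assumption: no persistence, so the sketch needs no data directory.
        broker.setPersistent(false);

        // Assumed listen address for local clients such as serviceA.
        broker.addConnector("tcp://0.0.0.0:61616");

        // Assumed address of brokerB; replace with the real host and port.
        NetworkConnector toBrokerB =
            broker.addNetworkConnector("static:(tcp://brokerB-host:61616)");
        toBrokerB.setName("brokerA-to-brokerB");
        // One-way bridge: duplex stays off.
        toBrokerB.setDuplex(false);
        // Dispatch synchronously over the bridge instead of asynchronously.
        toBrokerB.setDispatchAsync(false);

        broker.start();
        broker.waitUntilStopped();
    }
}
```

With a non-duplex bridge, brokerB would presumably carry a mirror-image connector pointing back at brokerA, which is also the two-connector alternative to duplex described in the quoted message below.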

On 12 March 2012 22:21, Gary Tully <gary.tully@gmail.com> wrote:
> I think that is a reasonable theory. Depending on load, the ordering of,
> or reaction to, advisories may not be totally reliable.
>
> One way to validate your theory is to enable the
> sendAdvisoryIfNoConsumers option and subscribe to that advisory to see
> if your messages end up there. The message will be the payload of the
> advisory.
> see: http://activemq.apache.org/advisory-message.html
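
The check described above could be sketched roughly as follows. This assumes an embedded broker for the policy part and a plain JMS client for the subscriber. The broker URL is a placeholder, the class and method names are made up for illustration, and the advisory topic name follows the prefix given on the advisory documentation page:

```java
import java.util.Collections;
import javax.jms.Connection;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.command.ActiveMQMessage;
import org.apache.activemq.command.DataStructure;

public class NoConsumerAdvisoryWatcher {

    /**
     * Broker side: enable the advisory for all topics. Call before broker.start().
     * Note: this replaces any destination policy already set on the broker.
     */
    static void enableNoConsumerAdvisory(BrokerService broker) {
        PolicyEntry entry = new PolicyEntry();
        entry.setTopic(">");
        entry.setSendAdvisoryIfNoConsumers(true);
        PolicyMap policyMap = new PolicyMap();
        policyMap.setPolicyEntries(Collections.singletonList(entry));
        broker.setDestinationPolicy(policyMap);
    }

    /** Client side: print the correlation id of every message that found no consumer. */
    public static void main(String[] args) throws Exception {
        // Assumed client URL of brokerA, where serviceA publishes the response.
        ActiveMQConnectionFactory factory =
            new ActiveMQConnectionFactory("tcp://brokerA-host:61616");
        Connection connection = factory.createConnection();
        try {
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            MessageConsumer consumer = session.createConsumer(
                session.createTopic("ActiveMQ.Advisory.NoConsumer.Topic.>"));
            connection.start();

            Message advisory;
            // Stop after one minute without any advisory.
            while ((advisory = consumer.receive(60000)) != null) {
                if (advisory instanceof ActiveMQMessage) {
                    // The undelivered message travels as the advisory's data structure.
                    DataStructure payload = ((ActiveMQMessage) advisory).getDataStructure();
                    if (payload instanceof Message) {
                        System.out.println("No consumer for message with correlation id "
                            + ((Message) payload).getJMSCorrelationID());
                    }
                }
            }
        } finally {
            connection.close();
        }
    }
}
```

If the correlation ids printed here match requests that timed out on clientB, that would support the theory that brokerA did not yet know about the remote subscription when the response was published.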
>
> Is your network bridge duplex? I came across an issue[1] where the
> symptom was dropped advisories but it was a problem with unacked
> messages in the duplex case. The workaround is to use two network
> connectors in place of duplex.
> [1] https://issues.apache.org/jira/browse/AMQ-3694
>
>
> On 12 March 2012 21:57, Kai Hackemesser <kai.hackemesser@gmail.com> wrote:
>> Hello,
>>
>> - Given a network of V5.5.1 brokers named "brokerA" and "brokerB", and
>> - application "serviceA" is connected to "brokerA", subscribed to a
>> "request" topic and ready to produce on a "response" topic, and
>> - client application "clientB" is connected to "brokerB", and
>> - that client first creates a temporary consumer subscription against
>> the "response" topic on "brokerB", then produces a request message for the
>> "request" topic, and
>> - "serviceA" receives the request message and produces the response message
>> on the "response" topic on "brokerA",
>> - then the message is not always received by the temporary consumer on
>> "brokerB" (the receive times out).
>>
>> This happens often enough to make it an annoyance in our production
>> environment. We have proven that the service responds, but can't understand
>> why the subscriber starves. I currently have the theory that when serviceA
>> is responding on brokerA, brokerA is not yet aware of the new
>> consumer/subscriber on the response topic (maybe the network of brokers
>> advisory channels haven't been processed fast enough) and without
>> subscribers the message goes to Nirvana. Can this theoretically be the
>> case? We use Spring JmsTemplate on both ends of the topics and the
>> following class construction to do the synchronous request/response:
>>
>> import java.io.Serializable;
>> import java.util.UUID;
>> import javax.annotation.Resource;
>> import javax.jms.Destination;
>> import javax.jms.JMSException;
>> import javax.jms.Message;
>> import javax.jms.MessageConsumer;
>> import javax.jms.MessageProducer;
>> import javax.jms.ObjectMessage;
>> import javax.jms.Session;
>> import org.springframework.jms.core.JmsTemplate;
>> import org.springframework.jms.core.SessionCallback;
>> import org.springframework.jms.support.JmsUtils;
>> import org.springframework.stereotype.Component;
>>
>> /*
>>  * Based on
>>  * http://codedependents.com/2010/03/04/synchronous-request-response-with-activemq-and-spring/
>>  */
>> @Component
>> public class SynchronousJmsRequestor {
>>
>>    private static final int TIMEOUT = 5000;
>>    private final Destination requestDestination;
>>    private final Destination responseDestination;
>>    private long timeout = TIMEOUT;
>>
>>    private final String clientUid;
>>
>>    @Resource
>>    private JmsTemplate jmsTemplate;
>>
>>    public SynchronousJmsRequestor(
>>
>>            final Destination requestDestination,
>>            final Destination responseDestination,
>>            final String clientUid) {
>>
>>        this.responseDestination = responseDestination;
>>        this.requestDestination = requestDestination;
>>        this.clientUid = clientUid;
>>    }
>>
>>
>>
>>    public Serializable request(final String serverUid, final Serializable payload)
>>            throws JMSException {
>>
>>        JmsDelegate jmsDelegate = new JmsDelegate(payload, requestDestination,
>>            responseDestination, serverUid, clientUid, timeout);
>>
>>        ObjectMessage response =
>>            (ObjectMessage) jmsTemplate.execute(jmsDelegate, true);
>>
>>        return response == null ? null : response.getObject();
>>    }
>>
>>
>>    public long getTimeout() {
>>        return timeout;
>>    }
>>
>>
>>    public void setTimeout(long timeout) {
>>        this.timeout = timeout;
>>    }
>> }
>>
>> class JmsDelegate implements SessionCallback<Message> {
>>
>>    private Serializable payload;
>>    private final Destination requestDestination;
>>    private final Destination responseDestination;
>>    private String serverUid;
>>    private final String clientUid;
>>    private long timeout;
>>
>>    public JmsDelegate(Serializable payload, Destination requestDestination,
>>                       Destination responseDestination, String serverUid,
>>                       String clientUid, long timeout) {
>>        super();
>>        this.payload = payload;
>>        this.requestDestination = requestDestination;
>>        this.responseDestination = responseDestination;
>>        this.serverUid = serverUid;
>>        this.clientUid = clientUid;
>>        this.timeout = timeout;
>>    }
>>
>>    @Override
>>    public Message doInJms(final Session session) throws JMSException {
>>        MessageConsumer consumer = null;
>>        MessageProducer producer = null;
>>
>>        try {
>>            final String correlationId = UUID.randomUUID().toString();
>>
>>            // Subscribe for the response before sending the request.
>>            consumer = session.createConsumer(responseDestination,
>>                "JMSCorrelationID = '" + correlationId + "'");
>>
>>            final ObjectMessage message = session.createObjectMessage(payload);
>>            message.setJMSCorrelationID(correlationId);
>>            message.setStringProperty("CLIENT_ID", clientUid);
>>            message.setStringProperty("SERVER_ID", serverUid);
>>            producer = session.createProducer(requestDestination);
>>            producer.send(message);
>>
>>            // Returns null if no response arrives within the timeout.
>>            return consumer.receive(timeout);
>>        } finally {
>>            // Don't forget to close your resources
>>            JmsUtils.closeMessageConsumer(consumer);
>>
>>            JmsUtils.closeMessageProducer(producer);
>>        }
>>    }
>> }
>>
>> Cheers,
>> Kai
>
>
>
> --
> http://fusesource.com
> http://blog.garytully.com



-- 
http://fusesource.com
http://blog.garytully.com
