activemq-users mailing list archives

From Kai Hackemesser <kai.hackemes...@gmail.com>
Subject Re: ActiveMQ swallows messages in our environment, I need some suggestions
Date Mon, 12 Mar 2012 23:46:11 GMT
Hi, Gary,

I just tried that. The network connector doesn't have that property; only the
connection does. We set it on the connection factory in both our client and
server sample applications, but it did not resolve the issue.

Cheers,
Kai

2012/3/13 Gary Tully <gary.tully@gmail.com>

> one other thought, assuming non duplex, if you set dispatchAsync=false
> for your network connector, advisories should always reach the other
> broker before messages.
>
> On 12 March 2012 22:21, Gary Tully <gary.tully@gmail.com> wrote:
> > I think that is a reasonable theory, depending on load, the order or
> > reaction to advisories may not be totally reliable.
> >
> > One way to validate your theory is to enable the
> > sendAdvisoryIfNoConsumers option and subscribe to that advisory to see
> > if your messages end up there. The message will be the payload of the
> > advisory
> > see: http://activemq.apache.org/advisory-message.html
> >
> > Is your network bridge duplex? I came across an issue[1] where the
> > symptom was dropped advisories but it was a problem with unacked
> > messages in the duplex case. The workaround is to use two network
> > connectors in place of duplex.
> > [1] https://issues.apache.org/jira/browse/AMQ-3694
> >
> >
> > On 12 March 2012 21:57, Kai Hackemesser <kai.hackemesser@gmail.com> wrote:
> >> Hello,
> >>
> >> - Given a network of V5.5.1 brokers named "brokerA" and "brokerB", and
> >> - application "serviceA" is connected to "brokerA" and subscribed to a
> >> "request" topic and ready to produce on a "response" topic, and
> >> - client application "clientB" is connected to "brokerB", and
> >> - named client is first creating a temporary consumer subscription against
> >> the "response" topic on "brokerB", then producing a request message for the
> >> "request" topic, and
> >> - "serviceA" received the request message, produced the response message in
> >> the response topic on "brokerA",
> >> - then the message is not always received by the temporary consumer on
> >> broker B (receive times out).
> >>
> >> This happens often enough to make it an annoyance in our production
> >> environment. We have proven that the service responds, but can't understand
> >> why the subscriber starves. I currently have the theory that when serviceA
> >> is responding on brokerA, brokerA is not yet aware of the new
> >> consumer/subscriber on the response topic (maybe the network of brokers
> >> advisory channels haven't been processed fast enough) and without
> >> subscribers the message goes to Nirvana. Can this theoretically be the
> >> case? We use Spring JmsTemplate on both ends of the topics and the
> >> following class construction to do the synchronous request/response:
> >>
> >> import java.io.Serializable;
> >> import java.util.UUID;
> >> import javax.annotation.Resource;
> >> import javax.jms.Destination;
> >> import javax.jms.JMSException;
> >> import javax.jms.Message;
> >> import javax.jms.MessageConsumer;
> >> import javax.jms.MessageProducer;
> >> import javax.jms.ObjectMessage;
> >> import javax.jms.Session;
> >> import org.springframework.jms.core.JmsTemplate;
> >> import org.springframework.jms.core.SessionCallback;
> >> import org.springframework.jms.support.JmsUtils;
> >> import org.springframework.stereotype.Component;
> >>
> >> /*
> >>  * based on
> >>  * http://codedependents.com/2010/03/04/synchronous-request-response-with-activemq-and-spring/
> >>  */
> >> @Component
> >> public class SynchronousJmsRequestor {
> >>
> >>    private static final int TIMEOUT = 5000;
> >>    private final Destination requestDestination;
> >>    private final Destination responseDestination;
> >>    private long timeout = TIMEOUT;
> >>
> >>    private final String clientUid;
> >>
> >>    @Resource
> >>    private JmsTemplate jmsTemplate;
> >>
> >>    public SynchronousJmsRequestor(
> >>
> >>            final Destination requestDestination,
> >>            final Destination responseDestination,
> >>            final String clientUid) {
> >>
> >>        this.responseDestination = responseDestination;
> >>        this.requestDestination = requestDestination;
> >>        this.clientUid = clientUid;
> >>    }
> >>
> >>
> >>
> >>    public Serializable request(final String serverUid,
> >>            final Serializable payload) throws JMSException {
> >>
> >>        JmsDelegate jmsDelegate = new JmsDelegate(payload,
> >>            requestDestination, responseDestination, serverUid, clientUid,
> >>            timeout);
> >>
> >>        ObjectMessage response = (ObjectMessage)
> >>            jmsTemplate.execute(jmsDelegate, true);
> >>
> >>        return response == null ? null : response.getObject();
> >>    }
> >>
> >>
> >>    public long getTimeout() {
> >>        return timeout;
> >>    }
> >>
> >>
> >>    public void setTimeout(long timeout) {
> >>        this.timeout = timeout;
> >>    }
> >> }
> >>
> >> class JmsDelegate implements SessionCallback<Message> {
> >>
> >>    private Serializable payload;
> >>    private final Destination requestDestination;
> >>    private final Destination responseDestination;
> >>    private String serverUid;
> >>    private final String clientUid;
> >>    private long timeout;
> >>
> >>    public JmsDelegate(Serializable payload, Destination requestDestination,
> >>                       Destination responseDestination,
> >>                       String serverUid, String clientUid, long timeout) {
> >>        super();
> >>        this.payload = payload;
> >>        this.requestDestination = requestDestination;
> >>        this.responseDestination = responseDestination;
> >>        this.serverUid = serverUid;
> >>        this.clientUid = clientUid;
> >>        this.timeout = timeout;
> >>    }
> >>
> >>    @Override
> >>    public Message doInJms(final Session session) throws JMSException {
> >>        MessageConsumer consumer = null;
> >>        MessageProducer producer = null;
> >>
> >>        try {
> >>            final String correlationId = UUID.randomUUID().toString();
> >>
> >>            consumer = session.createConsumer(responseDestination,
> >>                "JMSCorrelationID = '" + correlationId + "'");
> >>
> >>            final ObjectMessage message =
> >>                session.createObjectMessage(payload);
> >>            message.setJMSCorrelationID(correlationId);
> >>            message.setStringProperty("CLIENT_ID", clientUid);
> >>            message.setStringProperty("SERVER_ID", serverUid);
> >>            producer = session.createProducer(requestDestination);
> >>            producer.send(message);
> >>
> >>            return consumer.receive(timeout);
> >>        } finally {
> >>            // Don't forget to close your resources
> >>            JmsUtils.closeMessageConsumer(consumer);
> >>
> >>            JmsUtils.closeMessageProducer(producer);
> >>        }
> >>    }
> >> }
> >>
> >> Cheers,
> >> Kai
> >
> >
> >
> > --
> > http://fusesource.com
> > http://blog.garytully.com
>
>
>
> --
> http://fusesource.com
> http://blog.garytully.com
>
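
To make the theory quoted above concrete (brokerA publishing the response before it has processed the advisory for the new consumer on brokerB), here is a toy model in plain Java. It is only a sketch and does not use ActiveMQ at all: ToyBroker, processConsumerAdvisory, publish and roundTrip are hypothetical names invented for illustration, and the real bridge logic is certainly more involved. The one assumption it encodes is that a topic message with no known subscriber is discarded rather than stored.

```java
import java.util.HashSet;
import java.util.Set;

/**
 * Toy model of the suspected race. This is NOT ActiveMQ code: every class and
 * method name is hypothetical and only mirrors the scenario in this thread.
 */
public class AdvisoryRaceSketch {

    /** A broker that forwards topic messages only for subscriptions it already knows. */
    static class ToyBroker {
        private final Set<String> knownRemoteSubscriptions = new HashSet<>();

        /** Models the consumer advisory from the peer broker being processed. */
        void processConsumerAdvisory(String topic) {
            knownRemoteSubscriptions.add(topic);
        }

        /** Returns true if the message is forwarded, false if it is discarded. */
        boolean publish(String topic, String message) {
            // Assumption: a topic message without a known subscriber is dropped.
            return knownRemoteSubscriptions.contains(topic);
        }
    }

    /** Runs one response publish on "brokerA"; true means the reply was forwarded. */
    public static boolean roundTrip(boolean advisoryProcessedFirst) {
        ToyBroker brokerA = new ToyBroker();
        if (advisoryProcessedFirst) {
            brokerA.processConsumerAdvisory("response");
        }
        boolean delivered = brokerA.publish("response", "reply-1");
        if (!advisoryProcessedFirst) {
            // The advisory arrives too late: the reply has already been discarded.
            brokerA.processConsumerAdvisory("response");
        }
        return delivered;
    }

    public static void main(String[] args) {
        System.out.println("advisory first: delivered=" + roundTrip(true));
        System.out.println("response first: delivered=" + roundTrip(false));
    }
}
```

In this model the reply is forwarded only when the advisory is processed before the publish, which matches the intermittent receive timeouts we see on clientB.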
