activemq-users mailing list archives

From Gary Tully <gary.tu...@gmail.com>
Subject Re: ActiveMQ swallows messages in our environment, I need some suggestions
Date Tue, 13 Mar 2012 10:33:39 GMT
It does have it; see http://activemq.apache.org/schema/core/activemq-core-5.5.0-schema.html#networkConnector
  dispatchAsync	 xs:boolean	
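
For reference, a minimal sketch of how that attribute might be set in the broker XML configuration on brokerA. The connector name, host, and port are placeholders, not values from this thread; the bridge is assumed to be static and non-duplex, as suggested in the quoted message below.

```xml
<!-- Inside the <broker> element of brokerA's activemq.xml (assumed layout). -->
<networkConnectors>
  <!-- name and uri are hypothetical; duplex and dispatchAsync are attributes
       listed for networkConnector in the 5.5.0 schema. -->
  <networkConnector name="bridgeToBrokerB"
                    uri="static:(tcp://brokerB-host:61616)"
                    duplex="false"
                    dispatchAsync="false"/>
</networkConnectors>
```

With two one-way connectors in place of duplex, brokerB would need an equivalent entry pointing back at brokerA.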

On 12 March 2012 23:46, Kai Hackemesser <kai.hackemesser@gmail.com> wrote:
> Hi, Gary,
>
> I just tried that. The network connector doesn't have that property, only the
> connection does. We have set this on the connection factory in our client and
> server sample applications, but it did not improve the issue.
>
> Cheers,
> Kai
>
> 2012/3/13 Gary Tully <gary.tully@gmail.com>
>
>> One other thought: assuming non-duplex, if you set dispatchAsync=false
>> for your network connector, advisories should always reach the other
>> broker before messages.
>>
>> On 12 March 2012 22:21, Gary Tully <gary.tully@gmail.com> wrote:
>> > I think that is a reasonable theory, depending on load, the order or
>> > reaction to advisories may not be totally reliable.
>> >
>> > One way to validate your theory is to enable the
>> > sendAdvisoryIfNoConsumers option and subscribe to that advisory to see
>> > if your messages end up there. The message will be the payload of the
>> > advisory; see http://activemq.apache.org/advisory-message.html
>> >
>> > Is your network bridge duplex? I came across an issue[1] where the
>> > symptom was dropped advisories but it was a problem with unacked
>> > messages in the duplex case. The workaround is to use two network
>> > connectors in place of duplex.
>> > [1] https://issues.apache.org/jira/browse/AMQ-3694
>> >
>> >
>> > On 12 March 2012 21:57, Kai Hackemesser <kai.hackemesser@gmail.com> wrote:
>> >> Hello,
>> >>
>> >> - Given a network of V5.5.1 brokers named "brokerA" and "brokerB", and
>> >> - application "serviceA" is connected to "brokerA", subscribed to a
>> >> "request" topic, and ready to produce on a "response" topic, and
>> >> - client application "clientB" is connected to "brokerB", and
>> >> - said client first creates a temporary consumer subscription against
>> >> the "response" topic on "brokerB", then produces a request message for
>> >> the "request" topic, and
>> >> - "serviceA" receives the request message and produces the response
>> >> message on the "response" topic on "brokerA",
>> >> - then the message is not always received by the temporary consumer on
>> >> "brokerB" (receive times out).
>> >>
>> >> This happens often enough to make it an annoyance in our production
>> >> environment. We have proven that the service responds, but can't
>> >> understand why the subscriber starves. My current theory is that when
>> >> serviceA responds on brokerA, brokerA is not yet aware of the new
>> >> consumer/subscriber on the response topic (maybe the network-of-brokers
>> >> advisory channels haven't been processed fast enough), and without
>> >> subscribers the message goes to Nirvana. Can this theoretically be the
>> >> case? We use Spring JmsTemplate on both ends of the topics and the
>> >> following class construction to do the synchronous request/response:
>> >>
>> >> import java.io.Serializable;
>> >> import java.util.UUID;
>> >> import javax.annotation.Resource;
>> >> import javax.jms.Destination;
>> >> import javax.jms.JMSException;
>> >> import javax.jms.Message;
>> >> import javax.jms.MessageConsumer;
>> >> import javax.jms.MessageProducer;
>> >> import javax.jms.ObjectMessage;
>> >> import javax.jms.Session;
>> >> import org.springframework.jms.core.JmsTemplate;
>> >> import org.springframework.jms.core.SessionCallback;
>> >> import org.springframework.jms.support.JmsUtils;
>> >> import org.springframework.stereotype.Component;
>> >>
>> >> /*
>> >>  * based on
>> >>  * http://codedependents.com/2010/03/04/synchronous-request-response-with-activemq-and-spring/
>> >>  */
>> >> @Component
>> >> public class SynchronousJmsRequestor {
>> >>
>> >>    private static final int TIMEOUT = 5000;
>> >>    private final Destination requestDestination;
>> >>    private final Destination responseDestination;
>> >>    private long timeout = TIMEOUT;
>> >>
>> >>    private final String clientUid;
>> >>
>> >>    @Resource
>> >>    private JmsTemplate jmsTemplate;
>> >>
>> >>    public SynchronousJmsRequestor(
>> >>
>> >>            final Destination requestDestination,
>> >>            final Destination responseDestination,
>> >>            final String clientUid) {
>> >>
>> >>        this.responseDestination = responseDestination;
>> >>        this.requestDestination = requestDestination;
>> >>        this.clientUid = clientUid;
>> >>    }
>> >>
>> >>
>> >>
>> >>    public Serializable request(final String serverUid, final Serializable payload)
>> >>            throws JMSException {
>> >>
>> >>        JmsDelegate jmsDelegate = new JmsDelegate(payload, requestDestination,
>> >>            responseDestination, serverUid, clientUid, timeout);
>> >>
>> >>        ObjectMessage response = (ObjectMessage) jmsTemplate.execute(jmsDelegate, true);
>> >>
>> >>        return response == null ? null : response.getObject();
>> >>    }
>> >>
>> >>
>> >>    public long getTimeout() {
>> >>        return timeout;
>> >>    }
>> >>
>> >>
>> >>    public void setTimeout(long timeout) {
>> >>        this.timeout = timeout;
>> >>    }
>> >> }
>> >>
>> >> class JmsDelegate implements SessionCallback<Message> {
>> >>
>> >>    private Serializable payload;
>> >>    private final Destination requestDestination;
>> >>    private final Destination responseDestination;
>> >>    private String serverUid;
>> >>    private final String clientUid;
>> >>    private long timeout;
>> >>
>> >>    public JmsDelegate(Serializable payload, Destination requestDestination,
>> >>                       Destination responseDestination, String serverUid,
>> >>                       String clientUid, long timeout) {
>> >>        super();
>> >>        this.payload = payload;
>> >>        this.requestDestination = requestDestination;
>> >>        this.responseDestination = responseDestination;
>> >>        this.serverUid = serverUid;
>> >>        this.clientUid = clientUid;
>> >>        this.timeout = timeout;
>> >>    }
>> >>
>> >>    @Override
>> >>    public Message doInJms(final Session session) throws JMSException {
>> >>        MessageConsumer consumer = null;
>> >>        MessageProducer producer = null;
>> >>
>> >>        try {
>> >>            final String correlationId = UUID.randomUUID().toString();
>> >>
>> >>            // Subscribe for the response before the request is sent
>> >>            consumer = session.createConsumer(responseDestination,
>> >>                "JMSCorrelationID = '" + correlationId + "'");
>> >>
>> >>            final ObjectMessage message = session.createObjectMessage(payload);
>> >>            message.setJMSCorrelationID(correlationId);
>> >>            message.setStringProperty("CLIENT_ID", clientUid);
>> >>            message.setStringProperty("SERVER_ID", serverUid);
>> >>            producer = session.createProducer(requestDestination);
>> >>            producer.send(message);
>> >>
>> >>            // Block until the correlated response arrives or the timeout expires
>> >>            return consumer.receive(timeout);
>> >>        } finally {
>> >>            // Don't forget to close your resources
>> >>            JmsUtils.closeMessageConsumer(consumer);
>> >>
>> >>            JmsUtils.closeMessageProducer(producer);
>> >>        }
>> >>    }
>> >> }
>> >>
>> >> Cheers,
>> >> Kai
>> >
>> >
>> >
>> > --
>> > http://fusesource.com
>> > http://blog.garytully.com
>>
>>
>>
>> --
>> http://fusesource.com
>> http://blog.garytully.com
>>



-- 
http://fusesource.com
http://blog.garytully.com
