Date: Mon, 12 Mar 2012 22:21:21 +0000
Subject: Re: ActiveMQ swallows messages in our environment, I need some suggestions
From: Gary Tully
To: users@activemq.apache.org

I think that is a reasonable theory. Depending on load, the order of, or
reaction to, advisories may not be totally reliable.

One way to validate your theory is to enable the sendAdvisoryIfNoConsumers
option and subscribe to that advisory to see if your messages end up
there. The message will be the payload of the advisory. See:
http://activemq.apache.org/advisory-message.html

Is your network bridge duplex? I came across an issue[1] where the
symptom was dropped advisories, but it was a problem with unacked messages
in the duplex case. The workaround is to use two network connectors in
place of duplex.

[1] https://issues.apache.org/jira/browse/AMQ-3694

On 12 March 2012 21:57, Kai Hackemesser wrote:
> Hello,
>
> - Given a network of V5.5.1 brokers named "brokerA" and "brokerB", and
> - application "serviceA" is connected to "brokerA" and subscribed to a
> "request" topic and ready to produce on a "response" topic, and
> - client application "clientB" is connected to "brokerB", and
> - named client is first creating a temporary consumer subscription against
> the "response" topic on "brokerB", then producing a request message for the
> "request" topic, and
> - "serviceA" received the request message, produced the response message in
> the response topic on "brokerA",
> - then the message is not always received by the temporary consumer on
> broker B (receive times out).
>
> This happens often enough to make it an annoyance in our production
> environment. We have proven that the service responds, but can't understand
> why the subscriber starves. I currently have the theory that when serviceA
> is responding on brokerA, brokerA is not yet aware of the new
> consumer/subscriber on the response topic (maybe the network of brokers
> advisory channels haven't been processed fast enough) and without
> subscribers the message goes to Nirvana. Can this theoretically be the
> case? We use Spring JmsTemplate on both ends of the topics and the
> following class construction to do the synchronous request/response:
>
> import java.io.Serializable;
> import java.util.UUID;
> import javax.annotation.Resource;
> import javax.jms.Destination;
> import javax.jms.JMSException;
> import javax.jms.Message;
> import javax.jms.MessageConsumer;
> import javax.jms.MessageProducer;
> import javax.jms.ObjectMessage;
> import javax.jms.Session;
> import org.springframework.jms.core.JmsTemplate;
> import org.springframework.jms.core.SessionCallback;
> import org.springframework.jms.support.JmsUtils;
> import org.springframework.stereotype.Component;
>
> /*
>  * based on
>  * http://codedependents.com/2010/03/04/synchronous-request-response-with-activemq-and-spring/
>  */
> @Component
> public class SynchronousJmsRequestor {
>
>    private static final int TIMEOUT = 5000;
>    private final Destination requestDestination;
>    private final Destination responseDestination;
>    private long timeout = TIMEOUT;
>
>    private final String clientUid;
>
>    @Resource
>    private JmsTemplate jmsTemplate;
>
>    public SynchronousJmsRequestor(
>            final Destination requestDestination,
>            final Destination responseDestination,
>            final String clientUid) {
>
>        this.responseDestination = responseDestination;
>        this.requestDestination = requestDestination;
>        this.clientUid = clientUid;
>    }
>
>    public Serializable request(final String serverUid, final Serializable payload) throws JMSException {
>
>        JmsDelegate jmsDelegate = new JmsDelegate(payload, requestDestination, responseDestination,
>            serverUid, clientUid, timeout);
>
>        ObjectMessage response = (ObjectMessage) jmsTemplate.execute(jmsDelegate, true);
>
>        return response == null ? null : response.getObject();
>    }
>
>    public long getTimeout() {
>        return timeout;
>    }
>
>    public void setTimeout(long timeout) {
>        this.timeout = timeout;
>    }
> }
>
> class JmsDelegate implements SessionCallback {
>
>    private Serializable payload;
>    private final Destination requestDestination;
>    private final Destination responseDestination;
>    private String serverUid;
>    private final String clientUid;
>    private long timeout;
>
>    public JmsDelegate(Serializable payload, Destination requestDestination, Destination responseDestination,
>                       String serverUid, String clientUid, long timeout) {
>        super();
>        this.payload = payload;
>        this.requestDestination = requestDestination;
>        this.responseDestination = responseDestination;
>        this.serverUid = serverUid;
>        this.clientUid = clientUid;
>        this.timeout = timeout;
>    }
>
>    @Override
>    public Message doInJms(final Session session) throws JMSException {
>        MessageConsumer consumer = null;
>        MessageProducer producer = null;
>
>        try {
>            final String correlationId = UUID.randomUUID().toString();
>
>            consumer = session.createConsumer(responseDestination,
>                "JMSCorrelationID = '" + correlationId + "'");
>
>            final ObjectMessage message = session.createObjectMessage(payload);
>            message.setJMSCorrelationID(correlationId);
>            message.setStringProperty("CLIENT_ID", clientUid);
>            message.setStringProperty("SERVER_ID", serverUid);
>            producer = session.createProducer(requestDestination);
>            producer.send(message);
>
>            return consumer.receive(timeout);
>        } finally {
>            // Don't forget to close your resources
>            JmsUtils.closeMessageConsumer(consumer);
>
>            JmsUtils.closeMessageProducer(producer);
>        }
>    }
> }
>
> Cheers,
> Kai

--
http://fusesource.com
http://blog.garytully.com
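
P.S. A rough sketch for the advisory check suggested above. It assumes the
default ActiveMQ naming convention, under which no-consumer advisories for a
topic are published on ActiveMQ.Advisory.NoConsumer.Topic.<topic name>, and
that sendAdvisoryIfNoConsumers is already enabled in the broker's destination
policy. The helper only derives the advisory topic name with plain JDK code.
The names AdvisoryNames and noConsumerTopicFor, and the topic name "response",
are made up for illustration and are not part of ActiveMQ or of your setup.

```java
// Sketch only: derives the advisory topic name by string concatenation.
// Assumes the default ActiveMQ advisory prefix; AdvisoryNames and
// noConsumerTopicFor are hypothetical names used for illustration.
final class AdvisoryNames {

    // Prefix used for "message sent to a topic with no consumers" advisories.
    static final String NO_CONSUMER_TOPIC_PREFIX = "ActiveMQ.Advisory.NoConsumer.Topic.";

    private AdvisoryNames() {
    }

    // Returns the name of the advisory topic to subscribe to for the given topic.
    static String noConsumerTopicFor(String topicName) {
        if (topicName == null || topicName.isEmpty()) {
            throw new IllegalArgumentException("topicName must not be empty");
        }
        return NO_CONSUMER_TOPIC_PREFIX + topicName;
    }

    public static void main(String[] args) {
        // "response" stands in for the physical name of the response topic.
        System.out.println(noConsumerTopicFor("response"));
    }
}
```

A consumer created on session.createTopic(AdvisoryNames.noConsumerTopicFor("response"))
against brokerA should then see one advisory per response that found no
subscriber, which would confirm or rule out the theory.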