Return-Path: X-Original-To: apmail-activemq-users-archive@www.apache.org Delivered-To: apmail-activemq-users-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 6D213996F for ; Thu, 27 Oct 2011 07:57:28 +0000 (UTC) Received: (qmail 70950 invoked by uid 500); 27 Oct 2011 07:57:28 -0000 Delivered-To: apmail-activemq-users-archive@activemq.apache.org Received: (qmail 70829 invoked by uid 500); 27 Oct 2011 07:57:27 -0000 Mailing-List: contact users-help@activemq.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: users@activemq.apache.org Delivered-To: mailing list users@activemq.apache.org Received: (qmail 70819 invoked by uid 99); 27 Oct 2011 07:57:26 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 27 Oct 2011 07:57:26 +0000 X-ASF-Spam-Status: No, hits=-1.0 required=5.0 tests=RCVD_IN_DNSWL_MED,SPF_PASS,URI_HEX X-Spam-Check-By: apache.org Received-SPF: pass (athena.apache.org: domain of torsten@fusesource.com designates 74.125.245.80 as permitted sender) Received: from [74.125.245.80] (HELO na3sys010aog106.obsmtp.com) (74.125.245.80) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 27 Oct 2011 07:57:20 +0000 Received: from mail-ww0-f50.google.com ([74.125.82.50]) (using TLSv1) by na3sys010aob106.postini.com ([74.125.244.12]) with SMTP; Thu, 27 Oct 2011 00:57:00 PDT Received: by mail-ww0-f50.google.com with SMTP id 18so3567402wwi.31 for ; Thu, 27 Oct 2011 00:56:59 -0700 (PDT) Received: by 10.216.229.211 with SMTP id h61mr756130weq.24.1319702219114; Thu, 27 Oct 2011 00:56:59 -0700 (PDT) Received: from [192.168.178.31] (p57BD69B8.dip0.t-ipconnect.de. [87.189.105.184]) by mx.google.com with ESMTPS id gg13sm7731101wbb.8.2011.10.27.00.56.57 (version=TLSv1/SSLv3 cipher=OTHER); Thu, 27 Oct 2011 00:56:58 -0700 (PDT) Content-Type: text/plain; charset=us-ascii Mime-Version: 1.0 (Apple Message framework v1084) Subject: Re: Messages not being removed from networked queue From: Torsten Mielke In-Reply-To: <1319552300536-3936864.post@n4.nabble.com> Date: Thu, 27 Oct 2011 09:56:55 +0200 Content-Transfer-Encoding: quoted-printable Message-Id: <348BF776-238E-451D-8203-6B897A05EEFB@fusesource.com> References: <1319552300536-3936864.post@n4.nabble.com> To: users@activemq.apache.org X-Mailer: Apple Mail (2.1084) Hello,=20 That should certainly work alright. Not sure why the msgs aren't = dequeued from the consumer broker the first time you connect your = consumer. Was wondering if the consumer acks the msgs but it uses AUTO_ACK, so = acking should occur. I took your two broker configs and fired them up here locally in my env. = Then sent a msgs to a test queue on the transmit broker. The msg got = enqueued on this broker.=20 Only when I started a consumer on the receive broker, was the msg = forwarded to the receive broker, from where it got consumed correctly.=20= All JMS counters were updated accordingly. Restarting the consumer did = not redeliver the msg. As expected. That made me check your client code once more and indeed there seems to = be a problem. You call > this.session =3D this.connection.createSession(true, > Session.AUTO_ACKNOWLEDGE); The first argument you pass in is whether to use a transacted session or = not. You are creating a transacted session but you don't seem to commit = the transaction anywhere in your code.=20 Can you change the first argument to false and try again? The msg should = now be consumed only once. If you want to use transactions, then you need to manually commit the tx = somewhere in your code.=20 Hope that gets you going. Torsten Mielke torsten@fusesource.com tmielke@blogspot.com On Oct 25, 2011, at 4:18 PM, kureckam wrote: > I have two activemq brokers networked together. The producer broker = (has > static network xml tag) shows enqueued and dequeued values matching = when > consumer broker consumes the message, but the dequeued value on the = consumer > broker shows zero and if I rerun the consumer it receives all the = messages > again. Below is all the code I'm using to test this. Why is the = consumer > queue not removing the message from the queue? >=20 > // Producer activemq.xml > xmlns:amq=3D"http://activemq.apache.org/schema/core" > xmlns:xsi=3D"http://www.w3.org/2001/XMLSchema-instance" > xsi:schemaLocation=3D"http://www.springframework.org/schema/beans > http://www.springframework.org/schema/beans/spring-beans-2.0.xsd > http://activemq.apache.org/schema/core > http://activemq.apache.org/schema/core/activemq-core.xsd"> > = class=3D"org.springframework.beans.factory.config.PropertyPlaceholderConfi= gurer"> > > = file:${activemq.base}/conf/credentials.properties > =20 > > brokerName=3D"transmitBroker" dataDirectory=3D"${activemq.base}/data" > destroyApplicationContextOnStop=3D"true"> > > > > " producerFlowControl=3D"true" > memoryLimit=3D"1mb"> > > > > > " producerFlowControl=3D"true" > memoryLimit=3D"1mb"> > > > > =20 > > connectorPort=3D"1098" /> > >=20 > > duplex=3D"true" /> > > > > > > > > > > >=20 > // MsgSenderTest.java > import javax.jms.Connection; > import javax.jms.JMSException; > import javax.jms.Message; > import javax.jms.MessageProducer; > import javax.jms.Queue; > import javax.jms.Session; >=20 > import org.apache.activemq.ActiveMQConnectionFactory; >=20 > public class MsgSenderTest > { > public static void main(final String[] args_) { > if(args_.length !=3D 4) { > System.out.println("Required parameters;IP, port, Test number = and > number of messages"); > System.exit(0); > } >=20 > final ActiveMQConnectionFactory connectionFactory =3D new > ActiveMQConnectionFactory("tcp://" + args_[0] + ":" + args_[1]); >=20 > System.out.println("Connecting to ActiveMQ:" + > connectionFactory.getBrokerURL()); >=20 > Connection connection =3D null; > Session startTopicSession =3D null; > MessageProducer startProducer =3D null; >=20 > try { > final int numberOfMessages =3D Integer.parseInt(args_[3]); >=20 > connection =3D connectionFactory.createConnection(); > connection.start(); >=20 > startTopicSession =3D connection.createSession(false, > Session.AUTO_ACKNOWLEDGE); >=20 > final Queue startQueue =3D = startTopicSession.createQueue("Test" + > args_[2]); >=20 > startProducer =3D = startTopicSession.createProducer(startQueue); >=20 > for(int i =3D 0; i < numberOfMessages; i++) > { > System.out.println("Sending message #" + (i + 1)); > startProducer.send(startTopicSession.createMessage()); >=20 > try { Thread.sleep(500); = } > catch(final Exception e2) {} > } >=20 > final Message message =3D startTopicSession.createMessage(); >=20 > message.setStringProperty("END", ""); > startProducer.send(message); > } > catch(final Exception e) { e.printStackTrace(); = } > finally > { > if(startProducer !=3D null) > { > try { startProducer.close(); = =20 > } > catch(final JMSException e) { =20 > e.printStackTrace(); } > } >=20 > if(startTopicSession !=3D null) > { > try { startTopicSession.close(); = =20 > } > catch(final JMSException e) { =20 > e.printStackTrace(); } > } >=20 > if(connection !=3D null) > { > try { connection.close(); = } > catch(final JMSException e) { =20 > e.printStackTrace(); } > } > } > } > } >=20 > // Consumer activemq.xml > xmlns:amq=3D"http://activemq.apache.org/schema/core" > xmlns:xsi=3D"http://www.w3.org/2001/XMLSchema-instance" > xsi:schemaLocation=3D"http://www.springframework.org/schema/beans > http://www.springframework.org/schema/beans/spring-beans-2.0.xsd > http://activemq.apache.org/schema/core > http://activemq.apache.org/schema/core/activemq-core.xsd"> > = class=3D"org.springframework.beans.factory.config.PropertyPlaceholderConfi= gurer"> > > = file:${activemq.base}/conf/credentials.properties > =20 > >=20 > brokerName=3D"receiveBroker" dataDirectory=3D"${activemq.base}/data" > destroyApplicationContextOnStop=3D"true"> > > > > " producerFlowControl=3D"true" > memoryLimit=3D"1mb"> > > > > > " producerFlowControl=3D"true" > memoryLimit=3D"1mb"> > > > > =20 >=20 > > connectorPort=3D"1099" /> > >=20 > > >=20 > > > >=20 > > > > > > >=20 > // MsgListenerTest.java > import javax.jms.Connection; > import javax.jms.JMSException; > import javax.jms.Message; > import javax.jms.MessageConsumer; > import javax.jms.MessageListener; > import javax.jms.Session; >=20 > import org.apache.activemq.ActiveMQConnectionFactory; >=20 > public class MsgListenerTest implements MessageListener > { > private final ActiveMQConnectionFactory connectionFactory; > private Connection connection; > private Session session; > private MessageConsumer consumer; >=20 > private int msgNumber =3D 1; >=20 > public static void main(final String[] args_) > { > if(args_.length !=3D 3){ > System.out.println("Required parameters: IP, port, test = number"); > System.exit(0); > } >=20 > new MsgListenerTest(args_); > } >=20 > public MsgListenerTest(final String[] args_){ > this.connectionFactory =3D new ActiveMQConnectionFactory("tcp://" = + > args_[0] + ":" + args_[1]); >=20 > final String queueName =3D "Test" + args_[2]; >=20 > try > { > this.connection =3D this.connectionFactory.createConnection(); > this.connection.start(); >=20 > this.session =3D this.connection.createSession(true, > Session.AUTO_ACKNOWLEDGE); >=20 > this.consumer =3D > this.session.createConsumer(this.session.createQueue(queueName)); >=20 > this.consumer.setMessageListener(this); > } > catch(final Exception e){e.printStackTrace();} > } >=20 > @Override > public void onMessage(final Message message_) > { > try > { > if(message_.getStringProperty("END") =3D=3D null) > { > System.out.println("Received messaage #" + this.msgNumber); >=20 > this.msgNumber++; > } > else > { > if(this.consumer !=3D null) > { > try{this.consumer.close();} > catch(final JMSException e){e.printStackTrace();} > } >=20 > if(this.session !=3D null) > { > try{this.session.close();} > catch(final JMSException e){e.printStackTrace();} > } >=20 > if(this.connection !=3D null) > { > try{this.connection.close();} > catch(final JMSException e) {e.printStackTrace();} > } > } > } > catch(final JMSException e2){e2.printStackTrace();} > } > } >=20 > -- > View this message in context: = http://activemq.2283324.n4.nabble.com/Messages-not-being-removed-from-netw= orked-queue-tp3936864p3936864.html > Sent from the ActiveMQ - User mailing list archive at Nabble.com.