activemq-users mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Torsten Mielke <tors...@fusesource.com>
Subject Re: Messages not being removed from networked queue
Date Thu, 27 Oct 2011 07:56:55 GMT
Hello, 

That should certainly work alright. Not sure why the msgs aren't dequeued from the consumer
broker the first time you connect your consumer.
Was wondering if the consumer acks the msgs but it uses AUTO_ACK, so acking should occur.

I took your two broker configs and fired them up here locally in my env. Then sent a msgs
to a test queue on the transmit broker. The msg got enqueued on this broker. 
Only when I started a consumer on the receive broker, was the msg forwarded to the receive
broker, from where it got consumed correctly. 
All JMS counters were updated accordingly. Restarting the consumer did not redeliver the msg.
As expected.

That made me check your client code once more and indeed there seems to be a problem. You
call

> this.session = this.connection.createSession(true,
> Session.AUTO_ACKNOWLEDGE);

The first argument you pass in is whether to use a transacted session or not. You are creating
a transacted session but you don't seem to commit the transaction anywhere in your code. 
Can you change the first argument to false and try again? The msg should now be consumed only
once.
If you want to use transactions, then you need to manually commit the tx somewhere in your
code. 


Hope that gets you going.


Torsten Mielke
torsten@fusesource.com
tmielke@blogspot.com


On Oct 25, 2011, at 4:18 PM, kureckam wrote:

> I have two activemq brokers networked together. The producer broker (has
> static network xml tag) shows enqueued and dequeued values matching when
> consumer broker consumes the message, but the dequeued value on the consumer
> broker shows zero and if I rerun the consumer it receives all the messages
> again. Below is all the code I'm using to test this. Why is the consumer
> queue not removing the message from the queue?
> 
> // Producer activemq.xml
> <beans xmlns="http://www.springframework.org/schema/beans"
>  xmlns:amq="http://activemq.apache.org/schema/core"
>  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
>  xsi:schemaLocation="http://www.springframework.org/schema/beans
> http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
>  http://activemq.apache.org/schema/core
> http://activemq.apache.org/schema/core/activemq-core.xsd">
>    <bean
> class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
>        <property name="locations">
>            <value>file:${activemq.base}/conf/credentials.properties</value>
>        </property>      
>    </bean>
>    <broker xmlns="http://activemq.apache.org/schema/core"
> brokerName="transmitBroker" dataDirectory="${activemq.base}/data"
> destroyApplicationContextOnStop="true">
>        <destinationPolicy>
>            <policyMap>
>              <policyEntries>
>                <policyEntry topic=">" producerFlowControl="true"
> memoryLimit="1mb">
>                  <pendingSubscriberPolicy>
>                    <vmCursor />
>                  </pendingSubscriberPolicy>
>                </policyEntry>
>                <policyEntry queue=">" producerFlowControl="true"
> memoryLimit="1mb">
>                </policyEntry>
>              </policyEntries>
>            </policyMap>
>        </destinationPolicy> 
>        <managementContext>
>            <managementContext jmxDomainName="transmitDomainName"
> connectorPort="1098" />
>        </managementContext>
> 
>        <networkConnectors>
>           <networkConnector uri="static:(tcp://xx.xxx.x.xx:61619)"
> duplex="true" />
>        </networkConnectors>
>        <persistenceAdapter>
>            <kahaDB directory="${activemq.base}/data/transmit"/>
>        </persistenceAdapter>
>        <transportConnectors>
>           <transportConnector name="openwire" uri="tcp://0.0.0.0:61618"/>
>        </transportConnectors>
>    </broker>
>    <import resource="jetty.xml"/>
> </beans>
> 
> // MsgSenderTest.java
> import javax.jms.Connection;
> import javax.jms.JMSException;
> import javax.jms.Message;
> import javax.jms.MessageProducer;
> import javax.jms.Queue;
> import javax.jms.Session;
> 
> import org.apache.activemq.ActiveMQConnectionFactory;
> 
> public class MsgSenderTest
> {
>   public static void main(final String[] args_)   {
>      if(args_.length != 4)      {
>         System.out.println("Required parameters;IP, port, Test number and
> number of messages");
>         System.exit(0);
>      }
> 
>      final ActiveMQConnectionFactory connectionFactory = new
> ActiveMQConnectionFactory("tcp://" + args_[0] + ":" + args_[1]);
> 
>      System.out.println("Connecting to ActiveMQ:" +
> connectionFactory.getBrokerURL());
> 
>      Connection connection = null;
>      Session startTopicSession = null;
>      MessageProducer startProducer = null;
> 
>      try      {
>         final int numberOfMessages = Integer.parseInt(args_[3]);
> 
>         connection = connectionFactory.createConnection();
>         connection.start();
> 
>         startTopicSession = connection.createSession(false,
> Session.AUTO_ACKNOWLEDGE);
> 
>         final Queue startQueue = startTopicSession.createQueue("Test" +
> args_[2]);
> 
>         startProducer = startTopicSession.createProducer(startQueue);
> 
>         for(int i = 0; i < numberOfMessages; i++)
>         {
>            System.out.println("Sending message #" + (i + 1));
>            startProducer.send(startTopicSession.createMessage());
> 
>            try            {               Thread.sleep(500);            }
>            catch(final Exception e2)            {}
>         }
> 
>         final Message message = startTopicSession.createMessage();
> 
>         message.setStringProperty("END", "");
>         startProducer.send(message);
>      }
>      catch(final Exception e)      {         e.printStackTrace();      }
>      finally
>      {
>         if(startProducer != null)
>         {
>            try            {               startProducer.close();           
> }
>            catch(final JMSException e)            {              
> e.printStackTrace();            }
>         }
> 
>         if(startTopicSession != null)
>         {
>            try            {               startTopicSession.close();           
> }
>            catch(final JMSException e)            {              
> e.printStackTrace();            }
>         }
> 
>         if(connection != null)
>         {
>            try            {               connection.close();            }
>            catch(final JMSException e)            {              
> e.printStackTrace();            }
>         }
>      }
>   }
> }
> 
> // Consumer activemq.xml
> <beans xmlns="http://www.springframework.org/schema/beans"
>  xmlns:amq="http://activemq.apache.org/schema/core"
>  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
>  xsi:schemaLocation="http://www.springframework.org/schema/beans
> http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
>  http://activemq.apache.org/schema/core
> http://activemq.apache.org/schema/core/activemq-core.xsd">
>    <bean
> class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
>        <property name="locations">
>            <value>file:${activemq.base}/conf/credentials.properties</value>
>        </property>      
>    </bean>
> 
>    <broker xmlns="http://activemq.apache.org/schema/core"
> brokerName="receiveBroker" dataDirectory="${activemq.base}/data"
> destroyApplicationContextOnStop="true">
>        <destinationPolicy>
>            <policyMap>
>              <policyEntries>
>                <policyEntry topic=">" producerFlowControl="true"
> memoryLimit="1mb">
>                  <pendingSubscriberPolicy>
>                    <vmCursor />
>                  </pendingSubscriberPolicy>
>                </policyEntry>
>                <policyEntry queue=">" producerFlowControl="true"
> memoryLimit="1mb">
>                </policyEntry>
>              </policyEntries>
>            </policyMap>
>        </destinationPolicy> 
> 
>        <managementContext>
>            <managementContext jmxDomainName="receiveDomainName"
> connectorPort="1099" />
>        </managementContext>
> 
>         <networkConnectors>
>         </networkConnectors>
> 
>        <persistenceAdapter>
>            <kahaDB directory="${activemq.base}/data/receive"/>
>        </persistenceAdapter>
> 
>        <transportConnectors>
>           <transportConnector name="openwire" uri="tcp://0.0.0.0:61619"/>
>        </transportConnectors>
>    </broker>
>    <import resource="jetty.xml"/>
> </beans>
> 
> // MsgListenerTest.java
> import javax.jms.Connection;
> import javax.jms.JMSException;
> import javax.jms.Message;
> import javax.jms.MessageConsumer;
> import javax.jms.MessageListener;
> import javax.jms.Session;
> 
> import org.apache.activemq.ActiveMQConnectionFactory;
> 
> public class MsgListenerTest implements MessageListener
> {
>   private final ActiveMQConnectionFactory connectionFactory;
>   private Connection connection;
>   private Session session;
>   private MessageConsumer consumer;
> 
>   private int msgNumber = 1;
> 
>   public static void main(final String[] args_)
>   {
>      if(args_.length != 3){
>         System.out.println("Required parameters: IP, port, test number");
>         System.exit(0);
>      }
> 
>      new MsgListenerTest(args_);
>   }
> 
>   public MsgListenerTest(final String[] args_){
>      this.connectionFactory = new ActiveMQConnectionFactory("tcp://" +
> args_[0] + ":" + args_[1]);
> 
>      final String queueName = "Test" + args_[2];
> 
>      try
>      {
>         this.connection = this.connectionFactory.createConnection();
>         this.connection.start();
> 
>         this.session = this.connection.createSession(true,
> Session.AUTO_ACKNOWLEDGE);
> 
>         this.consumer =
> this.session.createConsumer(this.session.createQueue(queueName));
> 
>         this.consumer.setMessageListener(this);
>      }
>      catch(final Exception e){e.printStackTrace();}
>   }
> 
>   @Override
>   public void onMessage(final Message message_)
>   {
>      try
>      {
>         if(message_.getStringProperty("END") == null)
>         {
>            System.out.println("Received messaage #" + this.msgNumber);
> 
>            this.msgNumber++;
>         }
>         else
>         {
>            if(this.consumer != null)
>            {
>               try{this.consumer.close();}
>               catch(final JMSException e){e.printStackTrace();}
>            }
> 
>            if(this.session != null)
>            {
>               try{this.session.close();}
>               catch(final JMSException e){e.printStackTrace();}
>            }
> 
>            if(this.connection != null)
>            {
>               try{this.connection.close();}
>               catch(final JMSException e) {e.printStackTrace();}
>            }
>         }
>      }
>      catch(final JMSException e2){e2.printStackTrace();}
>   }
> }
> 
> --
> View this message in context: http://activemq.2283324.n4.nabble.com/Messages-not-being-removed-from-networked-queue-tp3936864p3936864.html
> Sent from the ActiveMQ - User mailing list archive at Nabble.com.





Mime
View raw message