activemq-users mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "James Strachan" <james.strac...@gmail.com>
Subject Re: All messages are not delivered to slower consumer(fast producer)
Date Tue, 13 Mar 2007 09:36:49 GMT
I've  not used the spring DefaultMessageListenerContainer, but is it
automatically doing the message.acknowledge() or do you have to do
that yourself?

On 3/12/07, ron55test <rknr55@gmail.com> wrote:
>
> Hi,
>
> I've been trying to solve this problem for the last two days without any
> luck.  Any help will be appreciated.
>
> I've a producer sending 5 messages (just some string) one after another in a
> for-loop.  The consumer is a slow consumer and consumes the messages 1 at a
> time at 1 minute interval.  I'm using spring's
> DefaultMessageListenerContainer with 1 concurrentConsumer
>
> I'm using JDBC persistence without transaction (but do
> CLIENT_ACKNOWLEDGEMENT).  I'm using Spring 2.0, activemq 4.1.
>
> This is what I notice:  The producer is sending the 5 messages 1 after
> another.  The messages get stored in the database ( I could query and find
> out).  The consumer get the first message and that is it.  The consumer does
> not get any more messages.
> In a nutshell, our production scenario is this:
> --our web client will be sending tasks to execute
> --We want our consumer to receive the task serially and process and send an
> email to the user with success/failure(even if there is an exception)
> message.
>
> Here is my code and configuration:
>
> //This our producer
> <bean id="asynchronousTaskManager"
> class="com.abc.jabc.jdm.service.jms.AsynchronousTaskManagerImpl" >
>      <property name="jmsTemplate" ref="jmsQueueTemplate" />
> </bean>
>
>          public void sendMethod()
>         {
>             for (int i =0 ; i < 5; i++)
>             {
>                 _jmsTemplate.convertAndSend(msg + i);
>             }
>         }
>
>
>
>  <bean id="jmsQueueTemplate"
> class="org.springframework.jms.core.JmsTemplate">
>    <property name="connectionFactory">
>     <ref bean="jmsConnectionFactory"/>
>    </property>
>    <property name="defaultDestination">
>     <ref bean="destination" />
>    </property>
>    <property name="receiveTimeout">
>     <value>1000000</value>
>    </property>
>       <property name="sessionAcknowledgeModeName" value="AUTO_ACKNOWLEDGE"/>
>  </bean>
>
> //consumer
>  <bean id="jmsReceiver" class="com.abc.jabc.jdm.service.jms.JMSReceiverImpl"
> >
>     <property name="jmsTemplate" ref="jmsQueueTemplate" />
>  </bean>
>
>           <bean id="jmsTemplate"
>                 class="org.springframework.jms.core.JmsTemplate">
>                 <property name="connectionFactory" ref="connectionFactory"
> />
>                 <property name="defaultDestination" ref="destination" />
>         </bean>
>
>  <bean id="jmsConnectionFactory"
> class="org.apache.activemq.pool.PooledConnectionFactory" >
>    <property name="connectionFactory">
>      <bean class="org.apache.activemq.ActiveMQConnectionFactory">
>        <property name="brokerURL">
>          <value>tcp://localhost:61616?jms.prefetchPolicy.all=1</value>
>        </property>
>        <property name="dispatchAsync">
>          <value>true</value>
>        </property>
>        <property name="useAsyncSend">
>          <value>false</value>
>        </property>
>      </bean>
>    </property>
>  </bean>
>
>         <bean id="destination" class="org.activemq.message.ActiveMQQueue">
>                 <constructor-arg value="jmsExample" />
>         </bean>
>
>         <bean id="listenerContainer"
>
> class="org.springframework.jms.listener.DefaultMessageListenerContainer">
>                 <property name="concurrentConsumers" value="1" />
>                 <property name="connectionFactory" ref="connectionFactory"
> />
>                 <property name="destination" ref="destination" />
>                 <property name="messageListener" ref="jmsReceiver" />
>                  <property name="sessionAcknowledgeModeName"
> value="CLIENT_ACKNOWLEDGE"/>
>         </bean>
>
>
> Here is my consumer code:
> public class JMSReceiverImpl implements MessageListener
> {
>     private JmsTemplate _jmsTemplate;
>
>     public void onMessage(Message msg)
>     {
>         System.out.println("Inside onMessage ");
>         Message recievedMessage = _jmsTemplate.receive();
>         try
>         {
>             String message = ((TextMessage)recievedMessage).getText();
>             System.out.println("The received message is: " + message);
>             Thread.sleep(60000);
>         }
>         catch (Exception e)
>         {
>             e.printStackTrace(System.out);
>         }
>     }
>     ...
>   }
>
> This is my activemq.xml file:
>
>
> <beans>
>
>   <bean
> class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer"/>
>   <broker brokerName="localhost" useJmx="true"
> xmlns="http://activemq.org/config/1.0">
>
>     <persistenceAdapter>
>       <jdbcPersistenceAdapter dataSource="#oracle-ds"/>
>     </persistenceAdapter>
>
>     <transportConnectors>
>        <transportConnector name="openwire"
> uri="tcp://localhost:61616?jms.prefetchPolicy.all=1" />
>     </transportConnectors>
>   </broker>
>
>   <!-- Oracle DataSource Sample Setup -->
>   <bean id="oracle-ds" class="org.apache.commons.dbcp.BasicDataSource"
> destroy-method="close">
>     <property name="driverClassName"
> value="oracle.jdbc.driver.OracleDriver"/>
>     <property name="url"
> value="jdbc:oracle:thin:@dev10g.abc.com:1521:dev10g"/>
>     <property name="username" value="user"/>
>     <property name="password" value="pwd"/>
>     <property name="poolPreparedStatements" value="true"/>
>   </bean>
>
> </beans>
>
>
> Any help will be appreciated.
> --
> View this message in context: http://www.nabble.com/All-messages-are-not-delivered-to-slower-consumer%28fast-producer%29-tf3392768s2354.html#a9444953
> Sent from the ActiveMQ - User mailing list archive at Nabble.com.
>
>


-- 

James
-------
http://radio.weblogs.com/0112098/

Mime
View raw message