activemq-users mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "James Strachan" <james.strac...@gmail.com>
Subject Re: [Spam: 5.0] All messages are not delivered to slower consumer(fast producer)
Date Tue, 13 Mar 2007 15:23:04 GMT
Does it work if you use SimpleMessageListenerContainer?

On 3/13/07, ron55test <rknr55@gmail.com> wrote:
>
> It is automatically doing the message.acknowledge.  I've set the
> DefaultMessageListenerContainer to do CLIENT_ACKNOWLEDGE as
>
>         <bean id="listenerContainer"
> class="org.springframework.jms.listener.DefaultMessageListenerContainer">
>                 <property name="concurrentConsumers" value="1"/>
>             <property name="connectionFactory" ref="jmsConnectionFactory" />
>             <property name="destination" ref="destination" />
>             <property name="messageListener" ref="jmsReceiver" />
>             <property name="sessionAcknowledgeModeName"
> value="CLIENT_ACKNOWLEDGE"/>
>       <property name="receiveTimeout" value="-1"/>
>         </bean>
>
>
> James.Strachan wrote:
> >
> > I've  not used the spring DefaultMessageListenerContainer, but is it
> > automatically doing the message.acknowledge() or do you have to do
> > that yourself?
> >
> > On 3/12/07, ron55test <rknr55@gmail.com> wrote:
> >>
> >> Hi,
> >>
> >> I've been trying to solve this problem for the last two days without any
> >> luck.  Any help will be appreciated.
> >>
> >> I've a producer sending 5 messages (just some string) one after another
> >> in a
> >> for-loop.  The consumer is a slow consumer and consumes the messages 1 at
> >> a
> >> time at 1 minute interval.  I'm using spring's
> >> DefaultMessageListenerContainer with 1 concurrentConsumer
> >>
> >> I'm using JDBC persistence without transaction (but do
> >> CLIENT_ACKNOWLEDGEMENT).  I'm using Spring 2.0, activemq 4.1.
> >>
> >> This is what I notice:  The producer is sending the 5 messages 1 after
> >> another.  The messages get stored in the database ( I could query and
> >> find
> >> out).  The consumer get the first message and that is it.  The consumer
> >> does
> >> not get any more messages.
> >> In a nutshell, our production scenario is this:
> >> --our web client will be sending tasks to execute
> >> --We want our consumer to receive the task serially and process and send
> >> an
> >> email to the user with success/failure(even if there is an exception)
> >> message.
> >>
> >> Here is my code and configuration:
> >>
> >> //This our producer
> >> <bean id="asynchronousTaskManager"
> >> class="com.abc.jabc.jdm.service.jms.AsynchronousTaskManagerImpl" >
> >>      <property name="jmsTemplate" ref="jmsQueueTemplate" />
> >> </bean>
> >>
> >>          public void sendMethod()
> >>         {
> >>             for (int i =0 ; i < 5; i++)
> >>             {
> >>                 _jmsTemplate.convertAndSend(msg + i);
> >>             }
> >>         }
> >>
> >>
> >>
> >>  <bean id="jmsQueueTemplate"
> >> class="org.springframework.jms.core.JmsTemplate">
> >>    <property name="connectionFactory">
> >>     <ref bean="jmsConnectionFactory"/>
> >>    </property>
> >>    <property name="defaultDestination">
> >>     <ref bean="destination" />
> >>    </property>
> >>    <property name="receiveTimeout">
> >>     <value>1000000</value>
> >>    </property>
> >>       <property name="sessionAcknowledgeModeName"
> >> value="AUTO_ACKNOWLEDGE"/>
> >>  </bean>
> >>
> >> //consumer
> >>  <bean id="jmsReceiver"
> >> class="com.abc.jabc.jdm.service.jms.JMSReceiverImpl"
> >> >
> >>     <property name="jmsTemplate" ref="jmsQueueTemplate" />
> >>  </bean>
> >>
> >>           <bean id="jmsTemplate"
> >>                 class="org.springframework.jms.core.JmsTemplate">
> >>                 <property name="connectionFactory"
> >> ref="connectionFactory"
> >> />
> >>                 <property name="defaultDestination" ref="destination" />
> >>         </bean>
> >>
> >>  <bean id="jmsConnectionFactory"
> >> class="org.apache.activemq.pool.PooledConnectionFactory" >
> >>    <property name="connectionFactory">
> >>      <bean class="org.apache.activemq.ActiveMQConnectionFactory">
> >>        <property name="brokerURL">
> >>          <value>tcp://localhost:61616?jms.prefetchPolicy.all=1</value>
> >>        </property>
> >>        <property name="dispatchAsync">
> >>          <value>true</value>
> >>        </property>
> >>        <property name="useAsyncSend">
> >>          <value>false</value>
> >>        </property>
> >>      </bean>
> >>    </property>
> >>  </bean>
> >>
> >>         <bean id="destination"
> >> class="org.activemq.message.ActiveMQQueue">
> >>                 <constructor-arg value="jmsExample" />
> >>         </bean>
> >>
> >>         <bean id="listenerContainer"
> >>
> >> class="org.springframework.jms.listener.DefaultMessageListenerContainer">
> >>                 <property name="concurrentConsumers" value="1" />
> >>                 <property name="connectionFactory"
> >> ref="connectionFactory"
> >> />
> >>                 <property name="destination" ref="destination" />
> >>                 <property name="messageListener" ref="jmsReceiver" />
> >>                  <property name="sessionAcknowledgeModeName"
> >> value="CLIENT_ACKNOWLEDGE"/>
> >>         </bean>
> >>
> >>
> >> Here is my consumer code:
> >> public class JMSReceiverImpl implements MessageListener
> >> {
> >>     private JmsTemplate _jmsTemplate;
> >>
> >>     public void onMessage(Message msg)
> >>     {
> >>         System.out.println("Inside onMessage ");
> >>         Message recievedMessage = _jmsTemplate.receive();
> >>         try
> >>         {
> >>             String message = ((TextMessage)recievedMessage).getText();
> >>             System.out.println("The received message is: " + message);
> >>             Thread.sleep(60000);
> >>         }
> >>         catch (Exception e)
> >>         {
> >>             e.printStackTrace(System.out);
> >>         }
> >>     }
> >>     ...
> >>   }
> >>
> >> This is my activemq.xml file:
> >>
> >>
> >> <beans>
> >>
> >>   <bean
> >> class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer"/>
> >>   <broker brokerName="localhost" useJmx="true"
> >> xmlns="http://activemq.org/config/1.0">
> >>
> >>     <persistenceAdapter>
> >>       <jdbcPersistenceAdapter dataSource="#oracle-ds"/>
> >>     </persistenceAdapter>
> >>
> >>     <transportConnectors>
> >>        <transportConnector name="openwire"
> >> uri="tcp://localhost:61616?jms.prefetchPolicy.all=1" />
> >>     </transportConnectors>
> >>   </broker>
> >>
> >>   <!-- Oracle DataSource Sample Setup -->
> >>   <bean id="oracle-ds" class="org.apache.commons.dbcp.BasicDataSource"
> >> destroy-method="close">
> >>     <property name="driverClassName"
> >> value="oracle.jdbc.driver.OracleDriver"/>
> >>     <property name="url"
> >> value="jdbc:oracle:thin:@dev10g.abc.com:1521:dev10g"/>
> >>     <property name="username" value="user"/>
> >>     <property name="password" value="pwd"/>
> >>     <property name="poolPreparedStatements" value="true"/>
> >>   </bean>
> >>
> >> </beans>
> >>
> >>
> >> Any help will be appreciated.
> >> --
> >> View this message in context:
> >> http://www.nabble.com/All-messages-are-not-delivered-to-slower-consumer%28fast-producer%29-tf3392768s2354.html#a9444953
> >> Sent from the ActiveMQ - User mailing list archive at Nabble.com.
> >>
> >>
> >
> >
> > --
> >
> > James
> > -------
> > http://radio.weblogs.com/0112098/
> >
> >
>
> --
> View this message in context: http://www.nabble.com/All-messages-are-not-delivered-to-slower-consumer%28fast-producer%29-tf3392768s2354.html#a9456293
> Sent from the ActiveMQ - User mailing list archive at Nabble.com.
>
>


-- 

James
-------
http://radio.weblogs.com/0112098/

Mime
View raw message