activemq-users mailing list archives

From ron55test <rkn...@gmail.com>
Subject All messages are not delivered to slower consumer(fast producer)
Date Mon, 12 Mar 2007 22:42:20 GMT

Hi,

I've been trying to solve this problem for the last two days without any
luck.  Any help will be appreciated.

I have a producer that sends 5 messages (plain strings) one after another in
a for loop.  The consumer is slow: it consumes messages one at a time, at
one-minute intervals.  I'm using Spring's DefaultMessageListenerContainer
with concurrentConsumers set to 1.

I'm using JDBC persistence without transactions (but with CLIENT_ACKNOWLEDGE
on the listener container).  I'm using Spring 2.0 and ActiveMQ 4.1.

This is what I notice:  the producer sends the 5 messages one after another.
The messages get stored in the database (I can query the table and see them).
The consumer gets the first message and that is it.  It never receives any
more messages.

In a nutshell, our production scenario is this:
-- Our web client will send tasks to execute.
-- We want our consumer to receive the tasks serially, process each one, and
send an email to the user with a success/failure message (even if there is an
exception).
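
To make the intended behaviour concrete, here is a minimal, JMS-free sketch of the scenario above: tasks are taken strictly one at a time, and every task produces exactly one success or failure notification, even when it throws. Everything in it is hypothetical and only for illustration: `SerialTaskWorker`, `runTask`, the "bad" prefix that forces a failure, and the list of strings standing in for the emails. It is not our application code.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Queue;

// Hypothetical illustration: one worker, tasks handled strictly in order.
class SerialTaskWorker {

    // Stand-in for the real task; any task name starting with "bad" fails.
    static void runTask(String task) {
        if (task.startsWith("bad")) {
            throw new IllegalStateException("cannot execute " + task);
        }
    }

    // Drains the queue one task at a time and records one notification
    // per task (the stand-in for the email sent to the user).
    static List<String> processAll(List<String> tasks) {
        Queue<String> queue = new ArrayDeque<String>(tasks);
        List<String> notifications = new ArrayList<String>();
        String task;
        while ((task = queue.poll()) != null) {
            try {
                runTask(task);
                notifications.add("SUCCESS: " + task);
            } catch (RuntimeException e) {
                // A failing task must still notify and must not stop the loop.
                notifications.add("FAILURE: " + task + " (" + e.getMessage() + ")");
            }
        }
        return notifications;
    }

    public static void main(String[] args) {
        for (String n : processAll(Arrays.asList("task0", "bad1", "task2"))) {
            System.out.println(n);
        }
    }
}
```

The point of the sketch is only the shape of the loop: the next task is not touched until the current one has been processed and its notification recorded.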

Here is my code and configuration:

// This is our producer
<bean id="asynchronousTaskManager"
      class="com.abc.jabc.jdm.service.jms.AsynchronousTaskManagerImpl">
    <property name="jmsTemplate" ref="jmsQueueTemplate" />
</bean>

public void sendMethod()
{
    for (int i = 0; i < 5; i++)
    {
        _jmsTemplate.convertAndSend(msg + i);
    }
}



<bean id="jmsQueueTemplate"
      class="org.springframework.jms.core.JmsTemplate">
    <property name="connectionFactory">
        <ref bean="jmsConnectionFactory"/>
    </property>
    <property name="defaultDestination">
        <ref bean="destination" />
    </property>
    <property name="receiveTimeout">
        <value>1000000</value>
    </property>
    <property name="sessionAcknowledgeModeName" value="AUTO_ACKNOWLEDGE"/>
</bean>

// Consumer
<bean id="jmsReceiver" class="com.abc.jabc.jdm.service.jms.JMSReceiverImpl">
    <property name="jmsTemplate" ref="jmsQueueTemplate" />
</bean>

<bean id="jmsTemplate"
      class="org.springframework.jms.core.JmsTemplate">
    <property name="connectionFactory" ref="connectionFactory" />
    <property name="defaultDestination" ref="destination" />
</bean>

 <bean id="jmsConnectionFactory"
class="org.apache.activemq.pool.PooledConnectionFactory" >
   <property name="connectionFactory">
     <bean class="org.apache.activemq.ActiveMQConnectionFactory">
       <property name="brokerURL">
         <value>tcp://localhost:61616?jms.prefetchPolicy.all=1</value>
       </property>
       <property name="dispatchAsync">
         <value>true</value>
       </property>
       <property name="useAsyncSend">
         <value>false</value>
       </property>
     </bean>
   </property>
 </bean>

<bean id="destination" class="org.activemq.message.ActiveMQQueue">
    <constructor-arg value="jmsExample" />
</bean>

<bean id="listenerContainer"
      class="org.springframework.jms.listener.DefaultMessageListenerContainer">
    <property name="concurrentConsumers" value="1" />
    <property name="connectionFactory" ref="connectionFactory" />
    <property name="destination" ref="destination" />
    <property name="messageListener" ref="jmsReceiver" />
    <property name="sessionAcknowledgeModeName" value="CLIENT_ACKNOWLEDGE"/>
</bean>


Here is my consumer code:
public class JMSReceiverImpl implements MessageListener
{
    private JmsTemplate _jmsTemplate;

    public void onMessage(Message msg)
    {
        System.out.println("Inside onMessage ");
        Message receivedMessage = _jmsTemplate.receive();
        try
        {
            String message = ((TextMessage) receivedMessage).getText();
            System.out.println("The received message is: " + message);
            Thread.sleep(60000);
        }
        catch (Exception e)
        {
            e.printStackTrace(System.out);
        }
    }
    ...
  }
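
One detail of the listener above is worth isolating: `onMessage` never reads its `msg` argument and instead pulls a separate message with `_jmsTemplate.receive()`. Whether this explains the stall in the real setup is not established here. The sketch below is a JMS-free model under stated assumptions: an in-memory queue stands in for the broker, and `IgnoredArgumentDemo` and `run` are hypothetical names. It only shows what that pattern does to a queue of five messages.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Hypothetical, JMS-free model: an in-memory queue stands in for the broker.
class IgnoredArgumentDemo {

    // Mimics a container that hands each message to a listener, while the
    // listener ignores the delivered message and polls the queue itself.
    static List<String> run(int messageCount) {
        Queue<String> queue = new ArrayDeque<String>();
        for (int i = 0; i < messageCount; i++) {
            queue.add("msg" + i);
        }
        List<String> seenByListener = new ArrayList<String>();
        String delivered;
        while ((delivered = queue.poll()) != null) {   // container delivery
            String pulled = queue.poll();              // listener's own receive
            // 'delivered' is never read by the listener; only 'pulled' is.
            seenByListener.add(pulled == null
                    ? "<nothing: receive would block>"
                    : pulled);
        }
        return seenByListener;
    }

    public static void main(String[] args) {
        // prints [msg1, msg3, <nothing: receive would block>]
        System.out.println(run(5));
    }
}
```

In this model the listener only ever sees every second message, and the last delivery leaves it waiting on an empty queue. In the real configuration that wait would last up to the `receiveTimeout` of 1000000 ms set on `jmsQueueTemplate`.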

This is my activemq.xml file:


<beans>

  <bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer"/>

  <broker brokerName="localhost" useJmx="true"
          xmlns="http://activemq.org/config/1.0">

    <persistenceAdapter>
      <jdbcPersistenceAdapter dataSource="#oracle-ds"/>
    </persistenceAdapter>

    <transportConnectors>
      <transportConnector name="openwire"
                          uri="tcp://localhost:61616?jms.prefetchPolicy.all=1" />
    </transportConnectors>
  </broker>

  <!-- Oracle DataSource Sample Setup -->
  <bean id="oracle-ds" class="org.apache.commons.dbcp.BasicDataSource"
        destroy-method="close">
    <property name="driverClassName" value="oracle.jdbc.driver.OracleDriver"/>
    <property name="url" value="jdbc:oracle:thin:@dev10g.abc.com:1521:dev10g"/>
    <property name="username" value="user"/>
    <property name="password" value="pwd"/>
    <property name="poolPreparedStatements" value="true"/>
  </bean>

</beans>


Any help will be appreciated.
-- 
View this message in context: http://www.nabble.com/All-messages-are-not-delivered-to-slower-consumer%28fast-producer%29-tf3392768s2354.html#a9444953
Sent from the ActiveMQ - User mailing list archive at Nabble.com.

