activemq-users mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "Adrian Neaga" <ane...@tacitknowledge.com>
Subject Re: Consumer does not receive anything after session.rollback()
Date Thu, 20 Jul 2006 06:19:44 GMT
Exactly, no messages get delivered until the one that failed is retried for 
3 times, i've been told that is the way queues work.
What i would want is to have messages delivered no matter if there was one 
failed before, that kind of breaking the queue.
I dont want to wait 1 hours for it to try redelivering a message for the 3d 
time to get all the messages that came in the meantime.
I'd appreciate any ideas of how this could be implemented.

Thanks.

----- Original Message ----- 
From: "James Strachan" <james.strachan@gmail.com>
To: <activemq-users@geronimo.apache.org>
Sent: Thursday, July 20, 2006 08:38
Subject: Re: Consumer does not receive anything after session.rollback()


> Am a little confused about whats happening and what you actually want
> to happen. Your producer is not part of the transaction; so at
> rollback you should just have your messages redelivered up to 3 times
> before the next message on the queue is delivered right? Could you
> explain exactly what is not working in your redelivery?
>
> If you think you can reproduce a bug of some kind it'd help us to help
> you if you refactored the code you mentioned into a little JUnit test
> case of just the producer and consumer
>
>
> On 7/19/06, Adrian Neaga <aneaga@tacitknowledge.com> wrote:
>> My Spring configuration is as follows:
>>
>>  <bean id="broker" class="org.apache.activemq.xbean.BrokerFactoryBean">
>>         <property name="config"
>> value="classpath:com/tacitknowledge/spring/mail/activemq.xml" />
>>         <property name="start" value="true" />
>>     </bean>
>>
>>     <bean id="jmsFactory"
>> class="org.apache.activemq.ActiveMQConnectionFactory" singleton="true">
>>         <property name="brokerURL" value="vm://localhost"/>
>>         <property name="redeliveryPolicy">
>>             <bean class="org.apache.activemq.RedeliveryPolicy">
>>                 <property name="initialRedeliveryDelay"
>>                           value="10000"/>
>>                 <property name="maximumRedeliveries"
>>                           value="3"/>
>>                 <property name="backOffMultiplier"
>>                           value="1"/>
>>                 <property name="useExponentialBackOff"
>>                           value="true"/>
>>             </bean>
>>         </property>
>>         <property name="useAsyncSend" value="true"/>
>>     </bean>
>>
>>     <!--<bean id="jmsTransactionManager"
>> class="org.springframework.jms.connection.JmsTransactionManager">
>>         <property name="connectionFactory"><ref
>> local="jmsFactory"/></property>
>>      </bean>-->
>>
>>     <!--  Spring JMS Template-->
>>     <bean id="mqJmsTemplate"
>> class="org.springframework.jms.core.JmsTemplate" singleton="true">
>>         <property name="connectionFactory">
>>             <!--  wrap in a pool to avoid creating a connection per 
>> send-->
>>             <bean
>> class="org.springframework.jms.connection.SingleConnectionFactory"
>> abstract="false" singleton="true"
>>                   lazy-init="default" autowire="default"
>> dependency-check="default">
>>                 <property name="targetConnectionFactory">
>>                     <ref local="jmsFactory"/>
>>                 </property>
>>             </bean>
>>         </property>
>>     </bean>
>>
>>     <bean id="jmsMailSender"
>> class="com.tacitknowledge.spring.mail.JMSMailSenderImpl" 
>> singleton="true">
>>         <property name="template">
>>             <ref bean="mqJmsTemplate"/>
>>         </property>
>>         <property name="destination">
>>             <ref bean="destination"/>
>>         </property>
>>         <property name="messageTimeToLive" value="86400000"/> <!--One day

>> to
>> live-->
>>     </bean>
>>
>>     <bean id="jmsMailConsumer"
>> class="com.tacitknowledge.spring.mail.JMSMailConsumer" singleton="true"
>>           init-method="start" destroy-method="stop">
>>         <property name="template"><ref bean="mqJmsTemplate"/></property>
>>         <property name="destination"><ref bean="destination"/></property>
>>         <property name="mailSender"><ref bean="mailSender"/></property>
>>     </bean>
>>
>>     <bean id="destination" 
>> class="org.apache.activemq.command.ActiveMQQueue"
>> autowire="constructor" abstract="false" singleton="true">
>>         <constructor-arg>
>>             <value>com.channels.mailTopic</value>
>>         </constructor-arg>
>>     </bean>
>>
>> ---------------------------------------------- ACTIVEMQ
>> config: ----------------------
>>
>> <beans>
>>     <bean
>> class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer"/>
>>
>>     <broker useJmx="true" xmlns="http://activemq.org/config/1.0">
>>
>>         <managementContext>
>>             <managementContext connectorPort="1099"
>> jmxDomainName="org.apache.activemq"/>
>>         </managementContext>
>>
>>         <destinationPolicy>
>>             <policyMap>
>>                 <policyEntries>
>>                     <policyEntry topic="com.channels.mailTopic">
>>                         <dispatchPolicy>
>>                             <simpleDispatchPolicy/>
>>                         </dispatchPolicy>
>>                         <subscriptionRecoveryPolicy>
>>                             <timedSubscriptionRecoveryPolicy 
>> recoverDuration
>> = "3600000"/>
>>                         </subscriptionRecoveryPolicy>
>>                         <!-- lets force old messages to be discarded for
>> slow consumers -->
>>                         <pendingMessageLimitStrategy>
>>                             <constantPendingMessageLimitStrategy
>> limit="1000"/>
>>                         </pendingMessageLimitStrategy>
>>
>>                     </policyEntry>
>>                 </policyEntries>
>>             </policyMap>
>>         </destinationPolicy>
>>
>>         <persistenceAdapter>
>>             <jdbcPersistenceAdapter cleanupPeriod="86400000"
>> dataSource="#mysql-ds">
>>             </jdbcPersistenceAdapter>
>>         </persistenceAdapter>
>>
>>         <transportConnectors>
>>             <transportConnector name="default" 
>> uri="tcp://localhost:61616"
>>                                 discoveryUri="multicast://default"/>
>>             <transportConnector name="stomp" 
>> uri="stomp://localhost:61613"/>
>>         </transportConnectors>
>>
>>         <networkConnectors>
>>             <networkConnector name="default" uri="multicast://default"/>
>>         </networkConnectors>
>>     </broker>
>>
>>     <bean id="mysql-ds"
>> 
>> class="org.springframework.jdbc.datasource.DriverManagerDataSource">
>>         <property name="driverClassName">
>>             <value>com.mysql.jdbc.Driver</value>
>>         </property>
>>         <property name="url">
>>             <value>jdbc:mysql://localhost:3306/channels_dev</value>
>>         </property>
>>         <property name="username">
>>             <value>user</value>
>>         </property>
>>         <property name="password">
>>             <value>pass</value>
>>         </property>
>>     </bean>
>> </beans>
>>
>>
>> The jmsMailSender code:
>>
>>         template.setDeliveryMode(DeliveryMode.PERSISTENT);
>>         template.setExplicitQosEnabled(true);
>>
>>         if(messageTimeToLive > -1)
>>         {
>>             template.setTimeToLive(messageTimeToLive);
>>         }
>>
>>         template.send(destination, new MessageCreator()
>>         {
>>             public Message createMessage(Session session) throws
>> JMSException
>>             {
>>                 Message message =
>> session.createObjectMessage(simpleMailMessage);
>>                 log.debug("Sending message:" + message);
>>                 return message;
>>             }
>>         });
>>
>>
>>
>> And the consumer:
>>
>> public void start() throws JMSException
>>     {
>>         try
>>         {
>>             ConnectionFactory factory = template.getConnectionFactory();
>>             connection = factory.createConnection();
>>
>>             // we might be a reusable connection in spring
>>             // so lets only set the client ID once if its not set
>>             synchronized (connection)
>>             {
>>                 if (connection.getClientID() == null)
>>                 {
>>                     connection.setClientID(clientId);
>>                 }
>>             }
>>
>>             connection.start();
>>
>>             session = connection.createSession(true,
>> Session.AUTO_ACKNOWLEDGE);
>>             consumer = session.createConsumer(destination);
>>             consumer.setMessageListener(this);
>>             log.debug("JMSMailConsumer started");
>>         }
>>         catch (JMSException ex)
>>         {
>>             log.error(ex);
>>             throw ex;
>>         }
>>     }
>>
>>
>> public void onMessage(Message message)
>>     {
>>         if (message instanceof ObjectMessage)
>>         {
>>             ObjectMessage objectMessage = (ObjectMessage) message;
>>             try
>>             {
>>                 SimpleMailMessage mailMessage = (SimpleMailMessage)
>> objectMessage.getObject();
>>                 log.debug("Received JMS mail message " + mailMessage);
>>
>>                 try
>>                 {
>>                     mailSender.send(mailMessage);
>>                     mailMessageList.add(message);
>>                     session.commit();
>>                     log.debug("Sent the received JMS mail message to " +
>> mailMessage.getTo());
>>                 }
>>                 catch (MailException mailEx)
>>                 {
>>                     log.error(mailEx);
>>                     session.rollback();
>>                 }
>>             }
>>             catch (JMSException e)
>>             {
>>                 e.printStackTrace();
>>             }
>>         }
>>     }
>>
>>
>> As i worked on this today the problem is not that i dont receive anything 
>> at
>> all, the problem is that i receive all the rest only after it redelivers 
>> the
>> first failed message:
>>     -  Got M1 to send
>>      - Deliver M1
>>     - Session rolled back so retry
>>       ... time passed
>>      -Got M2 to send but it is not actually sent, or maybe the consumer 
>> just
>> doesnt want to get it
>>       - Retry delivering, again rollback
>>       ... time passed
>>       - Retry.....
>>       - After retried max times M2 is finally received by consumer.
>>
>> What i want is M2 to be delivered and received in the mean-time the M1 is
>> retried to be delivered.
>>
>> Thanks for reply !.
>>
>>
>>
>> ----- Original Message -----
>> From: "James Strachan" <james.strachan@gmail.com>
>> To: <activemq-users@geronimo.apache.org>
>> Sent: Wednesday, July 19, 2006 15:21
>> Subject: Re: Consumer does not receive anything after session.rollback()
>>
>>
>> > Could you demonstrate your issue in a JUnit test case so we can see
>> > how you are using the JMS API please?
>> >
>> > On 7/19/06, Adrian <neagu@mail.ru> wrote:
>> >>
>> >> I have a transacted session that in some cases does a rollback() thus
>> >> telling
>> >> the server to redeliver the message later, after such a rollback()
>> >> anything
>> >> sent is not received by the consumer, although the new messages are 
>> >> added
>> >> to
>> >> the database.
>> >> The next time i start the consumer it receives just the message it did
>> >> rollback on (for every message a rollback or a commit is done), 
>> >> nothing
>> >> else.
>> >> The docs say that anything that happens after commit or rollback is in 
>> >> a
>> >> new
>> >> transaction, why dont i receive the messages sent after rollback ?
>> >>
>> >> Thanks.
>> >> --
>> >> View this message in context:
>> >> http://www.nabble.com/Consumer-does-not-receive-anything-after-session.rollback%28%29-tf1965658.html#a5394170
>> >> Sent from the ActiveMQ - User forum at Nabble.com.
>> >>
>> >>
>> >
>> >
>> > --
>> >
>> > James
>> > -------
>> > http://radio.weblogs.com/0112098/
>> >
>>
>>
>
>
> -- 
>
> James
> -------
> http://radio.weblogs.com/0112098/
> 


Mime
View raw message