activemq-users mailing list archives

From "James Strachan" <james.strac...@gmail.com>
Subject Re: Consumer does not receive anything after session.rollback()
Date Thu, 20 Jul 2006 05:38:57 GMT
I'm a little confused about what's happening and what you actually want
to happen. Your producer is not part of the transaction, so on
rollback you should just have your message redelivered up to 3 times
before the next message on the queue is delivered, right? Could you
explain exactly what is not working in your redelivery?
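
To make the expected ordering concrete, here is a minimal sketch in plain
Java. It is not ActiveMQ code and uses no JMS API: the class name
RedeliveryOrderSketch, the simulate method and the log strings are all
hypothetical, and the model assumes a single transacted consumer on a queue
whose listener always rolls back one particular message. It ignores the
redelivery delays and only shows the order of events.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical, simplified model of in-order redelivery on one transacted
// session. This is an illustration only, not how the broker is implemented.
class RedeliveryOrderSketch {

    // Returns the sequence of events the consumer would observe.
    // "poison" is the message whose processing always ends in a rollback.
    static List<String> simulate(List<String> queue, String poison, int maximumRedeliveries) {
        List<String> log = new ArrayList<>();
        for (String msg : queue) {
            boolean committed = false;
            // One original delivery plus up to maximumRedeliveries retries.
            int maxDeliveries = 1 + maximumRedeliveries;
            for (int delivery = 1; delivery <= maxDeliveries && !committed; delivery++) {
                if (msg.equals(poison)) {
                    log.add(msg + " delivery " + delivery + " -> rollback");
                } else {
                    log.add(msg + " delivery " + delivery + " -> commit");
                    committed = true;
                }
            }
            if (!committed) {
                // Retries are exhausted; only now does the next message get its turn.
                log.add(msg + " -> given up after " + maximumRedeliveries + " redeliveries");
            }
        }
        return log;
    }

    public static void main(String[] args) {
        for (String line : simulate(Arrays.asList("M1", "M2"), "M1", 3)) {
            System.out.println(line);
        }
    }
}
```

In this model M2 is only handed to the listener once M1 has used up its
redeliveries, which is the behaviour I would expect with maximumRedeliveries
set to 3.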

If you think you can reproduce a bug of some kind, it would help us to help
you if you refactored the code you mentioned into a little JUnit test
case containing just the producer and consumer.


On 7/19/06, Adrian Neaga <aneaga@tacitknowledge.com> wrote:
> My Spring configuration is as follows:
>
>  <bean id="broker" class="org.apache.activemq.xbean.BrokerFactoryBean">
>         <property name="config"
> value="classpath:com/tacitknowledge/spring/mail/activemq.xml" />
>         <property name="start" value="true" />
>     </bean>
>
>     <bean id="jmsFactory"
> class="org.apache.activemq.ActiveMQConnectionFactory" singleton="true">
>         <property name="brokerURL" value="vm://localhost"/>
>         <property name="redeliveryPolicy">
>             <bean class="org.apache.activemq.RedeliveryPolicy">
>                 <property name="initialRedeliveryDelay"
>                           value="10000"/>
>                 <property name="maximumRedeliveries"
>                           value="3"/>
>                 <property name="backOffMultiplier"
>                           value="1"/>
>                 <property name="useExponentialBackOff"
>                           value="true"/>
>             </bean>
>         </property>
>         <property name="useAsyncSend" value="true"/>
>     </bean>
>
>     <!--<bean id="jmsTransactionManager"
> class="org.springframework.jms.connection.JmsTransactionManager">
>         <property name="connectionFactory"><ref
> local="jmsFactory"/></property>
>      </bean>-->
>
>     <!--  Spring JMS Template-->
>     <bean id="mqJmsTemplate"
> class="org.springframework.jms.core.JmsTemplate" singleton="true">
>         <property name="connectionFactory">
>             <!--  wrap in a pool to avoid creating a connection per send-->
>             <bean
> class="org.springframework.jms.connection.SingleConnectionFactory"
> abstract="false" singleton="true"
>                   lazy-init="default" autowire="default"
> dependency-check="default">
>                 <property name="targetConnectionFactory">
>                     <ref local="jmsFactory"/>
>                 </property>
>             </bean>
>         </property>
>     </bean>
>
>     <bean id="jmsMailSender"
> class="com.tacitknowledge.spring.mail.JMSMailSenderImpl" singleton="true">
>         <property name="template">
>             <ref bean="mqJmsTemplate"/>
>         </property>
>         <property name="destination">
>             <ref bean="destination"/>
>         </property>
>         <property name="messageTimeToLive" value="86400000"/> <!--One day to
> live-->
>     </bean>
>
>     <bean id="jmsMailConsumer"
> class="com.tacitknowledge.spring.mail.JMSMailConsumer" singleton="true"
>           init-method="start" destroy-method="stop">
>         <property name="template"><ref bean="mqJmsTemplate"/></property>
>         <property name="destination"><ref bean="destination"/></property>
>         <property name="mailSender"><ref bean="mailSender"/></property>
>     </bean>
>
>     <bean id="destination" class="org.apache.activemq.command.ActiveMQQueue"
> autowire="constructor" abstract="false" singleton="true">
>         <constructor-arg>
>             <value>com.channels.mailTopic</value>
>         </constructor-arg>
>     </bean>
>
> ---------------------------------------------- ACTIVEMQ
> config: ----------------------
>
> <beans>
>     <bean
> class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer"/>
>
>     <broker useJmx="true" xmlns="http://activemq.org/config/1.0">
>
>         <managementContext>
>             <managementContext connectorPort="1099"
> jmxDomainName="org.apache.activemq"/>
>         </managementContext>
>
>         <destinationPolicy>
>             <policyMap>
>                 <policyEntries>
>                     <policyEntry topic="com.channels.mailTopic">
>                         <dispatchPolicy>
>                             <simpleDispatchPolicy/>
>                         </dispatchPolicy>
>                         <subscriptionRecoveryPolicy>
>                             <timedSubscriptionRecoveryPolicy recoverDuration
> = "3600000"/>
>                         </subscriptionRecoveryPolicy>
>                         <!-- lets force old messages to be discarded for
> slow consumers -->
>                         <pendingMessageLimitStrategy>
>                             <constantPendingMessageLimitStrategy
> limit="1000"/>
>                         </pendingMessageLimitStrategy>
>
>                     </policyEntry>
>                 </policyEntries>
>             </policyMap>
>         </destinationPolicy>
>
>         <persistenceAdapter>
>             <jdbcPersistenceAdapter cleanupPeriod="86400000"
> dataSource="#mysql-ds">
>             </jdbcPersistenceAdapter>
>         </persistenceAdapter>
>
>         <transportConnectors>
>             <transportConnector name="default" uri="tcp://localhost:61616"
>                                 discoveryUri="multicast://default"/>
>             <transportConnector name="stomp" uri="stomp://localhost:61613"/>
>         </transportConnectors>
>
>         <networkConnectors>
>             <networkConnector name="default" uri="multicast://default"/>
>         </networkConnectors>
>     </broker>
>
>     <bean id="mysql-ds"
>           class="org.springframework.jdbc.datasource.DriverManagerDataSource">
>         <property name="driverClassName">
>             <value>com.mysql.jdbc.Driver</value>
>         </property>
>         <property name="url">
>             <value>jdbc:mysql://localhost:3306/channels_dev</value>
>         </property>
>         <property name="username">
>             <value>user</value>
>         </property>
>         <property name="password">
>             <value>pass</value>
>         </property>
>     </bean>
> </beans>
>
>
> The jmsMailSender code:
>
>         template.setDeliveryMode(DeliveryMode.PERSISTENT);
>         template.setExplicitQosEnabled(true);
>
>         if(messageTimeToLive > -1)
>         {
>             template.setTimeToLive(messageTimeToLive);
>         }
>
>         template.send(destination, new MessageCreator()
>         {
>             public Message createMessage(Session session) throws
> JMSException
>             {
>                 Message message =
> session.createObjectMessage(simpleMailMessage);
>                 log.debug("Sending message:" + message);
>                 return message;
>             }
>         });
>
>
>
> And the consumer:
>
> public void start() throws JMSException
>     {
>         try
>         {
>             ConnectionFactory factory = template.getConnectionFactory();
>             connection = factory.createConnection();
>
>             // we might be a reusable connection in spring
>             // so lets only set the client ID once if its not set
>             synchronized (connection)
>             {
>                 if (connection.getClientID() == null)
>                 {
>                     connection.setClientID(clientId);
>                 }
>             }
>
>             connection.start();
>
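>             // The acknowledge mode is ignored for transacted sessions;
>             // SESSION_TRANSACTED states that intent explicitly.
>             session = connection.createSession(true, Session.SESSION_TRANSACTED);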
>             consumer = session.createConsumer(destination);
>             consumer.setMessageListener(this);
>             log.debug("JMSMailConsumer started");
>         }
>         catch (JMSException ex)
>         {
>             log.error(ex);
>             throw ex;
>         }
>     }
>
>
> public void onMessage(Message message)
>     {
>         if (message instanceof ObjectMessage)
>         {
>             ObjectMessage objectMessage = (ObjectMessage) message;
>             try
>             {
>                 SimpleMailMessage mailMessage = (SimpleMailMessage)
> objectMessage.getObject();
>                 log.debug("Received JMS mail message " + mailMessage);
>
>                 try
>                 {
>                     mailSender.send(mailMessage);
>                     mailMessageList.add(message);
>                     session.commit();
>                     log.debug("Sent the received JMS mail message to " +
> mailMessage.getTo());
>                 }
>                 catch (MailException mailEx)
>                 {
>                     log.error(mailEx);
>                     session.rollback();
>                 }
>             }
>             catch (JMSException e)
>             {
>                 e.printStackTrace();
>             }
>         }
>     }
>
>
> As I worked on this today, I found that the problem is not that I don't
> receive anything at all. The problem is that I receive all the remaining
> messages only after it has finished redelivering the first failed message:
>     - Got M1 to send
>     - Deliver M1
>     - Session rolled back, so retry
>       ... time passes
>     - Got M2 to send, but it is not actually sent, or maybe the consumer just
>       doesn't want to get it
>     - Retry delivering M1, rollback again
>       ... time passes
>     - Retry...
>     - After the maximum number of retries, M2 is finally received by the consumer.
>
> What I want is for M2 to be delivered and received while M1 is
> still being retried.
>
> Thanks for reply !.
>
>
>
> ----- Original Message -----
> From: "James Strachan" <james.strachan@gmail.com>
> To: <activemq-users@geronimo.apache.org>
> Sent: Wednesday, July 19, 2006 15:21
> Subject: Re: Consumer does not receive anything after session.rollback()
>
>
> > Could you demonstrate your issue in a JUnit test case so we can see
> > how you are using the JMS API please?
> >
> > On 7/19/06, Adrian <neagu@mail.ru> wrote:
> >>
> >> I have a transacted session that in some cases does a rollback(), thus
> >> telling the server to redeliver the message later. After such a rollback(),
> >> anything sent is not received by the consumer, although the new messages are
> >> added to the database.
> >> The next time I start the consumer, it receives just the message it
> >> rolled back (for every message a rollback or a commit is done), nothing
> >> else.
> >> The docs say that anything that happens after commit or rollback is in a
> >> new transaction, so why don't I receive the messages sent after rollback?
> >>
> >> Thanks.
> >> --
> >> View this message in context:
> >> http://www.nabble.com/Consumer-does-not-receive-anything-after-session.rollback%28%29-tf1965658.html#a5394170
> >> Sent from the ActiveMQ - User forum at Nabble.com.
> >>
> >>
> >
> >
> > --
> >
> > James
> > -------
> > http://radio.weblogs.com/0112098/
> >
>
>


-- 

James
-------
http://radio.weblogs.com/0112098/
