activemq-users mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "Adrian Neaga" <ane...@tacitknowledge.com>
Subject Re: Consumer does not receive anything after session.rollback()
Date Wed, 19 Jul 2006 12:36:04 GMT
My Spring configuration is as follows:

 <bean id="broker" class="org.apache.activemq.xbean.BrokerFactoryBean">
        <property name="config" 
value="classpath:com/tacitknowledge/spring/mail/activemq.xml" />
        <property name="start" value="true" />
    </bean>

    <bean id="jmsFactory" 
class="org.apache.activemq.ActiveMQConnectionFactory" singleton="true">
        <property name="brokerURL" value="vm://localhost"/>
        <property name="redeliveryPolicy">
            <bean class="org.apache.activemq.RedeliveryPolicy">
                <property name="initialRedeliveryDelay"
                          value="10000"/>
                <property name="maximumRedeliveries"
                          value="3"/>
                <property name="backOffMultiplier"
                          value="1"/>
                <property name="useExponentialBackOff"
                          value="true"/>
            </bean>
        </property>
        <property name="useAsyncSend" value="true"/>
    </bean>

    <!--<bean id="jmsTransactionManager" 
class="org.springframework.jms.connection.JmsTransactionManager">
        <property name="connectionFactory"><ref 
local="jmsFactory"/></property>
     </bean>-->

    <!--  Spring JMS Template-->
    <bean id="mqJmsTemplate" 
class="org.springframework.jms.core.JmsTemplate" singleton="true">
        <property name="connectionFactory">
            <!--  wrap in a pool to avoid creating a connection per send-->
            <bean 
class="org.springframework.jms.connection.SingleConnectionFactory" 
abstract="false" singleton="true"
                  lazy-init="default" autowire="default" 
dependency-check="default">
                <property name="targetConnectionFactory">
                    <ref local="jmsFactory"/>
                </property>
            </bean>
        </property>
    </bean>

    <bean id="jmsMailSender" 
class="com.tacitknowledge.spring.mail.JMSMailSenderImpl" singleton="true">
        <property name="template">
            <ref bean="mqJmsTemplate"/>
        </property>
        <property name="destination">
            <ref bean="destination"/>
        </property>
        <property name="messageTimeToLive" value="86400000"/> <!--One day to 
live-->
    </bean>

    <bean id="jmsMailConsumer" 
class="com.tacitknowledge.spring.mail.JMSMailConsumer" singleton="true"
          init-method="start" destroy-method="stop">
        <property name="template"><ref bean="mqJmsTemplate"/></property>
        <property name="destination"><ref bean="destination"/></property>
        <property name="mailSender"><ref bean="mailSender"/></property>
    </bean>

    <bean id="destination" class="org.apache.activemq.command.ActiveMQQueue" 
autowire="constructor" abstract="false" singleton="true">
        <constructor-arg>
            <value>com.channels.mailTopic</value>
        </constructor-arg>
    </bean>

---------------------------------------------- ACTIVEMQ 
config: ----------------------

<beans>
    <bean 
class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer"/>

    <broker useJmx="true" xmlns="http://activemq.org/config/1.0">

        <managementContext>
            <managementContext connectorPort="1099" 
jmxDomainName="org.apache.activemq"/>
        </managementContext>

        <destinationPolicy>
            <policyMap>
                <policyEntries>
                    <policyEntry topic="com.channels.mailTopic">
                        <dispatchPolicy>
                            <simpleDispatchPolicy/>
                        </dispatchPolicy>
                        <subscriptionRecoveryPolicy>
                            <timedSubscriptionRecoveryPolicy recoverDuration 
= "3600000"/>
                        </subscriptionRecoveryPolicy>
                        <!-- lets force old messages to be discarded for 
slow consumers -->
                        <pendingMessageLimitStrategy>
                            <constantPendingMessageLimitStrategy 
limit="1000"/>
                        </pendingMessageLimitStrategy>

                    </policyEntry>
                </policyEntries>
            </policyMap>
        </destinationPolicy>

        <persistenceAdapter>
            <jdbcPersistenceAdapter cleanupPeriod="86400000" 
dataSource="#mysql-ds">
            </jdbcPersistenceAdapter>
        </persistenceAdapter>

        <transportConnectors>
            <transportConnector name="default" uri="tcp://localhost:61616"
                                discoveryUri="multicast://default"/>
            <transportConnector name="stomp" uri="stomp://localhost:61613"/>
        </transportConnectors>

        <networkConnectors>
            <networkConnector name="default" uri="multicast://default"/>
        </networkConnectors>
    </broker>

    <bean id="mysql-ds"
          class="org.springframework.jdbc.datasource.DriverManagerDataSource">
        <property name="driverClassName">
            <value>com.mysql.jdbc.Driver</value>
        </property>
        <property name="url">
            <value>jdbc:mysql://localhost:3306/channels_dev</value>
        </property>
        <property name="username">
            <value>user</value>
        </property>
        <property name="password">
            <value>pass</value>
        </property>
    </bean>
</beans>


The jmsMailSender code:

        template.setDeliveryMode(DeliveryMode.PERSISTENT);
        template.setExplicitQosEnabled(true);

        if(messageTimeToLive > -1)
        {
            template.setTimeToLive(messageTimeToLive);
        }

        template.send(destination, new MessageCreator()
        {
            public Message createMessage(Session session) throws 
JMSException
            {
                Message message = 
session.createObjectMessage(simpleMailMessage);
                log.debug("Sending message:" + message);
                return message;
            }
        });



And the consumer:

public void start() throws JMSException
    {
        try
        {
            ConnectionFactory factory = template.getConnectionFactory();
            connection = factory.createConnection();

            // we might be a reusable connection in spring
            // so lets only set the client ID once if its not set
            synchronized (connection)
            {
                if (connection.getClientID() == null)
                {
                    connection.setClientID(clientId);
                }
            }

            connection.start();

            session = connection.createSession(true, 
Session.AUTO_ACKNOWLEDGE);
            consumer = session.createConsumer(destination);
            consumer.setMessageListener(this);
            log.debug("JMSMailConsumer started");
        }
        catch (JMSException ex)
        {
            log.error(ex);
            throw ex;
        }
    }


public void onMessage(Message message)
    {
        if (message instanceof ObjectMessage)
        {
            ObjectMessage objectMessage = (ObjectMessage) message;
            try
            {
                SimpleMailMessage mailMessage = (SimpleMailMessage) 
objectMessage.getObject();
                log.debug("Received JMS mail message " + mailMessage);

                try
                {
                    mailSender.send(mailMessage);
                    mailMessageList.add(message);
                    session.commit();
                    log.debug("Sent the received JMS mail message to " + 
mailMessage.getTo());
                }
                catch (MailException mailEx)
                {
                    log.error(mailEx);
                    session.rollback();
                }
            }
            catch (JMSException e)
            {
                e.printStackTrace();
            }
        }
    }


As i worked on this today the problem is not that i dont receive anything at 
all, the problem is that i receive all the rest only after it redelivers the 
first failed message:
    -  Got M1 to send
     - Deliver M1
    - Session rolled back so retry
      ... time passed
     -Got M2 to send but it is not actually sent, or maybe the consumer just 
doesnt want to get it
      - Retry delivering, again rollback
      ... time passed
      - Retry.....
      - After retried max times M2 is finally received by consumer.

What i want is M2 to be delivered and received in the mean-time the M1 is 
retried to be delivered.

Thanks for reply !.



----- Original Message ----- 
From: "James Strachan" <james.strachan@gmail.com>
To: <activemq-users@geronimo.apache.org>
Sent: Wednesday, July 19, 2006 15:21
Subject: Re: Consumer does not receive anything after session.rollback()


> Could you demonstrate your issue in a JUnit test case so we can see
> how you are using the JMS API please?
>
> On 7/19/06, Adrian <neagu@mail.ru> wrote:
>>
>> I have a transacted session that in some cases does a rollback() thus 
>> telling
>> the server to redeliver the message later, after such a rollback() 
>> anything
>> sent is not received by the consumer, although the new messages are added 
>> to
>> the database.
>> The next time i start the consumer it receives just the message it did
>> rollback on (for every message a rollback or a commit is done), nothing
>> else.
>> The docs say that anything that happens after commit or rollback is in a 
>> new
>> transaction, why dont i receive the messages sent after rollback ?
>>
>> Thanks.
>> --
>> View this message in context: 
>> http://www.nabble.com/Consumer-does-not-receive-anything-after-session.rollback%28%29-tf1965658.html#a5394170
>> Sent from the ActiveMQ - User forum at Nabble.com.
>>
>>
>
>
> -- 
>
> James
> -------
> http://radio.weblogs.com/0112098/
> 


Mime
View raw message