activemq-users mailing list archives

From Rob Davies <rajdav...@gmail.com>
Subject Re: Already acknowledged messages are sent again at server restart (duplicate messages)
Date Fri, 14 Dec 2007 07:44:52 GMT
The best thing to do would be to move to version 5 of ActiveMQ.

cheers,

Rob
On Dec 13, 2007, at 9:04 PM, Martin Grotzke wrote:

> Hi,
>
> Currently we have an issue on our production system: messages that were
> already acknowledged are sent again at server restart. Unfortunately,
> this is not reproducible on the test system.
>
> I have seen that this issue has already been discussed several times on
> this mailing list and in the issue tracker, but I couldn't find
> anything that might help to track it down.
>
> So what are we doing? We are using ActiveMQ 4.1.1 with the VM transport
> (vm://), embedded within a Tomcat and configured via Spring.
>
> More details follow below: configuration, producer, consumer, logs.
> So is anything wrong with this? Is it OK to use one session for
> two message listeners (to the same backend)? Should we use transactions?
> Should we use ActiveMQ 5?
> Or what else might be the reason for the problem?
>
> Thanx a lot in advance,
> cheers,
> Martin
>
>
> And now the details...
>
> The broker (spring) configuration looks like this:
>
> <bean id="broker" class="org.apache.activemq.xbean.BrokerFactoryBean">
>    <property name="config" value="classpath:activemq.xml" />
>    <property name="start" value="true" />
> </bean>
>
> activemq.xml:
> <beans>
>  <bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer"/>
>  <broker xmlns="http://activemq.org/config/1.0"
>          brokerName="localhost" dataDirectory="/tmp/activemq/data">
>    <managementContext>
>       <managementContext jmxDomainName="foo.message" />
>    </managementContext>
>  </broker>
> </beans>
>
>
> The jmsTemplate and connectionFactory configuration is the following:
>
> <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate" scope="singleton">
>    <constructor-arg><ref bean="poolingJmsFactory" /></constructor-arg>
>    <property name="deliveryPersistent" value="true" />
>    <property name="pubSubDomain" value="true"/>
>    <property name="defaultDestinationName" value="default"/>
> </bean>
>
> <bean id="poolingJmsFactory" class="org.apache.activemq.pool.PooledConnectionFactory"
>    destroy-method="stop" scope="singleton" depends-on="broker">
>    <property name="connectionFactory">
>        <bean class="org.apache.activemq.ActiveMQConnectionFactory">
>            <property name="brokerURL"><value>vm://localhost</value></property>
>            <property name="useAsyncSend"><value>false</value></property>
>            <property name="optimizeAcknowledge"><value>false</value></property>
>            <property name="objectMessageSerializationDefered" value="true" />
>        </bean>
>    </property>
> </bean>
>
>
> The producer code looks like this, using the jmsTemplate shown above:
>
>    public void sendSomeMessage( final SomeSerializable someSerializable ) {
>        _jmsTemplate.send( Topics.TOPIC_1, new SomeMessageCreator() {
>            public Message createMessage( Session session ) throws JMSException {
>                final ObjectMessage result = session.createObjectMessage( someSerializable );
>                // a property is set here that is later used for selecting messages;
>                // perhaps this isn't really required...?
>                result.setStringProperty( MessageProperties.EVENT_TYPE.name(), Topics.TOPIC_1.name() );
>                return result;
>            }
>        });
>    }
>
>
> On the consumer side we have a "MessageListenerContainer" that sets up
> several message listeners. Its configuration and the configuration of
> the connection factory are as follows:
>
> <bean id="messageListenerContainer"
>        class="com.freiheit.shopping24.shop.message.business.impl.MessageListenerContainer"
>        destroy-method="shutdown" scope="singleton">
>    <constructor-arg ref="listenerConnectionFactory" />
>    <constructor-arg>
>        <bean class="foo.Action1" />
>        <bean class="foo.Action2" />
>        <bean class="foo.Action3" />
>        <bean class="foo.Action4" />
>    </constructor-arg>
> </bean>
>
> <bean id="listenerConnectionFactory"
>        class="org.apache.activemq.spring.ActiveMQConnectionFactory"
>        scope="singleton" depends-on="broker">
>    <property name="brokerURL"><value>vm://localhost</value></property>
>    <property name="alwaysSessionAsync"><value>true</value></property>
>    <property name="dispatchAsync"><value>false</value></property>
>    <property name="optimizeAcknowledge"><value>false</value></property>
>    <property name="optimizedMessageDispatch"><value>true</value></property>
> </bean>
>
> The MessageListenerContainer looks like the following:
>
> class MessageListenerContainer {
>
>    private Connection _connection;
>
>    MessageListenerContainer (
>            final ConnectionFactory listenerConnectionFactory,
>            final Action1 action1,
>            final Action2 action2,
>            final Action3 action3,
>            final Action4 action4 ) {
>
>        try {
>            _connection = listenerConnectionFactory.createConnection();
>            _connection.setClientID( "some-client-id" );
>
>            // two sessions for two different backends
>            final Session session1 = _connection.createSession( false, Session.CLIENT_ACKNOWLEDGE );
>            final Session session2 = _connection.createSession( false, Session.CLIENT_ACKNOWLEDGE );
>
>            // send event1 to backend1 and backend2
>            registerEventAndAction( session1, Topics.TOPIC_1, action1.getClass().getSimpleName(),
>                    new SomeMessageListener( action1 ) );
>            registerEventAndAction( session2, Topics.TOPIC_1, action2.getClass().getSimpleName(),
>                    new SomeMessageListener( action2 ) );
>
>            // send event2 to backend1 and backend2
>            registerEventAndAction( session1, Topics.TOPIC_2, action3.getClass().getSimpleName(),
>                    new SomeOtherMessageListener( action1 ) );
>            registerEventAndAction( session2, Topics.TOPIC_2, action4.getClass().getSimpleName(),
>                    new SomeOtherMessageListener( action2 ) );
>
>            _connection.start();
>        } catch ( JMSException e ) {
>            LOG.fatal( "Could not start MessageListenerContainer.", e );
>            throw new RuntimeException( "Could not start MessageListenerContainer", e );
>        }
>    }
>
>    private void registerEventAndAction( Session session,
>            Topics topic,
>            String actionName,
>            MessageListener messageListener ) {
>        try {
>            // here we have the selector on the property EVENT_TYPE with the name of
>            // the Topics enum, but I don't know if we really need this - a colleague has
>            // written this ;-)
>            TopicSubscriber subscriber = session.createDurableSubscriber( topic,
>                    topic.name() + "-" + actionName,
>                    MessageProperties.EVENT_TYPE + "=\'" + topic.name() + "\'", false );
>            subscriber.setMessageListener( messageListener );
>        } catch ( JMSException e ) {
>            LOG.fatal( "Could not create subscriber.", e );
>        }
>    }
>
>    public void shutdown () {
>        if ( _connection != null ) {
>            try {
>                _connection.close();
>            } catch ( JMSException e ) {
>                LOG.error( "Could not stop JMS connection.", e );
>            }
>        }
>    }
>
> }
>
> The message listeners look like this:
>
> public final class SomeMessageListener implements MessageListener {
>
>    private static final Log LOG = LogFactory.getLog( SomeMessageListener.class );
>
>    private final SomeAction _action;
>
>    public SomeMessageListener(SomeAction action) {
>        _action = action;
>    }
>
>    public void onMessage( Message message ) {
>        LOG.info( "Got message " + message );
>        final SomeSerializable someSerializable;
>        try {
>            someSerializable = (SomeSerializable) ((ObjectMessage)message).getObject();
>            _action.performProductListAction( someSerializable );
>            message.acknowledge();
>            LOG.info( "Received message acknowledged." );
>        } catch ( JMSException e ) {
>            LOG.fatal( "Could not acknowledge message.", e );
>        }
>    }
> }
>
>
> Exactly these messages are printed in the logs when the server is
> restarted, and not just for one message but for the 1000 or 2000
> messages of the last 6 days (the server was last restarted 6 days before).
>
> The logs related to messaging when the server starts:
>
> [INFO ] main org.apache.activemq.broker.BrokerService.getBroker:
> ActiveMQ 4.1.1 JMS Message Broker (localhost) is starting
>
> [INFO ] main org.apache.activemq.broker.BrokerService.getBroker:
> For help or more information please see: http://incubator.apache.org/activemq/
>
> [INFO ] JMX connector org.apache.activemq.broker.jmx.ManagementContext.run:
> JMX consoles can connect to service:jmx:rmi:///jndi/rmi://localhost:1099/jmxrmi
>
> [INFO ] main org.apache.activemq.store.jdbc.JDBCPersistenceAdapter.createAdapter:
> Database driver recognized: [apache_derby_embedded_jdbc_driver]
>
> [INFO ] main org.apache.activemq.store.jdbc.DefaultDatabaseLocker.start:
> Attempting to acquire the exclusive lock to become the Master broker
>
> [INFO ] main org.apache.activemq.store.jdbc.DefaultDatabaseLocker.start:
> Becoming the master on dataSource: org.apache.derby.jdbc.EmbeddedDataSource@2dda6548
>
> [INFO ] main org.apache.activemq.store.journal.JournalPersistenceAdapter.recover:
> Journal Recovery Started from: Active Journal: using 2 x 20.0 Megs at: /tmp/activemq/data/journal
>
> [INFO ] main org.apache.activemq.store.journal.JournalPersistenceAdapter.recover:
> Journal Recovered: 1 message(s) in transactions recovered.
>
> [INFO ] main org.apache.activemq.broker.BrokerService.start:
> ActiveMQ JMS Message Broker (localhost, ID:portal02.shopping25.easynet.de-55403-1197562278985-1:0) started
>
> [INFO ] main com.freiheit.shopping24.shop.message.business.impl.MessageServiceImpl.<init>:
> Creating MessageService...
>
> [INFO ] main com.freiheit.shopping24.shop.message.business.impl.MessageListenerContainer.<init>:
> Creating MessageListenerContainer, registering message listeners...
>
> [INFO ] main org.apache.activemq.broker.TransportConnector.start:
> Connector vm://localhost Started
>
> [INFO ] main org.apache.activemq.kaha.impl.KahaStore.delete:
> Kaha Store deleted data directory /www/freiheit/data/activemq/data/tmp_storage
>
> [INFO ] ActiveMQ Session Task foo.SomeMessageListener.onMessage:
> Got message ActiveMQObjectMessage {commandId = 7, responseRequired = true,
> messageId = ID:fqdn-42897-1196946608950-3:795:1:1:3, originalDestination = null,
> originalTransactionId = null, producerId = ID:fqdn-42897-1196946608950-3:795:1:1,
> destination = topic://TOPIC_1, transactionId = null, expiration = 0, timestamp = 1197048729937,
> arrival = 0, correlationId = null, replyTo = null, persistent = true, type = null,
> priority = 4, groupID = null, groupSequence = 0, targetConsumerId = null, compressed = false,
> userID = null, content = org.apache.activemq.util.ByteSequence@6d6564ae,
> marshalledProperties = org.apache.activemq.util.ByteSequence@1bb5139e, dataStructure = null,
> redeliveryCounter = 0, size = 1208, properties = {EVENT_TYPE=TOPIC_1},
> readOnlyProperties = true, readOnlyBody = true, droppable = false}
>
> [INFO ] ActiveMQ Session Task foo.SomeMessageListener.onMessage:
> Received message acknowledged.
>
> And lots more log messages like the last two...
>
> Wow, you really have reached this point, thanx a lot! :)
> So is there anything wrong?
>
>
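
One consumer-side safeguard that fits the listener quoted above is to make processing idempotent: remember the JMSMessageID values that were already handled and skip the action when one shows up again. The following is only a minimal sketch under stated assumptions, not something taken from the original post or from ActiveMQ. `RecentMessageIds` and `firstSighting` are hypothetical names, and the store is in-memory only, so it would not cover the server-restart case described in this thread unless the IDs were persisted somewhere (for example alongside the backend data).

```java
import java.util.LinkedHashMap;
import java.util.Map;

/**
 * Hypothetical helper (not part of ActiveMQ or the original post): remembers
 * the most recently seen message IDs so a listener can skip redeliveries.
 * Assumption: IDs are kept in memory only and are lost on restart.
 */
final class RecentMessageIds {

    private final Map<String, Boolean> seen;

    RecentMessageIds(final int capacity) {
        // Insertion-ordered map that evicts its oldest entry once the
        // number of remembered IDs exceeds the given capacity.
        this.seen = new LinkedHashMap<String, Boolean>(16, 0.75f, false) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<String, Boolean> eldest) {
                return size() > capacity;
            }
        };
    }

    /** Returns true the first time an ID is offered, false for a repeat. */
    synchronized boolean firstSighting(final String messageId) {
        return seen.put(messageId, Boolean.TRUE) == null;
    }
}
```

In `onMessage`, the listener could call `firstSighting( message.getJMSMessageID() )` and, when it returns false, acknowledge the message without invoking the action a second time.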

