activemq-users mailing list archives

From "Hiram Chirino" <hi...@hiramchirino.com>
Subject Re: Already acknowledged messages are sent again at server restart (duplicate messages)
Date Fri, 14 Dec 2007 18:09:15 GMT
It's just that the focus of a lot of the developers for the last few months
has been on AMQ 5.  It's easier for most of us developers/volunteers
to help out on that version rather than having to load up the old
version.

On Dec 14, 2007 3:32 AM, Martin Grotzke <martin.grotzke@javakaffee.de> wrote:
> Ok, I shouldn't have asked if we should move to AMQ 5 ;)
>
> So there is no problem with our code/configuration/setup?
>
> But thinking about AMQ 5 - why do you recommend this? Is AMQ 4.1.1 in
> general not stable / good enough, or, in other words, is AMQ 5 so much
> better?
>
> And what would we have to change in terms of configuration? What are
> the consequences in terms of data migration when an existing AMQ 4 is
> updated to AMQ 5? I found no AMQ 4 -> 5 migration docs or anything like
> this.
>
> Cheers,
> Martin
>
>
>
>
> On Fri, 2007-12-14 at 07:44 +0000, Rob Davies wrote:
> > The best thing to do would be to move to version 5 of ActiveMQ
> >
> > cheers,
> >
> > Rob
> > On Dec 13, 2007, at 9:04 PM, Martin Grotzke wrote:
> >
> > > Hi,
> > >
> > > currently we have the issue that messages are sent again at server
> > > restart on our production system. Unfortunately, this is not
> > > reproducible on the test system.
> > >
> > > I have seen that this issue has already been discussed several times
> > > on this mailing list and in the issue tracker, but I couldn't find
> > > anything that might help to track it down.
> > >
> > > So what are we doing? We are using ActiveMQ 4.1.1 with the VM
> > > transport (vm://), embedded within Tomcat and configured via Spring.
> > >
> > > You'll find more details below: configuration, producer, consumer,
> > > logs. So is anything wrong with this? Is it ok to use one session for
> > > two message listeners (to the same backend)? Should we use
> > > transactions? Should we use ActiveMQ 5?
> > > Or what else might be the reason for the problem?
> > >
> > > Thanx a lot in advance,
> > > cheers,
> > > Martin
> > >
> > >
> > > And now the details...
> > >
> > > The broker (spring) configuration looks like this:
> > >
> > > <bean id="broker" class="org.apache.activemq.xbean.BrokerFactoryBean">
> > >    <property name="config" value="classpath:activemq.xml" />
> > >    <property name="start" value="true" />
> > > </bean>
> > >
> > > activemq.xml:
> > > <beans>
> > >  <bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer"/>
> > >  <broker xmlns="http://activemq.org/config/1.0"
> > >          brokerName="localhost" dataDirectory="/tmp/activemq/data">
> > >    <managementContext>
> > >       <managementContext jmxDomainName="foo.message" />
> > >    </managementContext>
> > >  </broker>
> > > </beans>
> > >
> > >
> > > The jmsTemplate and connectionFactory configuration is the following:
> > >
> > > <bean id="jmsTemplate"
> > > class="org.springframework.jms.core.JmsTemplate" scope="singleton">
> > >    <constructor-arg><ref bean="poolingJmsFactory" /></constructor-arg>
> > >    <property name="deliveryPersistent" value="true" />
> > >    <property name="pubSubDomain" value="true"/>
> > >    <property name="defaultDestinationName" value="default"/>
> > > </bean>
> > >
> > > <bean id="poolingJmsFactory"
> > >    class="org.apache.activemq.pool.PooledConnectionFactory"
> > >    destroy-method="stop" scope="singleton" depends-on="broker">
> > >    <property name="connectionFactory">
> > >        <bean class="org.apache.activemq.ActiveMQConnectionFactory">
> > >            <property name="brokerURL"><value>vm://localhost</value></property>
> > >            <property name="useAsyncSend"><value>false</value></property>
> > >            <property name="optimizeAcknowledge"><value>false</value></property>
> > >            <property name="objectMessageSerializationDefered" value="true" />
> > >        </bean>
> > >    </property>
> > > </bean>
> > >
> > >
> > > The producer code looks like this, using the jmsTemplate shown above:
> > >
> > >    public void sendSomeMessage( final SomeSerializable someSerializable ) {
> > >        _jmsTemplate.send( Topics.TOPIC_1, new SomeMessageCreator() {
> > >            public Message createMessage( Session session ) throws JMSException {
> > >                final ObjectMessage result = session.createObjectMessage( someSerializable );
> > >                // this property is later used for selecting messages,
> > >                // perhaps this isn't really required...?
> > >                result.setStringProperty( MessageProperties.EVENT_TYPE.name(),
> > >                        Topics.TOPIC_1.name() );
> > >                return result;
> > >            }
> > >        });
> > >    }
> > >
> > >
> > > On the consumer side we have a "MessageListenerContainer" that sets
> > > up several message listeners. Its configuration and the configuration
> > > of the connection factory is this:
> > >
> > > <bean id="messageListenerContainer"
> > >        class="com.freiheit.shopping24.shop.message.business.impl.MessageListenerContainer"
> > >        destroy-method="shutdown" scope="singleton">
> > >    <constructor-arg ref="listenerConnectionFactory" />
> > >    <constructor-arg>
> > >        <bean class="foo.Action1" />
> > >        <bean class="foo.Action2" />
> > >        <bean class="foo.Action3" />
> > >        <bean class="foo.Action4" />
> > >    </constructor-arg>
> > > </bean>
> > >
> > > <bean id="listenerConnectionFactory"
> > >        class="org.apache.activemq.spring.ActiveMQConnectionFactory"
> > >        scope="singleton" depends-on="broker">
> > >    <property name="brokerURL"><value>vm://localhost</value></property>
> > >    <property name="alwaysSessionAsync"><value>true</value></property>
> > >    <property name="dispatchAsync"><value>false</value></property>
> > >    <property name="optimizeAcknowledge"><value>false</value></property>
> > >    <property name="optimizedMessageDispatch"><value>true</value></property>
> > > </bean>
> > >
> > > The MessageListenerContainer looks like the following:
> > >
> > > class MessageListenerContainer {
> > >
> > >    private Connection _connection;
> > >
> > >    MessageListenerContainer (
> > >            final ConnectionFactory listenerConnectionFactory,
> > >            final Action1 action1,
> > >            final Action2 action2,
> > >            final Action3 action3,
> > >            final Action4 action4 ) {
> > >
> > >        try {
> > >            _connection = listenerConnectionFactory.createConnection();
> > >            _connection.setClientID( "some-client-id" );
> > >
> > >            // two sessions for two different backends
> > >            final Session session1 =
> > >                    _connection.createSession( false, Session.CLIENT_ACKNOWLEDGE );
> > >            final Session session2 =
> > >                    _connection.createSession( false, Session.CLIENT_ACKNOWLEDGE );
> > >
> > >            // send event1 to backend1 and backend2
> > >            registerEventAndAction( session1, Topics.TOPIC_1,
> > >                    action1.getClass().getSimpleName(),
> > >                    new SomeMessageListener( action1 ) );
> > >            registerEventAndAction( session2, Topics.TOPIC_1,
> > >                    action2.getClass().getSimpleName(),
> > >                    new SomeMessageListener( action2 ) );
> > >
> > >            // send event2 to backend1 and backend2
> > >            registerEventAndAction( session1, Topics.TOPIC_2,
> > >                    action3.getClass().getSimpleName(),
> > >                    new SomeOtherMessageListener( action1 ) );
> > >            registerEventAndAction( session2, Topics.TOPIC_2,
> > >                    action4.getClass().getSimpleName(),
> > >                    new SomeOtherMessageListener( action2 ) );
> > >
> > >            _connection.start();
> > >        } catch ( JMSException e ) {
> > >            LOG.fatal( "Could not start MessageListenerContainer.", e );
> > >            throw new RuntimeException( "Could not start MessageListenerContainer", e );
> > >        }
> > >    }
> > >
> > >    private void registerEventAndAction( Session session,
> > >            Topics topic,
> > >            String actionName,
> > >            MessageListener messageListener ) {
> > >        try {
> > >            // here we have the selector on the property EVENT_TYPE with the
> > >            // name of the Topics enum, but I don't know if we really need
> > >            // this - a colleague has written this ;-)
> > >            TopicSubscriber subscriber = session.createDurableSubscriber(
> > >                    topic, topic.name() + "-" + actionName,
> > >                    MessageProperties.EVENT_TYPE + "=\'" + topic.name() + "\'", false );
> > >            subscriber.setMessageListener( messageListener );
> > >        } catch ( JMSException e ) {
> > >            LOG.fatal( "Could not create subscriber.", e );
> > >        }
> > >    }
> > >
> > >    public void shutdown () {
> > >        if ( _connection != null ) {
> > >            try {
> > >                _connection.close();
> > >            } catch ( JMSException e ) {
> > >                LOG.error( "Could not stop JMS connection.", e );
> > >            }
> > >        }
> > >    }
> > >
> > > }
> > >
> > > The message listeners look like this:
> > >
> > > public final class SomeMessageListener implements MessageListener {
> > >
> > >    private static final Log LOG = LogFactory.getLog( SomeMessageListener.class );
> > >
> > >    private final SomeAction _action;
> > >
> > >    public SomeMessageListener(SomeAction action) {
> > >        _action = action;
> > >    }
> > >
> > >    public void onMessage( Message message ) {
> > >        LOG.info( "Got message " + message );
> > >        final SomeSerializable someSerializable;
> > >        try {
> > >            someSerializable = (SomeSerializable) ((ObjectMessage) message).getObject();
> > >            _action.performProductListAction( someSerializable );
> > >            message.acknowledge();
> > >            LOG.info( "Received message acknowledged." );
> > >        } catch ( JMSException e ) {
> > >            LOG.fatal( "Could not acknowledge message.", e );
> > >        }
> > >    }
> > > }
> > >
> > >
> > > Exactly these messages are printed in the logs when the server is
> > > restarted, and not just for one message, but for 1000 or 2000 messages
> > > from the last 6 days (the server was last restarted 6 days earlier).
> > >
> > > The logs related to messaging when the server starts:
> > >
> > > [INFO ] main org.apache.activemq.broker.BrokerService.getBroker:
> > > ActiveMQ 4.1.1 JMS Message Broker (localhost) is starting
> > >
> > > [INFO ] main org.apache.activemq.broker.BrokerService.getBroker:
> > > For help or more information please see: http://incubator.apache.org/activemq/
> > >
> > > [INFO ] JMX connector org.apache.activemq.broker.jmx.ManagementContext.run:
> > > JMX consoles can connect to service:jmx:rmi:///jndi/rmi://localhost:1099/jmxrmi
> > >
> > > [INFO ] main org.apache.activemq.store.jdbc.JDBCPersistenceAdapter.createAdapter:
> > > Database driver recognized: [apache_derby_embedded_jdbc_driver]
> > >
> > > [INFO ] main org.apache.activemq.store.jdbc.DefaultDatabaseLocker.start:
> > > Attempting to acquire the exclusive lock to become the Master broker
> > >
> > > [INFO ] main org.apache.activemq.store.jdbc.DefaultDatabaseLocker.start:
> > > Becoming the master on dataSource: org.apache.derby.jdbc.EmbeddedDataSource@2dda6548
> > >
> > > [INFO ] main org.apache.activemq.store.journal.JournalPersistenceAdapter.recover:
> > > Journal Recovery Started from: Active Journal: using 2 x 20.0 Megs at: /tmp/activemq/data/journal
> > >
> > > [INFO ] main org.apache.activemq.store.journal.JournalPersistenceAdapter.recover:
> > > Journal Recovered: 1 message(s) in transactions recovered.
> > >
> > > [INFO ] main org.apache.activemq.broker.BrokerService.start:
> > > ActiveMQ JMS Message Broker (localhost, ID:portal02.shopping25.easynet.de-55403-1197562278985-1:0) started
> > >
> > > [INFO ] main com.freiheit.shopping24.shop.message.business.impl.MessageServiceImpl.<init>:
> > > Creating MessageService...
> > >
> > > [INFO ] main com.freiheit.shopping24.shop.message.business.impl.MessageListenerContainer.<init>:
> > > Creating MessageListenerContainer, registering message listeners...
> > >
> > > [INFO ] main org.apache.activemq.broker.TransportConnector.start:
> > > Connector vm://localhost Started
> > >
> > > [INFO ] main org.apache.activemq.kaha.impl.KahaStore.delete:
> > > Kaha Store deleted data directory /www/freiheit/data/activemq/data/tmp_storage
> > >
> > > [INFO ] ActiveMQ Session Task foo.SomeMessageListener.onMessage:
> > > Got message ActiveMQObjectMessage {commandId = 7, responseRequired = true,
> > > messageId = ID:fqdn-42897-1196946608950-3:795:1:1:3, originalDestination = null,
> > > originalTransactionId = null, producerId = ID:fqdn-42897-1196946608950-3:795:1:1,
> > > destination = topic://TOPIC_1, transactionId = null, expiration = 0,
> > > timestamp = 1197048729937, arrival = 0, correlationId = null, replyTo = null,
> > > persistent = true, type = null, priority = 4, groupID = null, groupSequence = 0,
> > > targetConsumerId = null, compressed = false, userID = null,
> > > content = org.apache.activemq.util.ByteSequence@6d6564ae,
> > > marshalledProperties = org.apache.activemq.util.ByteSequence@1bb5139e,
> > > dataStructure = null, redeliveryCounter = 0, size = 1208,
> > > properties = {EVENT_TYPE=TOPIC_1}, readOnlyProperties = true, readOnlyBody = true,
> > > droppable = false}
> > >
> > > [INFO ] ActiveMQ Session Task foo.SomeMessageListener.onMessage:
> > > Received message acknowledged.
> > >
> > > And lots more log messages like the last two...
> > >
> > > Wow, you really have reached this point, thanx a lot! :)
> > > So is there anything wrong?
> > >
> > >
> >
> --
> Martin Grotzke
> http://www.javakaffee.de/blog/
>



-- 
Regards,
Hiram

Blog: http://hiramchirino.com

Open Source SOA
http://open.iona.com
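
For reference, the durable subscription names and message selectors that
registerEventAndAction in the quoted code builds can be looked at in
isolation. The following is only a minimal sketch: the enums Topics and
MessageProperties are hypothetical stand-ins (the post shows only the
constants TOPIC_1, TOPIC_2 and EVENT_TYPE), and the class name
SubscriptionNames is made up for illustration. It assumes the action name is
the simple class name of the configured bean, e.g. "Action1" for foo.Action1.

```java
class SubscriptionNames {

    // Hypothetical stand-ins for the poster's enums; only these constant
    // names appear in the original message.
    enum Topics { TOPIC_1, TOPIC_2 }
    enum MessageProperties { EVENT_TYPE }

    // Same concatenation as in registerEventAndAction:
    // topic name, a dash, then the action's simple class name.
    static String subscriptionName(Topics topic, String actionName) {
        return topic.name() + "-" + actionName;
    }

    // Same selector expression as in registerEventAndAction:
    // the EVENT_TYPE property must equal the topic's enum name.
    static String selector(Topics topic) {
        return MessageProperties.EVENT_TYPE + "='" + topic.name() + "'";
    }

    public static void main(String[] args) {
        // prints TOPIC_1-Action1
        System.out.println(subscriptionName(Topics.TOPIC_1, "Action1"));
        // prints EVENT_TYPE='TOPIC_1'
        System.out.println(selector(Topics.TOPIC_1));
    }
}
```

In JMS a durable subscription is identified by the connection's client ID
together with the subscription name, so both strings need to come out the
same on every restart for the broker to match a subscriber to its existing
subscription. Whether that plays any role in the redelivery described in
this thread is not established by the messages above.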
