activemq-users mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Martin Grotzke <martin.grot...@javakaffee.de>
Subject Already acknowledged messages are sent again at server restart (duplicate messages)
Date Thu, 13 Dec 2007 21:04:59 GMT
Hi,

currently we have the issue of messages, that are sent again at server
restart on our production system. Unfortunately, on the test-system this
is not reproducable.

I have seen, that in this mailing list and in the issue tracker this
issue has been already discussed several times, but I couldn't find
anything that might help to track down this issue.

So what are we doing? We are using ActiveMQ 4.1.1 with the jvm transport
embedded within a tomcat and configured via spring.

Mhh, more details you find below: configuration, producer, consumer,
logs. So is anything wrong with this? Is it ok to use one session for
two message listeners (to the same backend)? Should we use transactions?
Should we use AativeMQ 5?
Or what else might be the reason for the problem?

Thanx a lot in advance,
cheers,
Martin


And now the details...

The broker (spring) configuration looks like this:

<bean id="broker" class="org.apache.activemq.xbean.BrokerFactoryBean">
    <property name="config" value="classpath:activemq.xml" />
    <property name="start" value="true" />
</bean>

activemq.xml:
<beans>
  <bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer"/>
  <broker xmlns="http://activemq.org/config/1.0"
          brokerName="localhost" dataDirectory="/tmp/activemq/data">
    <managementContext>
       <managementContext jmxDomainName="foo.message" />
    </managementContext>
  </broker>
</beans>


The jmsTemplate and connectionFactory configuration is the following:

<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate" scope="singleton">
    <constructor-arg><ref bean="poolingJmsFactory" /></constructor-arg>
    <property name="deliveryPersistent" value="true" />
    <property name="pubSubDomain" value="true"/>
    <property name="defaultDestinationName" value="default"/>
</bean>

<bean id="poolingJmsFactory" class="org.apache.activemq.pool.PooledConnectionFactory" destroy-method="stop"
    scope="singleton" depends-on="broker">
    <property name="connectionFactory">
        <bean class="org.apache.activemq.ActiveMQConnectionFactory">
            <property name="brokerURL"><value>vm://localhost</value></property>
            <property name="useAsyncSend"><value>false</value></property>
            <property name="optimizeAcknowledge"><value>false</value></property>
            <property name="objectMessageSerializationDefered" value="true" />
        </bean>
    </property>
</bean>


The producer code looks like this, using the jmsTemplate shown above:

    public void sendSomeMessage ( final SomeSerializable someSerializable ) {
        _jmsTemplate.send( Topics.TOPIC_1, new SomeMessageCreator() {
            public Message createMessage( Session session ) throws JMSException {
            final ObjectMessage result = session.createObjectMessage( someSerializable );
            // here is used a property that is later used for selecting messages,
            // perhaps this isn't really required...?
            result.setStringProperty( MessageProperties.EVENT_TYPE.name(), Topics.TOPIC_1.name()
);
            return result;
        });
    }


On the consumer side we have a "MessageListenerContainer", that sets up
several message listeners. Its configuration and the configuration of
the connection factory is this:

<bean id="messageListenerContainer"
        class="com.freiheit.shopping24.shop.message.business.impl.MessageListenerContainer"
        destroy-method="shutdown" scope="singleton">
    <constructor-arg ref="listenerConnectionFactory" />
    <constructor-arg>
        <bean class="foo.Action1" />
        <bean class="foo.Action2" />
        <bean class="foo.Action3" />
        <bean class="foo.Action4" />
    </constructor-arg>
</bean>

<bean id="listenerConnectionFactory"
        class="org.apache.activemq.spring.ActiveMQConnectionFactory"
        scope="singleton" depends-on="broker">
    <property name="brokerURL"><value>vm://localhost</value></property>
    <property name="alwaysSessionAsync"><value>true</value></property>
    <property name="dispatchAsync"><value>false</value></property>
    <property name="optimizeAcknowledge"><value>false</value></property>
    <property name="optimizedMessageDispatch"><value>true</value></property>
</bean>

The MessageListenerContainer looks like the following:

class MessageListenerContainer {

    private Connection _connection;

    MessageListenerContainer (
            final ConnectionFactory listenerConnectionFactory,
            final Action1 action1,
            final Action2 action2,
            final Action3 action3,
            final Action4 action4 ) {

        try {
            _connection = listenerConnectionFactory.createConnection();
            _connection.setClientID( "some-client-id" );

            // two sessions for two different backends
            final Session session1 = _connection.createSession( false, Session.CLIENT_ACKNOWLEDGE
);
            final Session session2 = _connection.createSession( false, Session.CLIENT_ACKNOWLEDGE
);

            // send event1 to backend1 and backend2
            registerEventAndAction( session1, Topics.TOPIC_1, action1.getClass().getSimpleName(),
                    new SomeMessageListener( action1 ) );
            registerEventAndAction( session2, Topics.TOPIC_1, action2.getClass().getSimpleName(),
                    new SomeMessageListener( action2 ) );

            // send event2 to backend1 and backend2
            registerEventAndAction( session1, Topics.TOPIC_2, action3.getClass().getSimpleName(),
                    new SomeOtherMessageListener( action1 ) );
            registerEventAndAction( session2, Topics.TOPIC_2, action4.getClass().getSimpleName(),
                    new SomeOtherMessageListener( action2 ) );

            _connection.start();
        } catch ( JMSException e ) {
            LOG.fatal( "Could not start MessageListenerContainer.", e );
            throw new RuntimeException( "Could not start MessageListenerContainer", e );
        }
    }

    private void registerEventAndAction( Session session,
            Topics topic,
            String actionName,
            MessageListener messageListener ) {
        try {
            // here we have the selector on the property EVENT_TYPE with the name of
            // the Topics enum, but I don't know if we really need this - a colleague has
            // written this ;-)
            TopicSubscriber subscriber = session.createDurableSubscriber( topic, topic.name()
+ "-" +
                    actionName, MessageProperties.EVENT_TYPE + "=\'" + topic.name() + "\'",
false );
            subscriber.setMessageListener( messageListener );
        } catch ( JMSException e ) {
            LOG.fatal( "Could not create subscriber.", e );
        }
    }

    public void shutdown () {
        if ( _connection != null ) {
            try {
                _connection.close();
            } catch ( JMSException e ) {
                LOG.error( "Could not stop JMS connection.", e );
            }
        }
    }

}

The message listeners look like this:

public final class SomeMessageListener implements MessageListener {
    
    private static final Log LOG = LogFactory.getLog( SomeMessageListener.class );

    private final SomeAction _action;

    public SomeMessageListener(SomeAction action) {
        _action = action;
    }

    public void onMessage( Message message ) {
        LOG.info( "Got message " + message );
        final SomeSerializable someSerializable;
        try { 
            someSerializable = (SomeSerializable) ((ObjectMessage)message).getObject();
            _action.performProductListAction( details );
            message.acknowledge();
            LOG.info( "Received message acknowledged." );
        } catch ( JMSException e ) {
            LOG.fatal( "Could not acknowledge message.", e );
        }
    }
}


Exactly these message are printed in the logs when the server is
restarted many messages, and not only for one message, but for 1000 or
2000 for the last 6 days (the server was restarted 6 days before).

The logs related to messaging when the server starts:

[INFO ] main org.apache.activemq.broker.BrokerService.getBroker:
ActiveMQ 4.1.1 JMS Message Broker (localhost) is starting

[INFO ] main org.apache.activemq.broker.BrokerService.getBroker:
For help or more information please see: http://incubator.apache.org/activemq/

[INFO ] JMX connector org.apache.activemq.broker.jmx.ManagementContext.run:

JMX consoles can connect to service:jmx:rmi:///jndi/rmi://localhost:1099/jmxrmi

[INFO ] main org.apache.activemq.store.jdbc.JDBCPersistenceAdapter.createAdapter:
Database driver recognized: [apache_derby_embedded_jdbc_driver]

[INFO ] main org.apache.activemq.store.jdbc.DefaultDatabaseLocker.start:
Attempting to acquire the exclusive lock to become the Master broker

[INFO ] main org.apache.activemq.store.jdbc.DefaultDatabaseLocker.start:
Becoming the master on dataSource: org.apache.derby.jdbc.EmbeddedDataSource@2dda6548

[INFO ] main org.apache.activemq.store.journal.JournalPersistenceAdapter.recover:
Journal Recovery Started from: Active Journal: using 2 x 20.0 Megs at: /tmp/activemq/data/journal

[INFO ] main org.apache.activemq.store.journal.JournalPersistenceAdapter.recover:
Journal Recovered: 1 message(s) in transactions recovered.

[INFO ] main org.apache.activemq.broker.BrokerService.start:
ActiveMQ JMS Message Broker (localhost, ID:portal02.shopping25.easynet.de-55403-1197562278985-1:0)
started

[INFO ] main com.freiheit.shopping24.shop.message.business.impl.MessageServiceImpl.<init>:
Creating MessageService...

[INFO ] main com.freiheit.shopping24.shop.message.business.impl.MessageListenerContainer.<init>:
Creating MessageListenerContainer, registering message listeners...

[INFO ] main org.apache.activemq.broker.TransportConnector.start:
Connector vm://localhost Started

[INFO ] main org.apache.activemq.kaha.impl.KahaStore.delete:
Kaha Store deleted data directory /www/freiheit/data/activemq/data/tmp_storage

[INFO ] ActiveMQ Session Task foo.SomeMessageListener.onMessage:
Got message ActiveMQObjectMessage {commandId = 7, responseRequired = true,
messageId = ID:fqdn-42897-1196946608950-3:795:1:1:3, originalDestination = null,
originalTransactionId = null, producerId = ID:fqdn-42897-1196946608950-3:795:1:1,
destination = topic://TOPIC_1, transactionId = null, expiration = 0, timestamp = 1197048729937,
arrival = 0, correlationId = null, replyTo = null, persistent = true, type = null,
priority = 4, groupID = null, groupSequence = 0, targetConsumerId = null, compressed = false,
userID = null, content = org.apache.activemq.util.ByteSequence@6d6564ae,
marshalledProperties = org.apache.activemq.util.ByteSequence@1bb5139e, dataStr
ucture = null, redeliveryCounter = 0, size = 1208, properties = {EVENT_TYPE=TOPIC_1},
readOnlyProperties = true, readOnlyBody = true, droppable = false}

[INFO ] ActiveMQ Session Task foo.SomeMessageListener.onMessage:
Received message acknowledged.

And lots more log messages like the last two...

Wow, you really have reached this point, thanx a lot! :)
So is there anything wrong?



Mime
View raw message