activemq-users mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "Hiram Chirino" <hi...@hiramchirino.com>
Subject Re: Already acknowledged messages are sent again at server restart (duplicate messages)
Date Sat, 15 Dec 2007 18:42:50 GMT
Nope.. The broker name also has to be different.  Have them connect
with something like vm://brokera and vm://brokerb

On Dec 15, 2007 8:16 AM, Martin Grotzke <martin.grotzke@javakaffee.de> wrote:
> Hello,
>
> because I just read this in another thread:
>
> > Also make sure the all the brokers have unique broker names.
>
> I forgot to mention, that we have two web applications (contexts)
> running in one tomcat, and both with a broker. ConnectionFactories in
> both apps connect to them both with brokerUrl "vm://localhost".
>
> Via jmx I can see, that two brokers exist, with different BrokerIds.
>
> Is this ok so far, or might it be a possible reason for the strange
> behavior?
>
> Cheers,
> Martin
>
>
>
> On Thu, 2007-12-13 at 22:04 +0100, Martin Grotzke wrote:
> > Hi,
> >
> > currently we have the issue of messages, that are sent again at server
> > restart on our production system. Unfortunately, on the test-system this
> > is not reproducable.
> >
> > I have seen, that in this mailing list and in the issue tracker this
> > issue has been already discussed several times, but I couldn't find
> > anything that might help to track down this issue.
> >
> > So what are we doing? We are using ActiveMQ 4.1.1 with the jvm transport
> > embedded within a tomcat and configured via spring.
> >
> > Mhh, more details you find below: configuration, producer, consumer,
> > logs. So is anything wrong with this? Is it ok to use one session for
> > two message listeners (to the same backend)? Should we use transactions?
> > Should we use AativeMQ 5?
> > Or what else might be the reason for the problem?
> >
> > Thanx a lot in advance,
> > cheers,
> > Martin
> >
> >
> > And now the details...
> >
> > The broker (spring) configuration looks like this:
> >
> > <bean id="broker" class="org.apache.activemq.xbean.BrokerFactoryBean">
> >     <property name="config" value="classpath:activemq.xml" />
> >     <property name="start" value="true" />
> > </bean>
> >
> > activemq.xml:
> > <beans>
> >   <bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer"/>
> >   <broker xmlns="http://activemq.org/config/1.0"
> >           brokerName="localhost" dataDirectory="/tmp/activemq/data">
> >     <managementContext>
> >        <managementContext jmxDomainName="foo.message" />
> >     </managementContext>
> >   </broker>
> > </beans>
> >
> >
> > The jmsTemplate and connectionFactory configuration is the following:
> >
> > <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate" scope="singleton">
> >     <constructor-arg><ref bean="poolingJmsFactory" /></constructor-arg>
> >     <property name="deliveryPersistent" value="true" />
> >     <property name="pubSubDomain" value="true"/>
> >     <property name="defaultDestinationName" value="default"/>
> > </bean>
> >
> > <bean id="poolingJmsFactory" class="org.apache.activemq.pool.PooledConnectionFactory"
destroy-method="stop"
> >     scope="singleton" depends-on="broker">
> >     <property name="connectionFactory">
> >         <bean class="org.apache.activemq.ActiveMQConnectionFactory">
> >             <property name="brokerURL"><value>vm://localhost</value></property>
> >             <property name="useAsyncSend"><value>false</value></property>
> >             <property name="optimizeAcknowledge"><value>false</value></property>
> >             <property name="objectMessageSerializationDefered" value="true" />
> >         </bean>
> >     </property>
> > </bean>
> >
> >
> > The producer code looks like this, using the jmsTemplate shown above:
> >
> >     public void sendSomeMessage ( final SomeSerializable someSerializable ) {
> >         _jmsTemplate.send( Topics.TOPIC_1, new SomeMessageCreator() {
> >             public Message createMessage( Session session ) throws JMSException
{
> >             final ObjectMessage result = session.createObjectMessage( someSerializable
);
> >             // here is used a property that is later used for selecting messages,
> >             // perhaps this isn't really required...?
> >             result.setStringProperty( MessageProperties.EVENT_TYPE.name(), Topics.TOPIC_1.name()
);
> >             return result;
> >         });
> >     }
> >
> >
> > On the consumer side we have a "MessageListenerContainer", that sets up
> > several message listeners. Its configuration and the configuration of
> > the connection factory is this:
> >
> > <bean id="messageListenerContainer"
> >         class="com.freiheit.shopping24.shop.message.business.impl.MessageListenerContainer"
> >         destroy-method="shutdown" scope="singleton">
> >     <constructor-arg ref="listenerConnectionFactory" />
> >     <constructor-arg>
> >         <bean class="foo.Action1" />
> >         <bean class="foo.Action2" />
> >         <bean class="foo.Action3" />
> >         <bean class="foo.Action4" />
> >     </constructor-arg>
> > </bean>
> >
> > <bean id="listenerConnectionFactory"
> >         class="org.apache.activemq.spring.ActiveMQConnectionFactory"
> >         scope="singleton" depends-on="broker">
> >     <property name="brokerURL"><value>vm://localhost</value></property>
> >     <property name="alwaysSessionAsync"><value>true</value></property>
> >     <property name="dispatchAsync"><value>false</value></property>
> >     <property name="optimizeAcknowledge"><value>false</value></property>
> >     <property name="optimizedMessageDispatch"><value>true</value></property>
> > </bean>
> >
> > The MessageListenerContainer looks like the following:
> >
> > class MessageListenerContainer {
> >
> >     private Connection _connection;
> >
> >     MessageListenerContainer (
> >             final ConnectionFactory listenerConnectionFactory,
> >             final Action1 action1,
> >             final Action2 action2,
> >             final Action3 action3,
> >             final Action4 action4 ) {
> >
> >         try {
> >             _connection = listenerConnectionFactory.createConnection();
> >             _connection.setClientID( "some-client-id" );
> >
> >             // two sessions for two different backends
> >             final Session session1 = _connection.createSession( false, Session.CLIENT_ACKNOWLEDGE
);
> >             final Session session2 = _connection.createSession( false, Session.CLIENT_ACKNOWLEDGE
);
> >
> >             // send event1 to backend1 and backend2
> >             registerEventAndAction( session1, Topics.TOPIC_1, action1.getClass().getSimpleName(),
> >                     new SomeMessageListener( action1 ) );
> >             registerEventAndAction( session2, Topics.TOPIC_1, action2.getClass().getSimpleName(),
> >                     new SomeMessageListener( action2 ) );
> >
> >             // send event2 to backend1 and backend2
> >             registerEventAndAction( session1, Topics.TOPIC_2, action3.getClass().getSimpleName(),
> >                     new SomeOtherMessageListener( action1 ) );
> >             registerEventAndAction( session2, Topics.TOPIC_2, action4.getClass().getSimpleName(),
> >                     new SomeOtherMessageListener( action2 ) );
> >
> >             _connection.start();
> >         } catch ( JMSException e ) {
> >             LOG.fatal( "Could not start MessageListenerContainer.", e );
> >             throw new RuntimeException( "Could not start MessageListenerContainer",
e );
> >         }
> >     }
> >
> >     private void registerEventAndAction( Session session,
> >             Topics topic,
> >             String actionName,
> >             MessageListener messageListener ) {
> >         try {
> >             // here we have the selector on the property EVENT_TYPE with the name
of
> >             // the Topics enum, but I don't know if we really need this - a colleague
has
> >             // written this ;-)
> >             TopicSubscriber subscriber = session.createDurableSubscriber( topic,
topic.name() + "-" +
> >                     actionName, MessageProperties.EVENT_TYPE + "=\'" + topic.name()
+ "\'", false );
> >             subscriber.setMessageListener( messageListener );
> >         } catch ( JMSException e ) {
> >             LOG.fatal( "Could not create subscriber.", e );
> >         }
> >     }
> >
> >     public void shutdown () {
> >         if ( _connection != null ) {
> >             try {
> >                 _connection.close();
> >             } catch ( JMSException e ) {
> >                 LOG.error( "Could not stop JMS connection.", e );
> >             }
> >         }
> >     }
> >
> > }
> >
> > The message listeners look like this:
> >
> > public final class SomeMessageListener implements MessageListener {
> >
> >     private static final Log LOG = LogFactory.getLog( SomeMessageListener.class
);
> >
> >     private final SomeAction _action;
> >
> >     public SomeMessageListener(SomeAction action) {
> >         _action = action;
> >     }
> >
> >     public void onMessage( Message message ) {
> >         LOG.info( "Got message " + message );
> >         final SomeSerializable someSerializable;
> >         try {
> >             someSerializable = (SomeSerializable) ((ObjectMessage)message).getObject();
> >             _action.performProductListAction( details );
> >             message.acknowledge();
> >             LOG.info( "Received message acknowledged." );
> >         } catch ( JMSException e ) {
> >             LOG.fatal( "Could not acknowledge message.", e );
> >         }
> >     }
> > }
> >
> >
> > Exactly these message are printed in the logs when the server is
> > restarted many messages, and not only for one message, but for 1000 or
> > 2000 for the last 6 days (the server was restarted 6 days before).
> >
> > The logs related to messaging when the server starts:
> >
> > [INFO ] main org.apache.activemq.broker.BrokerService.getBroker:
> > ActiveMQ 4.1.1 JMS Message Broker (localhost) is starting
> >
> > [INFO ] main org.apache.activemq.broker.BrokerService.getBroker:
> > For help or more information please see: http://incubator.apache.org/activemq/
> >
> > [INFO ] JMX connector org.apache.activemq.broker.jmx.ManagementContext.run:
> >
> > JMX consoles can connect to service:jmx:rmi:///jndi/rmi://localhost:1099/jmxrmi
> >
> > [INFO ] main org.apache.activemq.store.jdbc.JDBCPersistenceAdapter.createAdapter:
> > Database driver recognized: [apache_derby_embedded_jdbc_driver]
> >
> > [INFO ] main org.apache.activemq.store.jdbc.DefaultDatabaseLocker.start:
> > Attempting to acquire the exclusive lock to become the Master broker
> >
> > [INFO ] main org.apache.activemq.store.jdbc.DefaultDatabaseLocker.start:
> > Becoming the master on dataSource: org.apache.derby.jdbc.EmbeddedDataSource@2dda6548
> >
> > [INFO ] main org.apache.activemq.store.journal.JournalPersistenceAdapter.recover:
> > Journal Recovery Started from: Active Journal: using 2 x 20.0 Megs at: /tmp/activemq/data/journal
> >
> > [INFO ] main org.apache.activemq.store.journal.JournalPersistenceAdapter.recover:
> > Journal Recovered: 1 message(s) in transactions recovered.
> >
> > [INFO ] main org.apache.activemq.broker.BrokerService.start:
> > ActiveMQ JMS Message Broker (localhost, ID:portal02.shopping25.easynet.de-55403-1197562278985-1:0)
started
> >
> > [INFO ] main com.freiheit.shopping24.shop.message.business.impl.MessageServiceImpl.<init>:
> > Creating MessageService...
> >
> > [INFO ] main com.freiheit.shopping24.shop.message.business.impl.MessageListenerContainer.<init>:
> > Creating MessageListenerContainer, registering message listeners...
> >
> > [INFO ] main org.apache.activemq.broker.TransportConnector.start:
> > Connector vm://localhost Started
> >
> > [INFO ] main org.apache.activemq.kaha.impl.KahaStore.delete:
> > Kaha Store deleted data directory /www/freiheit/data/activemq/data/tmp_storage
> >
> > [INFO ] ActiveMQ Session Task foo.SomeMessageListener.onMessage:
> > Got message ActiveMQObjectMessage {commandId = 7, responseRequired = true,
> > messageId = ID:fqdn-42897-1196946608950-3:795:1:1:3, originalDestination = null,
> > originalTransactionId = null, producerId = ID:fqdn-42897-1196946608950-3:795:1:1,
> > destination = topic://TOPIC_1, transactionId = null, expiration = 0, timestamp =
1197048729937,
> > arrival = 0, correlationId = null, replyTo = null, persistent = true, type = null,
> > priority = 4, groupID = null, groupSequence = 0, targetConsumerId = null, compressed
= false,
> > userID = null, content = org.apache.activemq.util.ByteSequence@6d6564ae,
> > marshalledProperties = org.apache.activemq.util.ByteSequence@1bb5139e, dataStr
> > ucture = null, redeliveryCounter = 0, size = 1208, properties = {EVENT_TYPE=TOPIC_1},
> > readOnlyProperties = true, readOnlyBody = true, droppable = false}
> >
> > [INFO ] ActiveMQ Session Task foo.SomeMessageListener.onMessage:
> > Received message acknowledged.
> >
> > And lots more log messages like the last two...
> >
> > Wow, you really have reached this point, thanx a lot! :)
> > So is there anything wrong?
> >
> >
> --
>
> Martin Grotzke
> http://www.javakaffee.de/blog/
>



-- 
Regards,
Hiram

Blog: http://hiramchirino.com

Open Source SOA
http://open.iona.com

Mime
View raw message