activemq-users mailing list archives

From spiiff <matthias.ba...@siemens.com>
Subject Re: receiving old messages when restartin embedded broker - problem with persistence/auto_acknowledge?
Date Fri, 26 Oct 2007 09:13:44 GMT

Turning off batched statements did not really solve the problem.
Here is a bug description (from Jul 04, 2006!):
http://www.nabble.com/Is-this-a-bug-in-DefaultJDBCAdapter.java--tf1890445s2354.html#a5168977
I think this was the reason for our problem.

With this correction in DefaultJDBCAdapter.doSetLastAck():
    PreparedStatement s = c.getUpdateLastAckStatement();
    // was: c.getAddMessageStatement();
our code works as expected.
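
To illustrate the symptom discussed in this thread, here is a toy model of why a last-acknowledged id that is never updated leads to redelivery after a restart. It is plain Java with no ActiveMQ or JDBC dependency. The class AckTable and all of its methods are hypothetical names invented for this sketch and do not mirror the real DefaultJDBCAdapter. It assumes only what the thread describes: one last-acknowledged id is kept per durable subscriber, and after a restart every stored message with a higher id is delivered again.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Toy model of per-subscriber ack bookkeeping (hypothetical, not ActiveMQ code). */
public class AckTable {
    // Ids of all stored messages, in send order.
    private final List<Long> messages = new ArrayList<>();
    // Last acknowledged message id per client id (0 means nothing acked yet).
    private final Map<String, Long> lastAcked = new HashMap<>();

    public void subscribe(String clientId) {
        lastAcked.putIfAbsent(clientId, 0L);
    }

    public void store(long messageId) {
        messages.add(messageId);
    }

    /** Records an ack. With persisted == false the update is lost, modelling the bug. */
    public void ack(String clientId, long messageId, boolean persisted) {
        if (persisted) {
            lastAcked.put(clientId, messageId);
        }
    }

    /** Messages that would be delivered to this subscriber after a restart. */
    public List<Long> pendingFor(String clientId) {
        long last = lastAcked.getOrDefault(clientId, 0L);
        List<Long> pending = new ArrayList<>();
        for (long id : messages) {
            if (id > last) {
                pending.add(id);
            }
        }
        return pending;
    }

    public static void main(String[] args) {
        AckTable table = new AckTable();
        table.subscribe("1stID");
        table.subscribe("2ndID");

        // First run: two messages are sent and consumed by both subscribers.
        table.store(1);
        table.store(2);
        table.ack("1stID", 2, true);   // ack reaches the table
        table.ack("2ndID", 2, false);  // ack is lost
        // Two more messages are sent after the consumers were closed.
        table.store(3);
        table.store(4);

        // Second run: two new messages are sent before consuming.
        table.store(5);
        table.store(6);

        System.out.println(table.pendingFor("1stID").size()); // prints 4
        System.out.println(table.pendingFor("2ndID").size()); // prints 6
    }
}
```

The counts match the 4 and 6 messages reported in the original post, which is consistent with LAST_ACKED_ID changing for only one of the two CLIENT_IDs.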

Why was this bug not solved before?
Will it be solved with version 4.2?
Is this bug already tracked somewhere? How can I check this?

regards,
m.


spiiff wrote:
> 
> We solved the problem.
> I debugged the ActiveMQ code and, deep inside "DefaultJDBCAdapter", found a
> batchedStatements boolean. When I insert the following snippet in
> activemq.xml, the boolean is set to false and all statements are correctly
> persisted:
> 
>   <bean id="db-adapter"
>         class="org.apache.activemq.store.jdbc.adapter.DefaultJDBCAdapter">
>     <property name="batchStatments" value="false" />
>   </bean>
> 
> Of course, in your persistence adapter you have to reference the
> db-adapter bean:
> 
>     <persistenceAdapter>
>       <journaledJDBC journalLogFiles="5" dataDirectory="../activemq-data"
>                      createTablesOnStartup="true" dataSource="#oracle-ds"
>                      adapter="#db-adapter" />
>     </persistenceAdapter>
> 
> 
> I don't know exactly what effect turning off batched statements has, but for
> my testing it is OK at the moment. We may have lost some performance. When we
> really start to use ActiveMQ in our system I'll have to investigate further.
> 
> 
> 
> krv wrote:
>> 
>> I encountered the same problem. I had a sender and a receiver running
>> on a Tomcat server. Whenever I restarted the receiver, I received again
>> all the messages that I had earlier sent to the message broker and that
>> had already been processed.
>> 
>> The only difference between my code and yours is that I am using
>> MessageListener.onMessage(Message) rather than Consumer.receive().
>> 
>> After some investigation I found that my own code had caused the
>> problem. A method I called threw an exception, so control never
>> returned to the MessageListener (the receiver) and the ActiveMQ session
>> was never committed. As a result, the message was never dequeued from
>> the queue.
>> 
>> I am not sure if your problem is the same, but you can try the following:
>>   * Commit the session just after you receive the message
>>   * Acknowledge the message: message.acknowledge()
>> 
>> Let me know how it goes.
>> 
>> 
>> spiiff wrote:
>>> 
>>> Hello,
>>> we are facing a strange problem.
>>> I am running an embedded broker in my unit test:
>>> 
>>>     URI activemqConfigurationUri = new URI("xbean:conf/activemq.xml");
>>>     brokerService = BrokerFactory.createBroker(activemqConfigurationUri);
>>>     brokerService.start();
>>> 
>>> 
>>> I have 2 connections/sessions; from each session I create a consumer on
>>> a topic:
>>> 
>>>     connection1 = connectionFactory.createConnection();
>>>     connection1.setClientID("1stID");
>>>     connection1.start();
>>>     session1 = connection1.createSession(false, Session.AUTO_ACKNOWLEDGE);
>>>     MessageConsumer consumer1 = ((TopicSession) session1)
>>>         .createDurableSubscriber((Topic) destination, subscriberName);
>>> 
>>>     connection2 = connectionFactory.createConnection();
>>>     connection2.setClientID("2ndID");
>>>     connection2.start();
>>>     session2 = connection2.createSession(false, Session.AUTO_ACKNOWLEDGE);
>>>     MessageConsumer consumer2 = ((TopicSession) session2)
>>>         .createDurableSubscriber((Topic) destination, subscriberName);
>>> 
>>> 
>>> From a third session I create a producer and send 2 messages to the
>>> topic:
>>> 
>>>     producerConnection = connectionFactory.createConnection();
>>>     producerConnection.setClientID("producer");
>>>     producerConnection.start();
>>>     producerSession =
>>>         producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
>>> 
>>>     producer = producerSession.createProducer(destination);
>>>     producer.setDeliveryMode(DeliveryMode.PERSISTENT);
>>>     producer.setTimeToLive(300000);
>>> 
>>>     javax.jms.Message msg1 = producerSession.createObjectMessage(message1);
>>>     javax.jms.Message msg2 = producerSession.createObjectMessage(message2);
>>>     producer.send(msg1, DeliveryMode.PERSISTENT, Message.DEFAULT_PRIORITY,
>>>         Message.DEFAULT_TIME_TO_LIVE);
>>>     producer.send(msg2, DeliveryMode.PERSISTENT, Message.DEFAULT_PRIORITY,
>>>         Message.DEFAULT_TIME_TO_LIVE);
>>> 
>>> 
>>> Then I receive the 2 messages with my consumers (every consumer is
>>> receiving 2 messages):
>>> 
>>>     Message msg1 = consumer1.receive(3000);
>>>     while (msg1 != null) {
>>>         Object object = ((ObjectMessage) msg1).getObject();
>>>         msg1 = consumer1.receive(3000);
>>>     }
>>> 
>>>     Message msg2 = consumer2.receive(3000);
>>>     while (msg2 != null) {
>>>         Object object = ((ObjectMessage) msg2).getObject();
>>>         msg2 = consumer2.receive(3000);
>>>     }
>>> 
>>> 
>>> 
>>> I close the consumers:
>>> 
>>> 		consumer1.close();
>>> 		consumer2.close();
>>> 
>>> And again I send 2 messages.
>>> I close all connections and the embedded broker.
>>> The next time I run the unit test, the first consumer gets 4 messages, as
>>> expected.
>>> The second consumer gets 6 messages! It gets all the messages that were
>>> ever produced, every time.
>>> We're using ActiveMQ 4.1.1.
>>> 
>>> When we run a standalone broker in a separate process instead of an
>>> embedded one, everything is OK as long as we don't stop and restart the
>>> broker. After a restart, the second consumer again receives all the
>>> messages the first time it calls consumer.receive(). The second time, it
>>> receives 4 messages.
>>> I guess there is something wrong with the persistence to the database.
>>> When we're using our Oracle database I can query ACTIVEMQ_ACKS.
>>> My 2 CLIENT_IDs are listed there, and the LAST_ACKED_ID only changes for
>>> one of the CLIENT_IDs.
>>> 
>>> Can anybody help me?
>>> 
>>> I didn't find the same problem in the forum, only similar ones:
>>> http://www.nabble.com/-activemq-user--Re%3A-receiving-old-messages-again-after-restart-tf95738s2354.html#a265691
>>> 
>>> Regards,
>>> Matthias
>>> 
>>> 
>> 
>> 
> 
> 

-- 
View this message in context: http://www.nabble.com/receiving-old-messages-when-restartin-embedded-broker---problem-with-persistence-auto_acknowledge--tf4634882s2354.html#a13423344
Sent from the ActiveMQ - User mailing list archive at Nabble.com.

