activemq-dev mailing list archives

From Rob Davies <rajdav...@gmail.com>
Subject Re: ActiveMQ message delivery
Date Wed, 09 Jul 2008 07:00:05 GMT
Hi Marjan,

Could you try with the latest 5.2-snapshot? I think this has been
resolved recently.
Let me know how you get on!

cheers,

Rob

On 9 Jul 2008, at 07:26, Marjan wrote:

>
> I'm testing ActiveMQ 5.1 and I found some strange behaviour. There is a
> single message producer firing 10 messages in 1 second and a message
> consumer that listens on the same destination. Here is the code:
>
> import javax.jms.Connection;
> import javax.jms.ConnectionFactory;
> import javax.jms.DeliveryMode;
> import javax.jms.Destination;
> import javax.jms.MessageProducer;
> import javax.jms.ObjectMessage;
> import javax.jms.Session;
>
> import org.apache.activemq.ActiveMQConnectionFactory;
>
> /**
> * @author Marjan Sterjev
> *
> */
> public class JmsProducer {
>
>        public static void main(String[] args) throws Throwable {
>                ConnectionFactory connectionFactory =
>                                new ActiveMQConnectionFactory("tcp://localhost:61616");
>                Connection connection = connectionFactory.createConnection();
>                Session session = connection.createSession(false,
>                                Session.AUTO_ACKNOWLEDGE);
>                Destination destination = session.createQueue("test.queue");
>                MessageProducer producer = session.createProducer(destination);
>                producer.setDeliveryMode(DeliveryMode.PERSISTENT);
>                producer.setTimeToLive(10000);
>                int msgCount = 10;
>                int sleep = 100;
>                String payload = "Test Message";
>                for (int i = 0; i < msgCount; i++) {
>                        ObjectMessage message = session.createObjectMessage();
>                        String p = String.format("%s:%d", payload, (i + 1));
>                        message.setObject(p);
>                        producer.send(message);
>                        Thread.sleep(sleep);
>                }
>                producer.close();
>                session.close();
>                connection.close();
>        }
> }
>
> import javax.jms.Connection;
> import javax.jms.ConnectionFactory;
> import javax.jms.Destination;
> import javax.jms.JMSException;
> import javax.jms.Message;
> import javax.jms.MessageConsumer;
> import javax.jms.MessageListener;
> import javax.jms.ObjectMessage;
> import javax.jms.Session;
>
> import org.apache.activemq.ActiveMQConnectionFactory;
>
> /**
> * @author Marjan Sterjev
> *
> */
> public class JmsConsumer implements MessageListener {
>
>        private int messagesReceived;
>
>        public JmsConsumer() throws Throwable {
>                ConnectionFactory connectionFactory =
>                                new ActiveMQConnectionFactory("tcp://localhost:61616");
>                Connection connection = connectionFactory.createConnection();
>                connection.start();
>                Session session = connection.createSession(false,
>                                Session.AUTO_ACKNOWLEDGE);
>                Destination destination = session.createQueue("test.queue");
>                MessageConsumer consumer = session.createConsumer(destination);
>                consumer.setMessageListener(this);
>        }
>
>        public void onMessage(Message message) {
>                ObjectMessage objectMessage = (ObjectMessage) message;
>                try {
>                        System.out.println(String.format("[%d]:%s",
>                                        (++messagesReceived),
>                                        objectMessage.getObject().toString()));
>                } catch (JMSException e) {
>                        e.printStackTrace();
>                }
>        }
>
>        public static void main(String[] args)throws Throwable{
>                new JmsConsumer();
>        }
>
> }
>
> I'm using the default persistence adapter:
>
> <persistenceAdapter>
>     <amqPersistenceAdapter syncOnWrite="false"
>                            directory="${activemq.base}/data" maxFileLength="20 mb"/>
> </persistenceAdapter>
>
> Note that message expiration time is set to 10 seconds.
>
> If the message consumer is up all the time there is no problem; all
> messages are received promptly. I see a problem if the message consumer
> is down and some of the messages expire. In that case the message store
> behaves strangely. When the consumer is started again and we run the
> producer multiple times, there are cases when the last message is not
> delivered to the consumer. I found that the message will be delivered if
> additional messages are sent to the queue or the message consumer is
> restarted. This behaviour occurs randomly.
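
One way to spot the undelivered message described above without reading the console output by eye is to track which sequence numbers of a burst have arrived. The following is only a sketch: `BurstTracker` is a hypothetical helper, not part of ActiveMQ or of the quoted consumer, and it assumes payloads of the form "Test Message:<n>" as built by the quoted producer.

```java
import java.util.ArrayList;
import java.util.BitSet;
import java.util.List;

// Hypothetical helper, not part of ActiveMQ or the quoted consumer.
// Assumes payloads look like "Test Message:<n>" with n in 1..burstSize.
final class BurstTracker {
    private final int burstSize;
    private final BitSet seen = new BitSet();

    BurstTracker(int burstSize) {
        this.burstSize = burstSize;
    }

    /** Parses the sequence number after the last ':' and records it. */
    void record(String payload) {
        String tail = payload.substring(payload.lastIndexOf(':') + 1).trim();
        seen.set(Integer.parseInt(tail));
    }

    /** Sequence numbers in 1..burstSize that have not been recorded yet. */
    List<Integer> missing() {
        List<Integer> result = new ArrayList<>();
        for (int n = 1; n <= burstSize; n++) {
            if (!seen.get(n)) {
                result.add(n);
            }
        }
        return result;
    }

    /** Clears state so the next producer run starts a fresh burst. */
    void reset() {
        seen.clear();
    }

    public static void main(String[] args) {
        BurstTracker tracker = new BurstTracker(10);
        // Simulate a burst in which only messages 1..9 arrive.
        for (int i = 1; i <= 9; i++) {
            tracker.record("Test Message:" + i);
        }
        System.out.println("missing: " + tracker.missing());
    }
}
```

In the quoted consumer, `record(...)` could be called from `onMessage` with `objectMessage.getObject().toString()`, and `missing()` checked a short while after each producer run. A message that is only delayed will show up as missing until it arrives.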
>
> To reproduce the scenario, follow these steps:
>
> 1. Start the consumer.
> 2. Run the producer multiple times. All messages will be delivered as
>    expected.
> 3. Stop the consumer.
> 4. Run the producer and then wait more than 10 seconds, so that the
>    messages expire.
> 5. Start the consumer.
> 6. Run the producer multiple times (do not give up if everything is OK
>    after several runs). You should notice that the last (10th) message
>    is not delivered on each producer burst.
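
As a rough check on the timing in step 4, the sketch below works out when each message of one burst expires, using the values from the quoted producer (10 messages, a 100 ms sleep after each send, a 10000 ms time-to-live). The class and method names are hypothetical, it assumes that each send takes negligible time, and it does not talk to a broker.

```java
// Hypothetical helper, not part of ActiveMQ: pure arithmetic on the
// producer settings quoted above. Assumes each send() takes ~0 ms.
final class BurstExpiry {

    /** Offset in ms (from burst start) at which message n (1-based) is sent. */
    static long sendOffsetMs(int n, long sleepMs) {
        return (n - 1) * sleepMs;
    }

    /** Offset in ms (from burst start) at which message n expires. */
    static long expiryOffsetMs(int n, long sleepMs, long ttlMs) {
        return sendOffsetMs(n, sleepMs) + ttlMs;
    }

    /** Offset in ms at which the producer exits: it sleeps once after every send. */
    static long producerExitOffsetMs(int msgCount, long sleepMs) {
        return msgCount * sleepMs;
    }

    public static void main(String[] args) {
        int msgCount = 10;
        long sleepMs = 100;
        long ttlMs = 10000;
        for (int n = 1; n <= msgCount; n++) {
            System.out.printf("message %d: sent at %d ms, expires at %d ms%n",
                    n, sendOffsetMs(n, sleepMs), expiryOffsetMs(n, sleepMs, ttlMs));
        }
        // Time between the producer exiting and the last message expiring.
        long wait = expiryOffsetMs(msgCount, sleepMs, ttlMs)
                - producerExitOffsetMs(msgCount, sleepMs);
        System.out.printf("wait after producer exit until all expired: %d ms%n", wait);
    }
}
```

Under these assumptions the last message expires 9900 ms after the producer exits, so waiting more than 10 seconds in step 4 is enough for the whole burst to expire.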
>
> I was monitoring the web administration console. With such a corrupted
> store I sometimes see negative numbers as the number of pending
> messages, which is a clear bug in the application.
>
> If I use syncOnWrite="true" on amqPersistenceAdapter, the problem goes
> away. The problem does not exist with jdbcPersistenceAdapter.
>
> My operating system is Windows XP.
>
> Any comments?
>
> Regards
>
> -- 
> View this message in context: http://www.nabble.com/ActiveMQ-message-delivery-tp18355245p18355245.html
> Sent from the ActiveMQ - Dev mailing list archive at Nabble.com.
>

