activemq-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "Bryan Shaw (JIRA)" <j...@apache.org>
Subject [jira] Issue Comment Edited: (AMQ-1845) Message loss in network of brokers when network connection break
Date Mon, 21 Jul 2008 07:29:00 GMT

    [ https://issues.apache.org/activemq/browse/AMQ-1845?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=44408#action_44408
] 

silverhoof edited comment on AMQ-1845 at 7/21/08 12:27 AM:
-----------------------------------------------------------

This test can reproduce the message loss using ActiveMQ 5.0 and 5.1.

The producer side have its own broker and have a fixed IP address.
On the producer side, the spring configuration is like follows:
{code:xml}
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xmlns:jee="http://www.springframework.org/schema/jee" xmlns:amq="http://activemq.apache.org/schema/core"
	xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
	http://www.springframework.org/schema/jee http://www.springframework.org/schema/jee/spring-jee-2.0.xsd
	http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd">

	<amq:broker brokerName="producer" useJmx="true" persistent="true">
		<amq:transportConnectors>
			<amq:transportConnector uri="tcp://localhost:61616" />
		</amq:transportConnectors>
	</amq:broker>

	<!-- Jms ConnectionFactory -->
	<bean id="jmsFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
		<property name="brokerURL" value="vm://localhost:61616" />
	</bean>


	<!-- Spring JMS SimpleConverter -->
	<bean id="simpleConverter" class="org.springframework.jms.support.converter.SimpleMessageConverter"
/>

	<!-- JMS Queue Template -->
	<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
		<property name="connectionFactory" ref="jmsFactory" />
		<property name="messageConverter" ref="converter" />
	</bean>

	<!-- Message Converter -->
	<bean id="converter" class="com.al.SimpleConverter">
		<property name="converter">
			<ref local="simpleConverter" />
		</property>
	</bean>

	<!-- Message porducer -->
	<bean id="sender" class="com.al.DefaultSender">
		<property name="template" ref="jmsTemplate" />
		<property name="destinationName" value="test-out" />
	</bean>

</beans>
{code}

The test app at producer side:
{code:title=TestApp.java|borderStyle=solid}
package com.al;

import org.apache.log4j.Logger;

public class TestApp {
	/**
	 * Logger for this class
	 */
	private static final Logger logger = Logger.getLogger(TestApp.class);

	public static void main(String[] args) {

		logger.debug("start test...");
		//Initializing spring context
		Context.init();
		// uncomment to send messages

		DefaultSender sender = Context.getBean("sender");
		int idx = 1;
		int count = 3000;
		while (idx <= count) {
			sender.sendMessage(SimpleMessageHelper.genSimpleMessage(idx));
			logger.debug("send out message : payload is " + idx);
			idx++;
		}

		/* Infinitely hold main thread  to keep the spring context running
		*/
		Object lock = new Object();
		synchronized (lock) {
			try {
				lock.wait();
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
		}

	}
}
{code}

The test client is very simple which send 3000 messages to the queue "test-out" on the producer
side broker.

============================================================================================

The consumer side using dynamic IP address and deployed on another machine.
We have a broker on one separate machine using a duplex = "true" network connector
Consumer side Spring configuration file is as follows:
{code:xml}
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xmlns:jee="http://www.springframework.org/schema/jee" xmlns:amq="http://activemq.apache.org/schema/core"
	xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
	http://www.springframework.org/schema/jee http://www.springframework.org/schema/jee/spring-jee-2.0.xsd
	http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd">

	<amq:broker brokerName="brokerA" useJmx="true" persistent="true">

		<amq:networkConnectors>
			<amq:networkConnector uri="static://(tcp://【ip-for-producer-machine】:61616)" duplex="true"/>
		</amq:networkConnectors>

		<amq:transportConnectors>
			<amq:transportConnector uri="tcp://localhost:61616" />
		</amq:transportConnectors>

	</amq:broker>

	<!-- Jms ConnectionFactory -->

	<bean id="jmsFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
		<property name="brokerURL" value="vm://localhost:61616" />
	</bean>

	<!-- Spring JMS SimpleConverter -->
	<bean id="simpleConverter" class="org.springframework.jms.support.converter.SimpleMessageConverter"
/>


	<!-- Message Converter -->
	<bean id="converter" class="com.al.SimpleConverter">
		<property name="converter">
			<ref local="simpleConverter" />
		</property>
	</bean>

	<!-- MDP -->
	<!-- consumer 1 -->
	<bean id="listener" class="org.springframework.jms.listener.adapter.MessageListenerAdapter">
		<constructor-arg>
			<bean class="com.al.DefaultListener" />
		</constructor-arg>
		<property name="defaultListenerMethod" value="onMessage" />
		<property name="messageConverter" ref="converter" />
	</bean>

	<bean id="listenerContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
		<property name="connectionFactory" ref="jmsFactory" />
		<property name="destinationName" value="test-out" />
		<property name="messageListener" ref="listener" />
		<property name="sessionTransacted" value="true" />
	</bean>
</beans>
{code}

Default Listener code:
{code:title=DefaultListener.java|borderStyle=solid}
package com.al;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

public class DefaultListener {

	private static Log logger = LogFactory.getLog(DefaultListener.class);

	static int i = 1;

	public void onMessage(SimpleMessage message) {
		int j = Integer.valueOf(message.getPayLoad());
		logger.debug("receive : j = " + j);
		if (i != j) {
			logger.debug("warn : i=" + i + ",j=" + j);
		}
		i++;
	}
}
{code}

The default listener print out the message and compare the index in the message with the number
of message received (It should match if no message loss occurs).


===========================================================================================
And this is how to reproduce message loss in this environment.
1. Start producer application and consumer application
2. Producer application starts to send 3000 messages to the queue "test-out"
3. Consumer application starts to receive messages from the local distributed queue.
4. During the message sending process, disconnect the network cable on consumer machine.
5. You will find that the producer machine still sending messages to the queue "test-out",
which is normal because these messages should be persistent in producer broker's "test-out"
queue. And the mean time the consumer side stopped getting messages from the "test-out" queue,
which is also normal.
6. When you connect the network cable again on the consumer machine(during the message sending
or after the producer node finished sending message) , the consumer broker receives message
from the "test-out" queue again. But logger displays the message count miss match with the
message index in the message payload, which indicates message loss happened.


And by the way, this also reproduced when we trying to avoid duplex connection by define network
connector on both side. This is archived by fixed IP node connecting to a dynamic IP node
using a domain name (simulated by using hosts file to provide routing information)



      was (Author: silverhoof):
    This test can reproduce the message loss using ActiveMQ 5.0 and 5.1.

The producer side have its own broker and have a fixed IP address.
On the producer side, the spring configuration is like follows:
{quote}
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xmlns:jee="http://www.springframework.org/schema/jee" xmlns:amq="http://activemq.apache.org/schema/core"
	xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
	http://www.springframework.org/schema/jee http://www.springframework.org/schema/jee/spring-jee-2.0.xsd
	http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd">

	<amq:broker brokerName="producer" useJmx="true" persistent="true">
		<amq:transportConnectors>
			<amq:transportConnector uri="tcp://localhost:61616" />
		</amq:transportConnectors>
	</amq:broker>

	<!-- Jms ConnectionFactory -->
	<bean id="jmsFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
		<property name="brokerURL" value="vm://localhost:61616" />
	</bean>


	<!-- Spring JMS SimpleConverter -->
	<bean id="simpleConverter" class="org.springframework.jms.support.converter.SimpleMessageConverter"
/>

	<!-- JMS Queue Template -->
	<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
		<property name="connectionFactory" ref="jmsFactory" />
		<property name="messageConverter" ref="converter" />
	</bean>

	<!-- Message Converter -->
	<bean id="converter" class="com.al.SimpleConverter">
		<property name="converter">
			<ref local="simpleConverter" />
		</property>
	</bean>

	<!-- Message porducer -->
	<bean id="sender" class="com.al.DefaultSender">
		<property name="template" ref="jmsTemplate" />
		<property name="destinationName" value="test-out" />
	</bean>

</beans>
{quote}

The test app at producer side:
{quote}
package com.al;

import org.apache.log4j.Logger;

public class TestApp {
	/**
	 * Logger for this class
	 */
	private static final Logger logger = Logger.getLogger(TestApp.class);

	public static void main(String[] args) {

		logger.debug("start test...");
		//Initializing spring context
		Context.init();
		// uncomment to send messages

		DefaultSender sender = Context.getBean("sender");
		int idx = 1;
		int count = 3000;
		while (idx <= count) {
			sender.sendMessage(SimpleMessageHelper.genSimpleMessage(idx));
			logger.debug("send out message : payload is " + idx);
			idx++;
		}

		/* Infinitely hold main thread  to keep the spring context running
		*/
		Object lock = new Object();
		synchronized (lock) {
			try {
				lock.wait();
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
		}

	}
}
{quote}

The test client is very simple which send 3000 messages to the queue "test-out" on the producer
side broker.

============================================================================================

The consumer side using dynamic IP address and deployed on another machine.
We have a broker on one separate machine using a duplex = "true" network connector
Consumer side Spring configuration file is as follows:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xmlns:jee="http://www.springframework.org/schema/jee" xmlns:amq="http://activemq.apache.org/schema/core"
	xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
	http://www.springframework.org/schema/jee http://www.springframework.org/schema/jee/spring-jee-2.0.xsd
	http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd">

	<amq:broker brokerName="brokerA" useJmx="true" persistent="true">

		<amq:networkConnectors>
			<amq:networkConnector uri="static://(tcp://【ip-for-producer-machine】:61616)" duplex="true"/>
		</amq:networkConnectors>

		<amq:transportConnectors>
			<amq:transportConnector uri="tcp://localhost:61616" />
		</amq:transportConnectors>

	</amq:broker>

	<!-- Jms ConnectionFactory -->

	<bean id="jmsFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
		<property name="brokerURL" value="vm://localhost:61616" />
	</bean>

	<!-- Spring JMS SimpleConverter -->
	<bean id="simpleConverter" class="org.springframework.jms.support.converter.SimpleMessageConverter"
/>


	<!-- Message Converter -->
	<bean id="converter" class="com.al.SimpleConverter">
		<property name="converter">
			<ref local="simpleConverter" />
		</property>
	</bean>

	<!-- MDP -->
	<!-- consumer 1 -->
	<bean id="listener" class="org.springframework.jms.listener.adapter.MessageListenerAdapter">
		<constructor-arg>
			<bean class="com.al.DefaultListener" />
		</constructor-arg>
		<property name="defaultListenerMethod" value="onMessage" />
		<property name="messageConverter" ref="converter" />
	</bean>

	<bean id="listenerContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
		<property name="connectionFactory" ref="jmsFactory" />
		<property name="destinationName" value="test-out" />
		<property name="messageListener" ref="listener" />
		<property name="sessionTransacted" value="true" />
	</bean>
</beans>

Default Listener code:
package com.al;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

public class DefaultListener {

	private static Log logger = LogFactory.getLog(DefaultListener.class);

	static int i = 1;

	public void onMessage(SimpleMessage message) {
		int j = Integer.valueOf(message.getPayLoad());
		logger.debug("receive : j = " + j);
		if (i != j) {
			logger.debug("warn : i=" + i + ",j=" + j);
		}
		i++;
	}
}

The default listener print out the message and compare the index in the message with the number
of message received (It should match if no message loss occurs).


===========================================================================================
And this is how to reproduce message loss in this environment.
1. Start producer application and consumer application
2. Producer application starts to send 3000 messages to the queue "test-out"
3. Consumer application starts to receive messages from the local distributed queue.
4. During the message sending process, disconnect the network cable on consumer machine.
5. You will find that the producer machine still sending messages to the queue "test-out",
which is normal because these messages should be persistent in producer broker's "test-out"
queue. And the mean time the consumer side stopped getting messages from the "test-out" queue,
which is also normal.
6. When you connect the network cable again on the consumer machine(during the message sending
or after the producer node finished sending message) , the consumer broker receives message
from the "test-out" queue again. But logger displays the message count miss match with the
message index in the message payload, which indicates message loss happened.


And by the way, this also reproduced when we trying to avoid duplex connection by define network
connector on both side. This is archived by fixed IP node connecting to a dynamic IP node
using a domain name (simulated by using hosts file to provide routing information)


  
> Message loss in network of brokers when network connection break
> ----------------------------------------------------------------
>
>                 Key: AMQ-1845
>                 URL: https://issues.apache.org/activemq/browse/AMQ-1845
>             Project: ActiveMQ
>          Issue Type: Bug
>          Components: Broker
>    Affects Versions: 5.1.0
>         Environment: Two brokers connected via TCP with one persistent distributed queue
and a producer and a consumer on each broker.
>            Reporter: Bryan Shaw
>
> Producer on broker A send 2500 message on the distributed queue at broker A.
> Producer B starts to receive message from distributed queue on broker B.
> During the receiving process, the network between these two brokers down and later brought
up again.
> In this senario, we found that some messages are lost. 
> It seems the broker A are sending message to broker B when the network is down and these
messages are removed from queue in broker A but never received by broker B which causing message
loss.
> Is this a bug or a configuration problem? 
> I thought the configuration like this is the store/forward pattern which should ensure
the message reliability in an unstable network.

-- 
This message is automatically generated by JIRA.
-
You can reply to this email to add a comment to the issue online.


Mime
View raw message