activemq-dev mailing list archives

From "Giovanni Toffetti (Commented) (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (AMQ-1509) Duplicate topic messages received with network of brokers and selectors
Date Wed, 09 Nov 2011 11:50:51 GMT

    [ https://issues.apache.org/jira/browse/AMQ-1509?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13146959#comment-13146959 ]

Giovanni Toffetti commented on AMQ-1509:
----------------------------------------

Hi Dejan,

just by chance I started looking at this issue again. I don't think the problem is fixed:
as soon as there is more than one path (of at least 2 hops) between brokers, message duplication
occurs.

Here's a little example:

{code:title=FourBrokerTopicNetworkTest}

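import java.net.URI;
import java.util.HashMap;

import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;

import junit.framework.Test;

import org.apache.activemq.JmsMultipleBrokersTestSupport;
import org.apache.activemq.util.MessageIdList;

public class FourBrokerTopicNetworkTest extends JmsMultipleBrokersTestSupport implements MessageListener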
{
	protected static final int MESSAGE_COUNT = 5;
	public boolean dynamicOnly;

	public void initCombosForTestABandBCbrokerNetworkWithSelectors() {
		addCombinationValues("dynamicOnly", new Object[] { true, false });
	}

	/**
	 * A simple square topology BrokerA <-> BrokerB BrokerA <-> BrokerC BrokerB
	 * <-> BrokerD BrokerD <-> BrokerC
	 * 
	 */
	public void testSquareConnectedBrokerNetwork2() throws Exception {
		int networkTTL = 2;
		boolean conduitSubs = true;
		boolean dynamicOnly = false;

		// Setup broker networks
		bridgeBrokers("BrokerA", "BrokerB", dynamicOnly, networkTTL,
				conduitSubs);
		bridgeBrokers("BrokerB", "BrokerA", dynamicOnly, networkTTL,
				conduitSubs);
		bridgeBrokers("BrokerA", "BrokerC", dynamicOnly, networkTTL,
				conduitSubs);
		bridgeBrokers("BrokerC", "BrokerA", dynamicOnly, networkTTL,
				conduitSubs);

		bridgeBrokers("BrokerD", "BrokerC", dynamicOnly, networkTTL,
				conduitSubs);
		bridgeBrokers("BrokerC", "BrokerD", dynamicOnly, networkTTL,
				conduitSubs);
		bridgeBrokers("BrokerD", "BrokerB", dynamicOnly, networkTTL,
				conduitSubs);
		bridgeBrokers("BrokerB", "BrokerD", dynamicOnly, networkTTL,
				conduitSubs);

		startAllBrokers();

		// Setup destination
		Destination dest = createDestination("TEST.FOO", true);

		// Setup consumers
		MessageConsumer clientA = createConsumer("BrokerA", dest, "msgId > 0");
		MessageConsumer clientB = createConsumer("BrokerB", dest, "msgId > 0");
		MessageConsumer clientC = createConsumer("BrokerC", dest, "msgId > 0");
		MessageConsumer clientD = createConsumer("BrokerD", dest, "msgId > 0");
		// let consumers propagate around the network
		Thread.sleep(5000);
		
		clientD.setMessageListener(this);

		// Send messages
		String[] brokers = { "BrokerA", "BrokerB", "BrokerC", "BrokerD" };
		HashMap<String, Object> props = new HashMap<String, Object>();
		for (String broker : brokers) {
			props.put("sender", broker);
			for (int i = 1; i <= MESSAGE_COUNT; i++) {
				props.put("msgId", i);
				sendMessages(broker, dest, 1, props);
			}
		}

		// Get message count
		MessageIdList msgsA = getConsumerMessages("BrokerA", clientA);
		MessageIdList msgsB = getConsumerMessages("BrokerB", clientB);
		MessageIdList msgsC = getConsumerMessages("BrokerC", clientC);
		MessageIdList msgsD = getConsumerMessages("BrokerD", clientD);

		msgsA.waitForMessagesToArrive(MESSAGE_COUNT * 4);
		msgsB.waitForMessagesToArrive(MESSAGE_COUNT * 4);
		msgsC.waitForMessagesToArrive(MESSAGE_COUNT * 4);
		msgsD.waitForMessagesToArrive(MESSAGE_COUNT * 4);
		
		System.out.println(msgsA.toString());

		assertEquals(MESSAGE_COUNT * 4, msgsA.getMessageCount());
		assertEquals(MESSAGE_COUNT * 4, msgsB.getMessageCount());
		assertEquals(MESSAGE_COUNT * 4, msgsC.getMessageCount());
		assertEquals(MESSAGE_COUNT * 4, msgsD.getMessageCount());
	}

	/**
	 * A simple square topology BrokerA <-> BrokerB BrokerA <-> BrokerC BrokerB
	 * <-> BrokerD BrokerD <-> BrokerC
	 * 
	 */
	public void testSquareConnectedBrokerNetwork() throws Exception {
		int networkTTL = 2;
		boolean conduitSubs = true;
		boolean dynamicOnly = false;

		// Setup broker networks
		bridgeBrokers("BrokerA", "BrokerB", dynamicOnly, networkTTL,
				conduitSubs);
		bridgeBrokers("BrokerB", "BrokerA", dynamicOnly, networkTTL,
				conduitSubs);
		bridgeBrokers("BrokerA", "BrokerC", dynamicOnly, networkTTL,
				conduitSubs);
		bridgeBrokers("BrokerC", "BrokerA", dynamicOnly, networkTTL,
				conduitSubs);

		bridgeBrokers("BrokerD", "BrokerC", dynamicOnly, networkTTL,
				conduitSubs);
		bridgeBrokers("BrokerC", "BrokerD", dynamicOnly, networkTTL,
				conduitSubs);
		bridgeBrokers("BrokerD", "BrokerB", dynamicOnly, networkTTL,
				conduitSubs);
		bridgeBrokers("BrokerB", "BrokerD", dynamicOnly, networkTTL,
				conduitSubs);

		startAllBrokers();

		// Setup destination
		Destination dest = createDestination("TEST.FOO", true);

		// Setup consumers
		MessageConsumer clientA = createConsumer("BrokerA", dest);
		MessageConsumer clientB = createConsumer("BrokerB", dest);
		MessageConsumer clientC = createConsumer("BrokerC", dest);
		MessageConsumer clientD = createConsumer("BrokerD", dest);
		// let consumers propagate around the network
		Thread.sleep(5000);

		// Send messages
		sendMessages("BrokerA", dest, MESSAGE_COUNT);
		sendMessages("BrokerB", dest, MESSAGE_COUNT);
		sendMessages("BrokerC", dest, MESSAGE_COUNT);
		sendMessages("BrokerD", dest, MESSAGE_COUNT);

		// Get message count
		MessageIdList msgsA = getConsumerMessages("BrokerA", clientA);
		MessageIdList msgsB = getConsumerMessages("BrokerB", clientB);
		MessageIdList msgsC = getConsumerMessages("BrokerC", clientC);
		MessageIdList msgsD = getConsumerMessages("BrokerD", clientD);

		msgsA.waitForMessagesToArrive(MESSAGE_COUNT * 4);
		msgsB.waitForMessagesToArrive(MESSAGE_COUNT * 4);
		msgsC.waitForMessagesToArrive(MESSAGE_COUNT * 4);
		msgsD.waitForMessagesToArrive(MESSAGE_COUNT * 4);

		assertEquals(MESSAGE_COUNT * 4, msgsA.getMessageCount());
		assertEquals(MESSAGE_COUNT * 4, msgsB.getMessageCount());
		assertEquals(MESSAGE_COUNT * 4, msgsC.getMessageCount());
		assertEquals(MESSAGE_COUNT * 4, msgsD.getMessageCount());
	}

	public void setUp() throws Exception {
		super.setAutoFail(true);
		super.setUp();
		String options = "?persistent=false&useJmx=false";
		createBroker(new URI("broker:(tcp://localhost:61616)/BrokerA" + options));
		createBroker(new URI("broker:(tcp://localhost:61617)/BrokerB" + options));
		createBroker(new URI("broker:(tcp://localhost:61618)/BrokerC" + options));
		createBroker(new URI("broker:(tcp://localhost:61619)/BrokerD" + options));
	}

	public static Test suite() {
		return suite(FourBrokerTopicNetworkTest.class);
	}

	@Override
	public void onMessage(Message message) {
		try {
			System.err.println(message.getStringProperty("sender") + " msgID:" + message.getIntProperty("msgId"));
		} catch (JMSException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}
		
	}
}

{code}

I don't know if there's anything wrong with this test, or if I should use different configurations
of TTL, conduit, and dynamicOnly. I tested it with the latest AMQ I could get (5.5.1).

As you can see, 25 messages are delivered instead of the expected 20. The reason can
be seen in the testSquareConnectedBrokerNetwork2 method: clientD prints every message coming
from BrokerA twice, because BrokerB and BrokerC each forward it along a different path.
This is of course a major problem whenever a broker network has multiple paths, as message
duplication becomes so severe that it basically kills the whole thing.
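
To make the count above concrete, here is a minimal sketch of the arithmetic. It is not ActiveMQ code: it assumes a simplified model in which every loop-free path of at most networkTTL hops from the sending broker to the consumer's broker delivers one copy, and the class and method names (SquareTopologyPaths, countPaths, copiesReceived) are made up for illustration.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class SquareTopologyPaths {
    // Bidirectional bridges from the test: A<->B, A<->C, B<->D, D<->C.
    static final Map<String, List<String>> BRIDGES = new HashMap<>();
    static {
        BRIDGES.put("BrokerA", Arrays.asList("BrokerB", "BrokerC"));
        BRIDGES.put("BrokerB", Arrays.asList("BrokerA", "BrokerD"));
        BRIDGES.put("BrokerC", Arrays.asList("BrokerA", "BrokerD"));
        BRIDGES.put("BrokerD", Arrays.asList("BrokerB", "BrokerC"));
    }

    // Counts loop-free paths from 'from' to 'to' that use at most 'hopsLeft' bridges.
    // ASSUMPTION: each such path delivers exactly one copy of a message.
    static int countPaths(String from, String to, int hopsLeft, Set<String> visited) {
        if (from.equals(to)) {
            return 1; // reached the consumer's broker (0 hops means local delivery)
        }
        if (hopsLeft == 0) {
            return 0; // TTL exhausted before reaching the consumer's broker
        }
        visited.add(from);
        int paths = 0;
        for (String next : BRIDGES.get(from)) {
            if (!visited.contains(next)) {
                paths += countPaths(next, to, hopsLeft - 1, visited);
            }
        }
        visited.remove(from);
        return paths;
    }

    // Total copies a consumer sees when every broker publishes messagesPerBroker messages.
    static int copiesReceived(String consumerBroker, int messagesPerBroker, int networkTTL) {
        int total = 0;
        for (String sender : BRIDGES.keySet()) {
            total += messagesPerBroker
                    * countPaths(sender, consumerBroker, networkTTL, new HashSet<String>());
        }
        return total;
    }

    public static void main(String[] args) {
        // Two paths from BrokerA to BrokerD within 2 hops: via BrokerB and via BrokerC.
        System.out.println(countPaths("BrokerA", "BrokerD", 2, new HashSet<String>())); // prints 2
        // 5 messages per broker: 5 local + 5 from B + 5 from C + 10 from A.
        System.out.println(copiesReceived("BrokerD", 5, 2)); // prints 25
    }
}
```

Under this model only the broker diagonally opposite the consumer contributes duplicates, which matches the 25 observed against the 20 expected.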

Please let me know if the test is correct, as I'd like to get some more insight into why
this is happening. My colleagues and I also have some ideas about the correct way to fix it.

Regards,

g
                
> Duplicate topic messages received with network of brokers and selectors
> -----------------------------------------------------------------------
>
>                 Key: AMQ-1509
>                 URL: https://issues.apache.org/jira/browse/AMQ-1509
>             Project: ActiveMQ
>          Issue Type: Bug
>          Components: Broker, Transport
>    Affects Versions: 4.1.1
>            Reporter: Howard Orner
>            Assignee: Rob Davies
>             Fix For: 5.3.0
>
>         Attachments: ActiveMQActor.java
>
>
> If you create a network of two brokers, A and B, one publisher publishing to A, and n
(where n is > 1) receivers with selectors, each receiver receives n messages for every
1 message sent.  The key here is to have a selector.   It would appear that the conduitSubscriptions
flag does not work when using selectors.  The conduit does not properly reconcile consumers
if they have selectors.  A suggested solution would be that, rather than processing each selector
independently, the selectors should be OR'ed together, and if any selector evaluates to true
then a single message should be sent to the other broker.
> In doing research, it would appear that this problem was introduced with bug fix AMQ-810.
 Another user reported it via email to the assignee of AMQ-810 and a short dialog followed.
 See http://www.mail-archive.com/activemq-users@geronimo.apache.org/msg05198.html.  
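
The "OR the selectors together" suggestion in the issue description could be sketched roughly as below. This is only an illustration of the idea, not how ActiveMQ's conduit subscriptions are implemented; SelectorUnion and orSelectors are hypothetical names, and the sketch assumes that a consumer without a selector should make the combined subscription match everything.

```java
import java.util.Arrays;
import java.util.List;
import java.util.StringJoiner;

public class SelectorUnion {
    // Combines the selectors of all local consumers into one JMS selector string.
    // Returns null (meaning "no selector, match everything") if the list is empty
    // or if any consumer has no selector at all.
    static String orSelectors(List<String> selectors) {
        if (selectors.isEmpty()) {
            return null;
        }
        StringJoiner joined = new StringJoiner(" OR ");
        for (String selector : selectors) {
            if (selector == null || selector.trim().isEmpty()) {
                return null; // one unfiltered consumer already wants every message
            }
            // Parenthesise each part so operator precedence inside it is preserved.
            joined.add("(" + selector.trim() + ")");
        }
        return joined.toString();
    }

    public static void main(String[] args) {
        List<String> selectors = Arrays.asList("msgId > 0", "sender = 'BrokerA'");
        System.out.println(orSelectors(selectors)); // prints (msgId > 0) OR (sender = 'BrokerA')
    }
}
```

With a single combined selector, the remote broker would forward each matching message once and leave per-consumer filtering to the local broker.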

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators: https://issues.apache.org/jira/secure/ContactAdministrators!default.jspa
For more information on JIRA, see: http://www.atlassian.com/software/jira

        
