activemq-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Fran Vázquez (JIRA) <j...@apache.org>
Subject [jira] Commented: (AMQ-1573) ConduitBridge merges destinations x.a and x.>, resulting in loss of subscription
Date Wed, 18 Mar 2009 08:45:41 GMT

    [ https://issues.apache.org/activemq/browse/AMQ-1573?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=50625#action_50625
] 

Fran Vázquez commented on AMQ-1573:
-----------------------------------

A simple test that reproduces this issue (just run it several times):


import java.net.URI;

import javax.jms.BytesMessage;
import javax.jms.Destination;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.TransportConnector;
import org.apache.activemq.command.ActiveMQMessage;
import org.apache.activemq.command.ActiveMQTopic;

/**
 * Stand-alone reproduction program for AMQ-1573 (overlapping wildcard and
 * concrete topic subscriptions across networked brokers).
 *
 * <p>Each run starts an embedded, non-persistent broker that advertises itself
 * and looks for peers through the multicast discovery URI below, subscribes to
 * several overlapping topics and then publishes a numbered text message to
 * {@code one.four.five} at a fixed interval until it fails or is interrupted.
 */
public class ActiveMQSubscriptionTest {

	/** Pause between two published messages, in milliseconds. */
	private static final long SEND_INTERVAL_MILLIS = 10000;

	/**
	 * Starts the broker, registers the listeners and publishes messages forever.
	 *
	 * @param args command-line arguments; not used
	 */
	public static void main(String[] args) {
		// Declared outside the try block so the finally clause can release them.
		BrokerService broker = null;
		ActiveMQConnection connection = null;
		try {
			URI discoveryUri = new URI("multicast://default?group=onegroup");
			// Port 0 is passed as-is to the transport connector.
			URI localPort = new URI("tcp://localhost:0");

			broker = new BrokerService();
			broker.setUseJmx(false);
			broker.setPersistent(false);

			// The same discovery URI is used both to advertise this broker's
			// connector and to create the network connector towards peers.
			TransportConnector connector = broker.addConnector(localPort);
			connector.setDiscoveryUri(discoveryUri);
			broker.addNetworkConnector(discoveryUri);
			broker.start();

			ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
					ActiveMQConnection.DEFAULT_USER,
					ActiveMQConnection.DEFAULT_PASSWORD,
					"vm://" + broker.getBrokerName());

			connection = (ActiveMQConnection) connectionFactory.createConnection();
			connection.start();

			Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

			// Overlapping subscriptions: two nested wildcards plus a concrete
			// topic that both wildcards match.
			subscribe(session, session.createTopic("ActiveMQ.Advisory.>"), "ActiveMQ.Advisory.>");
			subscribe(session, new ActiveMQTopic("one.>"), "one.>");
			subscribe(session, new ActiveMQTopic("one.two.>"), "one.two.>");
			subscribe(session, new ActiveMQTopic("one.two.three"), "one.two.three");

			// Messages go to a topic matched only by the "one.>" subscription.
			MessageProducer producer = session.createProducer(new ActiveMQTopic("one.four.five"));
			TextMessage msg = session.createTextMessage();

			String clientId = connection.getConnectionInfo().getClientId();
			for (int i = 0;; i++) {
				// Plain delay; nothing in this program ever notifies a monitor.
				Thread.sleep(SEND_INTERVAL_MILLIS);
				msg.setText(clientId + " ----------- " + i + " -----------");
				producer.send(msg);
			}
		} catch (InterruptedException e) {
			// Restore the interrupt flag so the caller can still observe it.
			Thread.currentThread().interrupt();
			System.out.println(e);
		} catch (Exception e) {
			// Keep the stack trace: the exception's toString() alone rarely
			// identifies which setup step failed.
			e.printStackTrace();
		} finally {
			shutdown(connection, broker);
		}
	}

	/**
	 * Creates a consumer on {@code destination} and attaches a
	 * {@link TestListener} labelled with {@code label}.
	 *
	 * @param session     session used to create the consumer
	 * @param destination destination to consume from
	 * @param label       text the listener prints in front of every message
	 * @throws Exception if the consumer cannot be created or configured
	 */
	private static void subscribe(Session session, Destination destination, String label) throws Exception {
		MessageConsumer consumer = session.createConsumer(destination);
		consumer.setMessageListener(new TestListener(label));
	}

	/**
	 * Closes the connection and stops the broker, ignoring arguments that are
	 * {@code null} because start-up failed before they were created. A failure
	 * while releasing one resource does not prevent releasing the other.
	 *
	 * @param connection connection to close, may be {@code null}
	 * @param broker     broker to stop, may be {@code null}
	 */
	private static void shutdown(ActiveMQConnection connection, BrokerService broker) {
		if (connection != null) {
			try {
				connection.close();
			} catch (Exception e) {
				System.out.println("Failed to close connection: " + e);
			}
		}
		if (broker != null) {
			try {
				broker.stop();
			} catch (Exception e) {
				System.out.println("Failed to stop broker: " + e);
			}
		}
	}

}
        
/**
 * Message listener that prints every received message to standard output,
 * prefixed with the label it was constructed with.
 */
class TestListener implements MessageListener {

	/** Label printed in front of each received message. */
	private String topic;

	/**
	 * @param topic label identifying the subscription this listener serves
	 */
	public TestListener(String topic) {
		this.topic = topic;
	}

	/**
	 * Prints the label and the message's destination, then a textual rendering
	 * of the message body. Any exception raised while doing so is printed
	 * instead of being propagated.
	 *
	 * @param message the received message
	 */
	public void onMessage(javax.jms.Message message) {
		try {
			String header = topic + " Message : " + message.getJMSDestination().toString();
			System.out.println(header);
			System.out.println(describeBody(message));
		} catch (Exception e) {
			System.out.println(e);
		}
	}

	/**
	 * Renders the body of {@code message} as text.
	 *
	 * @param message the message to inspect
	 * @return the decoded bytes for a {@link BytesMessage}, the text of a
	 *         {@link TextMessage}, an empty string for any other
	 *         {@link ActiveMQMessage}, and {@code "Not known"} otherwise
	 * @throws Exception if reading the body fails
	 */
	private static String describeBody(javax.jms.Message message) throws Exception {
		if (message instanceof BytesMessage) {
			BytesMessage bytesMessage = (BytesMessage) message;
			byte[] payload = new byte[(int) bytesMessage.getBodyLength()];
			bytesMessage.readBytes(payload);
			// Decoded with the platform default charset.
			return new String(payload);
		}
		if (message instanceof TextMessage) {
			return ((TextMessage) message).getText();
		}
		if (message instanceof ActiveMQMessage) {
			return "";
		}
		return "Not known";
	}
}



> ConduitBridge merges destinations x.a and x.>, resulting in loss of subscription
> --------------------------------------------------------------------------------
>
>                 Key: AMQ-1573
>                 URL: https://issues.apache.org/activemq/browse/AMQ-1573
>             Project: ActiveMQ
>          Issue Type: Bug
>    Affects Versions: 5.0.0
>            Reporter: Marcus Nilsson
>            Assignee: Rob Davies
>             Fix For: 5.3.0
>
>
> Two brokers A <-> B. 
> 1. B subscribes to x.a and x.>. 
> 2. A registers subscription x.a in ConduitBridge
> 3. A examines subscription x.> in ConduitBridge:
> * A DestinationFilter is created for x.>
> * The filter matches the old subscription for x.a
> * Consumer B is added as "interested" in x.a
> 4. Consumer on B will only receive messages for x.a, but not x.b.

-- 
This message is automatically generated by JIRA.
-
You can reply to this email to add a comment to the issue online.


Mime
View raw message