activemq-users mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From xita-de <phamquan...@gmail.com>
Subject How to close consumer from broker service embedded
Date Sun, 11 May 2014 07:34:27 GMT
Dear all, 

I use a BrokerService embedded in my Java application, together with Advisory
Messages, to listen for consumers joining and leaving (so I know everything
about each consumer: consumer-id, client-id, ...). But I don't know how to
close/disable/stop a consumer from the embedded broker service or from an
Advisory Message. Please guide me if you know a way. Thank you very much :)

                // Create a broker inside this application, add a connector on
                // tcp://localhost:61616, set the JMX flag to true and start the broker.
                BrokerService brokerService = new BrokerService();
		brokerService.addConnector("tcp://localhost:61616");
		brokerService.setUseJmx(true);
		brokerService.start();


		// Print a start-up notice followed by the connector URL used above.
		System.out.println("Broker started.......");
		System.out.println("tcp://localhost:61616");
===
package com.fis.activemq.pubsub;

import javax.jms.Connection;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Session;
import javax.jms.Topic;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.command.ActiveMQMessage;
import org.apache.activemq.command.ConnectionInfo;
import org.apache.activemq.command.ConsumerInfo;
import org.apache.activemq.command.RemoveInfo;

public class AdvisoryTest {
	static MessageListener listener = new MessageListener() {
		@Override
		public void onMessage(Message message) {
			System.out.println("message: " + message);
			if (message instanceof ActiveMQMessage) {
				ActiveMQMessage activeMessage = (ActiveMQMessage) message;
				Object command = activeMessage.getDataStructure();
				if (command instanceof ConsumerInfo) {
					ConsumerInfo consumerInfo = (ConsumerInfo)command;
				
System.out.println("consumerid="+((ConsumerInfo)command).getConsumerId());
					System.out.println("A consumer subscribed to a topic or queue: " +
command);
				} else if (command instanceof RemoveInfo) {
					RemoveInfo removeInfo = (RemoveInfo) command;
					if (removeInfo.isConsumerRemove()) {
						System.out.println("ObjectId="+removeInfo.getObjectId());
						System.out.println("A consumer unsubscribed from a topic or
queue"+command);
					} else {
						System.out.println("RemoveInfo, a connection was closed: " + command);
					}
				} else if (command instanceof ConnectionInfo) {
					System.out.println("ConnectionInfo, a new connection was made: " +
command);
				} else {
					System.out.println("Unknown command: " + command);
				}
			}
		}
	};

	public static void main(String[] args) {
		try {
			// ActiveMQConnectionFactory connectionFactory = new
			// ActiveMQConnectionFactory(
			// "user", "password", ActiveMQConnection.DEFAULT_BROKER_URL);
			ActiveMQConnectionFactory connectionFactory = new
ActiveMQConnectionFactory("tcp://localhost:61616");
			Connection connection = connectionFactory.createConnection();
			Session session = connection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
			connection.start();
			//Destination destinationAdvisory =
session.createTopic("ActiveMQ.Advisory..>");
			//Destination consumerTopicAdvisoryDest =
session.createTopic("ActiveMQ.Advisory.Consumer.Topic.>");
			//Destination advisoryAll =
session.createTopic("ActiveMQ.Advisory.Topic.*");
			Topic advisoryAll = session.createTopic("ActiveMQ.Advisory.Consumer.>");
			//Topic advisoryAll = session.createTopic("ActiveMQ.Advisory.>");
			
//			MessageConsumer consumerAdvisory =
session.createConsumer(consumerTopicAdvisoryDest);
//			consumerAdvisory.setMessageListener(listener);
			//
			MessageConsumer consumerAdvisoryAll =
session.createConsumer(advisoryAll);
			consumerAdvisoryAll.setMessageListener(listener);
		} catch (Exception ex) {
			ex.printStackTrace();
		}
	}
}




--
View this message in context: http://activemq.2283324.n4.nabble.com/How-to-close-consumer-from-broker-service-embedded-tp4681015.html
Sent from the ActiveMQ - User mailing list archive at Nabble.com.

Mime
View raw message