activemq-users mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "Kaveh Ghahremani" <ka...@yoonew.com>
Subject Durable Topic ?
Date Thu, 19 Jun 2008 17:54:43 GMT
I've created a simple setup to test persistent messages on durable
topics but I'm experiencing some erratic behavior.

Producer and consumer are copied straight from the examples and pasted
below.
Two standalone 5.1.0 brokers on two hosts with default config, using
dynamic discovery for producer and consumer.

The publisher sends 100 messages, after 10 messages I CTRL+C'ed the
consumer. I then restarted the consumer (10 seconds later) at message 20
but never received messages 11-20.

This is only the case when you first start the broker. When I leave the
brokers running and repeat the test, everything works fine and I get
missed messages.

I also noticed that the consumer connected to the second broker when I
started it the second time.


####################################################
# Consumer Output
####################################################

C:\eclipse\workspace\jms-clients\bin>java -cp
c:\dev\apache-activemq-5.1.0\activemq-all-5.1.0.jar;. AMQTopicSubscriber
Jun 19, 2008 12:47:48 PM
org.apache.activemq.transport.discovery.DiscoveryTransport onServiceAdd
INFO: Adding new broker connection URL: tcp://mercury.xyzcorp.com:61616
Jun 19, 2008 12:47:48 PM
org.apache.activemq.transport.discovery.DiscoveryTransport onServiceAdd
INFO: Adding new broker connection URL: tcp://saturn.xyzcorp.com:61616
Jun 19, 2008 12:47:48 PM
org.apache.activemq.transport.failover.FailoverTransport doReconnect
INFO: Successfully connected to tcp://saturn.xyzcorp.com:61616
Connected to topic.
Received: Message: 0 sent at: Thu Jun 19 12:47:53 EDT 2008
Received: Message: 1 sent at: Thu Jun 19 12:47:54 EDT 2008
Received: Message: 2 sent at: Thu Jun 19 12:47:55 EDT 2008
Received: Message: 3 sent at: Thu Jun 19 12:47:56 EDT 2008
Received: Message: 4 sent at: Thu Jun 19 12:47:57 EDT 2008
Received: Message: 5 sent at: Thu Jun 19 12:47:58 EDT 2008
Received: Message: 6 sent at: Thu Jun 19 12:47:59 EDT 2008
Received: Message: 7 sent at: Thu Jun 19 12:48:00 EDT 2008
Received: Message: 8 sent at: Thu Jun 19 12:48:01 EDT 2008
Received: Message: 9 sent at: Thu Jun 19 12:48:02 EDT 2008
Received: Message: 10 sent at: Thu Jun 19 12:48:03 EDT 2008

C:\eclipse\workspace\jms-clients\bin>java -cp
c:\dev\apache-activemq-5.1.0\activemq-all-5.1.0.jar;. AMQTopicSubscriber
Jun 19, 2008 12:48:14 PM
org.apache.activemq.transport.discovery.DiscoveryTransport onServiceAdd
INFO: Adding new broker connection URL: tcp://saturn.xyzcorp.com:61616
Jun 19, 2008 12:48:14 PM
org.apache.activemq.transport.discovery.DiscoveryTransport onServiceAdd
INFO: Adding new broker connection URL: tcp://mercury.xyzcorp.com:61616
Jun 19, 2008 12:48:14 PM
org.apache.activemq.transport.failover.FailoverTransport doReconnect
INFO: Successfully connected to tcp://mercury.xyzcorp.com:61616
Connected to topic.
Received: Message: 21 sent at: Thu Jun 19 12:48:14 EDT 2008
Received: Message: 22 sent at: Thu Jun 19 12:48:15 EDT 2008
Received: Message: 23 sent at: Thu Jun 19 12:48:16 EDT 2008
Received: Message: 24 sent at: Thu Jun 19 12:48:17 EDT 2008
Received: Message: 25 sent at: Thu Jun 19 12:48:18 EDT 2008
Received: Message: 26 sent at: Thu Jun 19 12:48:19 EDT 2008
Received: Message: 27 sent at: Thu Jun 19 12:48:20 EDT 2008
Received: Message: 28 sent at: Thu Jun 19 12:48:21 EDT 2008
Received: Message: 29 sent at: Thu Jun 19 12:48:22 EDT 2008
Received: Message: 30 sent at: Thu Jun 19 12:48:23 EDT 2008
Received: Message: 31 sent at: Thu Jun 19 12:48:24 EDT 2008
(all the way to Message 99)

####################################################
# Broker 1 Startup
####################################################

[activemq@saturn bin]$ ./activemq
ACTIVEMQ_HOME: /usr/local/apache-activemq-5.1.0
ACTIVEMQ_BASE: /usr/local/apache-activemq-5.1.0
Loading message broker from: xbean:activemq.xml
INFO  BrokerService                  - Using Persistence Adapter:
AMQPersistenceAdapter(/usr/local/apache-activemq-5.1.0/data)
INFO  BrokerService                  - ActiveMQ 5.1.0 JMS Message Broker
(localhost) is starting
INFO  BrokerService                  - For help or more information
please see: http://activemq.apache.org/
INFO  AMQPersistenceAdapter          - AMQStore starting using
directory: /usr/local/apache-activemq-5.1.0/data
INFO  KahaStore                      - Kaha Store using data directory
/usr/local/apache-activemq-5.1.0/data/kr-store/state
INFO  AMQPersistenceAdapter          - Active data files: []
INFO  KahaStore                      - Kaha Store using data directory
/usr/local/apache-activemq-5.1.0/data/kr-store/data
INFO  TransportServerThreadSupport   - Listening for connections at:
tcp://saturn.xyzcorp.com:61616
INFO  TransportConnector             - Connector openwire Started
INFO  TransportServerThreadSupport   - Listening for connections at:
ssl://saturn.xyzcorp.com:61617
INFO  TransportConnector             - Connector ssl Started
INFO  TransportServerThreadSupport   - Listening for connections at:
stomp://saturn.xyzcorp.com:61613
INFO  TransportConnector             - Connector stomp Started
INFO  TransportServerThreadSupport   - Listening for connections at:
xmpp://saturn.xyzcorp.com:61222
INFO  TransportConnector             - Connector xmpp Started
INFO  NetworkConnector               - Network Connector default-nc
Started
INFO  BrokerService                  - ActiveMQ JMS Message Broker
(localhost, ID:saturn.xyzcorp.com-37887-1213893891471-0:0) started
INFO  log                            - Logging to
org.slf4j.impl.JCLLoggerAdapter(org.mortbay.log) via
org.mortbay.log.Slf4jLog
INFO  log                            - jetty-6.1.9
INFO  DiscoveryNetworkConnector      - Establishing network connection
between from vm://localhost to tcp://mercury.xyzcorp.com:61616
INFO  TransportConnector             - Connector vm://localhost Started
INFO  DemandForwardingBridge         - Network connection between
vm://localhost#0 and
tcp://mercury.xyzcorp.com/10.10.1.53:61616(localhost) has been
established.
INFO  WebConsoleStarter              - ActiveMQ WebConsole initialized.
INFO  /admin                         - Initializing Spring
FrameworkServlet 'dispatcher'
INFO  log                            - ActiveMQ Console at
http://0.0.0.0:8161/admin
INFO  log                            - ActiveMQ Web Demos at
http://0.0.0.0:8161/demo
INFO  log                            - RESTful file access application
at http://0.0.0.0:8161/fileserver
INFO  log                            - Started
SelectChannelConnector@0.0.0.0:8161
INFO  FailoverTransport              - Successfully connected to
tcp://localhost:61616


####################################################
# Broker 2 Startup
####################################################

[activemq@mercury bin]$ ./activemq
ACTIVEMQ_HOME: /usr/local/apache-activemq-5.1.0
ACTIVEMQ_BASE: /usr/local/apache-activemq-5.1.0
Loading message broker from: xbean:activemq.xml
INFO  BrokerService                  - Using Persistence Adapter:
AMQPersistenceAdapter(/usr/local/apache-activemq-5.1.0/data)
INFO  BrokerService                  - ActiveMQ 5.1.0 JMS Message Broker
(localhost) is starting
INFO  BrokerService                  - For help or more information
please see: http://activemq.apache.org/
INFO  AMQPersistenceAdapter          - AMQStore starting using
directory: /usr/local/apache-activemq-5.1.0/data
INFO  KahaStore                      - Kaha Store using data directory
/usr/local/apache-activemq-5.1.0/data/kr-store/state
INFO  AMQPersistenceAdapter          - Active data files: []
INFO  KahaStore                      - Kaha Store using data directory
/usr/local/apache-activemq-5.1.0/data/kr-store/data
INFO  TransportServerThreadSupport   - Listening for connections at:
tcp://mercury.xyzcorp.com:61616
INFO  TransportConnector             - Connector openwire Started
INFO  TransportServerThreadSupport   - Listening for connections at:
ssl://mercury.xyzcorp.com:61617
INFO  TransportConnector             - Connector ssl Started
INFO  TransportServerThreadSupport   - Listening for connections at:
stomp://mercury.xyzcorp.com:61613
INFO  TransportConnector             - Connector stomp Started
INFO  TransportServerThreadSupport   - Listening for connections at:
xmpp://mercury.xyzcorp.com:61222
INFO  TransportConnector             - Connector xmpp Started
INFO  NetworkConnector               - Network Connector default-nc
Started
INFO  BrokerService                  - ActiveMQ JMS Message Broker
(localhost, ID:mercury.xyzcorp.com-47412-1213893913409-0:0) started
INFO  log                            - Logging to
org.slf4j.impl.JCLLoggerAdapter(org.mortbay.log) via
org.mortbay.log.Slf4jLog
INFO  log                            - jetty-6.1.9
INFO  DiscoveryNetworkConnector      - Establishing network connection
between from vm://localhost to tcp://saturn.xyzcorp.com:61616
INFO  TransportConnector             - Connector vm://localhost Started
INFO  DemandForwardingBridge         - Network connection between
vm://localhost#0 and
tcp://saturn.xyzcorp.com/10.10.1.61:61616(localhost) has been
established.
INFO  WebConsoleStarter              - ActiveMQ WebConsole initialized.
INFO  /admin                         - Initializing Spring
FrameworkServlet 'dispatcher'
INFO  log                            - ActiveMQ Console at
http://0.0.0.0:8161/admin
INFO  log                            - ActiveMQ Web Demos at
http://0.0.0.0:8161/demo
INFO  log                            - RESTful file access application
at http://0.0.0.0:8161/fileserver
INFO  log                            - Started
SelectChannelConnector@0.0.0.0:8161
INFO  FailoverTransport              - Successfully connected to
tcp://localhost:61616

####################################################
# Publisher
####################################################

import java.util.Date;

import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;


/**
 * @author kaveh
 * 
 * Creates a publisher that sends persistent messages to a topic
 *
 */
public class AMQTopicPublisher {
	
    private String user 		=
ActiveMQConnection.DEFAULT_USER;
    private String password 	= ActiveMQConnection.DEFAULT_PASSWORD;
    private String url 		= "discovery://(multicast://default)";
    private String topic 		= "TEST.FOO";
    
    private boolean transacted	= false;
    
    private long sleepTime = 1000;
    
    private Connection connection;
    private Destination destination;
    

	/**
	 * @param args
	 */
	public static void main(String[] args) {
		AMQTopicPublisher topicPublisher = new
AMQTopicPublisher();
		topicPublisher.run();

	}
	
	public void run() {
		try {
	        // Create the connection.
	        ActiveMQConnectionFactory connectionFactory = new
ActiveMQConnectionFactory(user, password, url);
	        connection = connectionFactory.createConnection();
	        connection.start();
	
	        // Create the session
	        Session session = connection.createSession(transacted,
Session.AUTO_ACKNOWLEDGE);
	        destination = session.createTopic(topic);
	
	        // Create the persistent producer
	        MessageProducer producer =
session.createProducer(destination);
	        producer.setDeliveryMode(DeliveryMode.PERSISTENT);
	        
	        // Send messages
	        for (int i = 0; i < 100; i++) {
	            TextMessage message =
session.createTextMessage("Message: " + i + " sent at: " + new Date());
	            producer.send(message);
	            System.out.println("Message " + i + " sent.");
	            Thread.sleep(sleepTime);
	        }
	        
		} catch (JMSException e) {
			e.printStackTrace();
		} catch (InterruptedException e) {
			e.printStackTrace();
		} finally {
			System.out.println("Finished.");
			System.exit(1);
		}

		
	}

}


####################################################
# Subscriber
####################################################

import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.ExceptionListener;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;


/**
 * @author kaveh
 * 
 * Creates a durable topic subscriber
 *
 */
public class AMQTopicSubscriber implements MessageListener,
ExceptionListener {
	
	private Session 	 	 session;
	private Destination 	 destination;
	private MessageConsumer consumer;
	
    private String 	user 			=
ActiveMQConnection.DEFAULT_USER;
    private String 	password 		=
ActiveMQConnection.DEFAULT_PASSWORD;
    private String 	url 			=
"discovery://(multicast://default)";
    private String		topic			= "TEST.FOO";
    private boolean 	transacted		= false;
    private String 	clientId		= "Client1";
    private int 		ackMode 		=
Session.AUTO_ACKNOWLEDGE;
    private String 	consumerName 	= "Consumer1";
    private boolean	verbose			= true;
    
    /**
	 * @param args
	 */
	public static void main(String[] args) {
		AMQTopicSubscriber topicSubscriber = new
AMQTopicSubscriber();
		topicSubscriber.run();
	}
	
	public void run() {
		ActiveMQConnectionFactory connectionFactory = new
ActiveMQConnectionFactory(user, password, url);
        Connection connection;
		try {
			// create and start connection
			connection =
connectionFactory.createConnection();
			connection.setClientID(clientId);
			connection.setExceptionListener(this);
            connection.start();
            
            // create session and consumer
            session = connection.createSession(transacted, ackMode);
            destination = session.createTopic(topic);
            consumer =
session.createDurableSubscriber((Topic)destination, consumerName);
            consumer.setMessageListener(this);
            System.out.println("Connected to topic.");
            
            while (true) {
                // Wait for messages
            }
            
		} catch (JMSException e) {
			e.printStackTrace();
		}
        
	}

	@Override
	public void onMessage(Message message) {
        try {

            if (message instanceof TextMessage) {
                TextMessage txtMsg = (TextMessage)message;
                if (verbose) {

                    String msg = txtMsg.getText();
                    if (msg.length() > 50) {
                        msg = msg.substring(0, 50) + "...";
                    }

                    System.out.println("Received: " + msg);
                }
            } else {
                if (verbose) {
                    System.out.println("Received: " + message);
                }
            }

            if (transacted) {
                session.commit();
            } else if (ackMode == Session.CLIENT_ACKNOWLEDGE) {
                message.acknowledge();
            }

        } catch (JMSException e) {
            System.out.println("Caught: " + e);
            e.printStackTrace();
        } 		
		
	}

	@Override
	public void onException(JMSException exception) {
		System.out.println("JMS Exception occured.");
		exception.printStackTrace();		
	}

}


Mime
View raw message