activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From bsny...@apache.org
Subject svn commit: r883411 - /activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/BrokerNetworkWithStuckMessagesTest.java
Date Mon, 23 Nov 2009 17:02:40 GMT
Author: bsnyder
Date: Mon Nov 23 17:02:39 2009
New Revision: 883411

URL: http://svn.apache.org/viewvc?rev=883411&view=rev
Log:
Updated test for AMQ-2324 and AMQ-2484

Modified:
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/BrokerNetworkWithStuckMessagesTest.java

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/BrokerNetworkWithStuckMessagesTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/BrokerNetworkWithStuckMessagesTest.java?rev=883411&r1=883410&r2=883411&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/BrokerNetworkWithStuckMessagesTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/BrokerNetworkWithStuckMessagesTest.java Mon Nov 23 17:02:39 2009
@@ -2,35 +2,102 @@
 
 import java.io.File;
 import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.Enumeration;
+import java.util.HashMap;
+import java.util.Hashtable;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
 
+import javax.jms.Connection;
 import javax.jms.DeliveryMode;
-
-import junit.framework.Test;
-
+import javax.jms.MessageNotWriteableException;
+import javax.jms.Queue;
+import javax.jms.QueueBrowser;
+import javax.jms.Session;
+import javax.management.MBeanServer;
+import javax.management.ObjectName;
+
+import junit.framework.TestCase;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.BrokerTestSupport;
 import org.apache.activemq.broker.StubConnection;
+import org.apache.activemq.broker.TransportConnector;
+import org.apache.activemq.broker.jmx.ManagementContext;
 import org.apache.activemq.command.ActiveMQDestination;
-import org.apache.activemq.command.Command;
+import org.apache.activemq.command.ActiveMQTextMessage;
+import org.apache.activemq.command.ConnectionId;
 import org.apache.activemq.command.ConnectionInfo;
 import org.apache.activemq.command.ConsumerInfo;
+import org.apache.activemq.command.DestinationInfo;
 import org.apache.activemq.command.Message;
 import org.apache.activemq.command.MessageAck;
+import org.apache.activemq.command.MessageDispatch;
+import org.apache.activemq.command.MessageId;
 import org.apache.activemq.command.ProducerInfo;
 import org.apache.activemq.command.SessionInfo;
 import org.apache.activemq.transport.Transport;
-import org.apache.activemq.transport.TransportListener;
+import org.apache.activemq.transport.TransportFactory;
 import org.apache.activemq.util.Wait;
 import org.apache.commons.io.FileUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
-public class BrokerNetworkWithStuckMessagesTest extends NetworkTestSupport {
-	
+/**
+ * This class duplicates most of the functionality in {@link NetworkTestSupport} 
+ * and {@link BrokerTestSupport} because more control was needed over how brokers 
+ * and connectors are created. Also, this test asserts message counts via JMX on 
+ * each broker. 
+ * 
+ * @author bsnyder
+ *
+ */
+public class BrokerNetworkWithStuckMessagesTest extends TestCase /*NetworkTestSupport*/ {
+    
     private static final Log LOG = LogFactory.getLog(BrokerNetworkWithStuckMessagesTest.class);
-	
-	private DemandForwardingBridge bridge;
-
-	protected void setUp() throws Exception {
-        super.setUp();
+    
+    private BrokerService localBroker; 
+    private BrokerService remoteBroker; 
+    private DemandForwardingBridge bridge;
+    
+    protected Map<String, BrokerService> brokers = new HashMap<String, BrokerService>();
+    protected ArrayList connections = new ArrayList();
+    
+    protected TransportConnector connector;
+    protected TransportConnector remoteConnector;
+    
+    protected long idGenerator;
+    protected int msgIdGenerator;
+    protected int tempDestGenerator;
+    protected int maxWait = 4000;
+    protected String queueName = "TEST";
+    
+    protected String amqDomain = "org.apache.activemq";
+        
+    protected void setUp() throws Exception {
+        
+        // For those who want visual confirmation: 
+        //   Uncomment the following to enable JMX support on a port number to use 
+        //   Jconsole to view each broker. You will need to add some calls to 
+        //   Thread.sleep() to be able to actually slow things down so that you 
+        //   can manually see JMX attrs. 
+//        System.setProperty("com.sun.management.jmxremote", "");
+//        System.setProperty("com.sun.management.jmxremote.port", "1099");
+//        System.setProperty("com.sun.management.jmxremote.authenticate", "false");
+//        System.setProperty("com.sun.management.jmxremote.ssl", "false");
+        
+        // Create the local broker 
+        createBroker();
+        // Create the remote broker 
+        createRemoteBroker();
+        
+        // Remove the activemq-data directory from the creation of the remote broker
+        FileUtils.deleteDirectory(new File("activemq-data"));
         
         // Create a network bridge between the local and remote brokers so that 
         // demand-based forwarding can take place
@@ -39,79 +106,42 @@
         config.setDispatchAsync(false);
         
         Transport localTransport = createTransport(); 
-        localTransport.setTransportListener(new TransportListener() {
-        	Command command = null;
-			public void onCommand(Object o) {
-				this.command = (Command) o;
-				LOG.info("Command from [" + command.getFrom() + "] to [" + command.getTo() + "]");
-			}
-
-			public void onException(IOException error) {
-				LOG.info("Command from [" + command.getFrom() + "] to [" + command.getTo() + "]");
-				LOG.info("Exception: " + error);
-			}
-
-			public void transportInterupted() {
-				LOG.info("Interruption on local transport");
-			}
-
-			public void transportResumed() {
-				LOG.info("Resumption on local transport");
-			}
-        });
-        
         Transport remoteTransport = createRemoteTransport();
-        remoteTransport.setTransportListener(new TransportListener() {
-        	Command command = null;
-			public void onCommand(Object o) {
-				this.command = (Command) o;
-				LOG.info("Command from [" + command.getFrom() + "] to [" + command.getTo() + "]");
-			}
-
-			public void onException(IOException error) {
-				LOG.info("Command from [" + command.getFrom() + "] to [" + command.getTo() + "]");
-				LOG.info("Exception: " + error);
-			}
-
-			public void transportInterupted() {
-				LOG.info("Interruption on remote transport");
-			}
-
-			public void transportResumed() {
-				LOG.info("Resumption on remote transport");
-			}
-        });
         
+        // Create a network bridge between the two brokers 
         bridge = new DemandForwardingBridge(config, localTransport, remoteTransport);
-        bridge.setBrokerService(broker);
+        bridge.setBrokerService(localBroker);
         bridge.start();
         
-        // Enable JMX support on the local and remote brokers 
-//        broker.setUseJmx(true);
-//        remoteBroker.setUseJmx(true);
-        
-        // Make sure persistence is disabled 
-        broker.setPersistent(false);
-        broker.setPersistenceAdapter(null);
-        remoteBroker.setPersistent(false);
-        remoteBroker.setPersistenceAdapter(null);
+        waitForBridgeFormation();
         
-        // Remove the activemq-data directory from the creation of the remote broker
-        FileUtils.deleteDirectory(new File("activemq-data"));
     }
-	
-	protected void tearDown() throws Exception {
+    
+    protected void waitForBridgeFormation() throws Exception {
+        for (final BrokerService broker : brokers.values()) {
+            if (!broker.getNetworkConnectors().isEmpty()) {
+            	// Max wait here is 30 secs
+                Wait.waitFor(new Wait.Condition() {
+                    public boolean isSatisified() throws Exception {
+                        return !broker.getNetworkConnectors().get(0).activeBridges().isEmpty();
+                    }});
+            }
+        }
+    }
+    
+    protected void tearDown() throws Exception {
         bridge.stop();
-        super.tearDown();
+        localBroker.stop();
+        remoteBroker.stop();
     }
 
-	public void testBrokerNetworkWithStuckMessages() throws Exception {
-		
-		int sendNumMessages = 10;
-		int receiveNumMessages = 5;
-		
-		// Create a producer and send a batch of 10 messages to the local broker
-		StubConnection connection1 = createConnection();
+    public void testBrokerNetworkWithStuckMessages() throws Exception {
+        
+        int sendNumMessages = 10;
+        int receiveNumMessages = 5;
+        
+        // Create a producer 
+        StubConnection connection1 = createConnection();
         ConnectionInfo connectionInfo1 = createConnectionInfo();
         SessionInfo sessionInfo1 = createSessionInfo(connectionInfo1);
         ProducerInfo producerInfo = createProducerInfo(sessionInfo1);
@@ -122,25 +152,25 @@
         // Create a destination on the local broker 
         ActiveMQDestination destinationInfo1 = null;
         
+        // Send a 10 messages to the local broker 
         for (int i = 0; i < sendNumMessages; ++i) {
-        	destinationInfo1 = createDestinationInfo(connection1, connectionInfo1, ActiveMQDestination.QUEUE_TYPE);
-//	        connection1.send(createMessage(producerInfo, destinationInfo1, DeliveryMode.NON_PERSISTENT));
-	        connection1.request(createMessage(producerInfo, destinationInfo1, DeliveryMode.NON_PERSISTENT));
+            destinationInfo1 = createDestinationInfo(connection1, connectionInfo1, ActiveMQDestination.QUEUE_TYPE);
+            connection1.request(createMessage(producerInfo, destinationInfo1, DeliveryMode.NON_PERSISTENT));
         }
         
         // Ensure that there are 10 messages on the local broker 
-        int messageCount1 = countMessagesInQueue(connection1, connectionInfo1, destinationInfo1);
-        assertEquals(10, messageCount1);
+        Object[] messages = browseQueueWithJmx(localBroker);
+        assertEquals(sendNumMessages, messages.length);
         
         
-        // Create a consumer on the remote broker 
+        // Create a synchronous consumer on the remote broker 
         final StubConnection connection2 = createRemoteConnection();
         ConnectionInfo connectionInfo2 = createConnectionInfo();
         SessionInfo sessionInfo2 = createSessionInfo(connectionInfo2);
         connection2.send(connectionInfo2);
         connection2.send(sessionInfo2);
         ActiveMQDestination destinationInfo2 = 
-        	createDestinationInfo(connection2, connectionInfo2, ActiveMQDestination.QUEUE_TYPE);
+            createDestinationInfo(connection2, connectionInfo2, ActiveMQDestination.QUEUE_TYPE);
         final ConsumerInfo consumerInfo2 = createConsumerInfo(sessionInfo2, destinationInfo2);
         connection2.send(consumerInfo2);
         
@@ -149,32 +179,27 @@
         // method, this will cause the messages on the local broker to be 
         // forwarded to the remote broker. 
         for (int i = 0; i < receiveNumMessages; ++i) {
-        	assertTrue("Message " + i + " was not received", Wait.waitFor(new Wait.Condition() {
-                public boolean isSatisified() throws Exception {
-			        Message message1 = receiveMessage(connection2);
-			        assertNotNull(message1);
-		            connection2.send(createAck(consumerInfo2, message1, 1, MessageAck.STANDARD_ACK_TYPE));
-                    return message1 != null;
-                }            
-            }));
-//	        Message message1 = receiveMessage(connection2);
-//	        assertNotNull(message1);
-//          connection2.send(createAck(consumerInfo2, message1, 1, MessageAck.STANDARD_ACK_TYPE));
+            Message message1 = receiveMessage(connection2);
+            assertNotNull(message1);
+            connection2.send(createAck(consumerInfo2, message1, 1, MessageAck.STANDARD_ACK_TYPE));
+            
+            Object[] msgs1 = browseQueueWithJmx(remoteBroker);
+            LOG.info("Found [" + msgs1.length + "] messages with JMX");
+//            assertEquals((sendNumMessages-i), msgs.length);
         }
         
-        // Close the consumer on the remote broker 
-        connection2.send(consumerInfo2.createRemoveCommand());
-        
         // Ensure that there are zero messages on the local broker. This tells 
         // us that those messages have been prefetched to the remote broker 
         // where the demand exists. 
-        int messageCount2 = countMessagesInQueue(connection1, connectionInfo1, destinationInfo1);
-// Sometimes it fails here 
-        assertEquals(0, messageCount2);
+        messages = browseQueueWithJmx(localBroker);
+        assertEquals(0, messages.length);
+        
+        // Close the consumer on the remote broker 
+        connection2.send(consumerInfo2.createRemoveCommand());
         
         // There should now be 5 messages stuck on the remote broker 
-        int messageCount3 = countMessagesInQueue(connection2, connectionInfo2, destinationInfo2);
-        assertEquals(5, messageCount3);
+        messages = browseQueueWithJmx(remoteBroker);
+        assertEquals(5, messages.length);
         
         // Create a consumer on the local broker just to confirm that it doesn't 
         // receive any messages  
@@ -182,13 +207,13 @@
         connection1.send(consumerInfo1);
         Message message1 = receiveMessage(connection1);
         
-		//////////////////////////////////////////////////////
+        //////////////////////////////////////////////////////
         // An assertNull() is done here because this is currently the correct 
         // behavior. This is actually the purpose of this test - to prove that 
         // messages are stuck on the remote broker. AMQ-2324 and AMQ-2484 aim 
         // to fix this situation so that messages don't get stuck. 
         assertNull(message1);
-		//////////////////////////////////////////////////////
+        //////////////////////////////////////////////////////
         
         ConsumerInfo consumerInfo3 = createConsumerInfo(sessionInfo2, destinationInfo2);
         connection2.send(consumerInfo3);
@@ -197,30 +222,247 @@
         // to clean up the queue. 
         int counter = 0;
         for (int i = 0; i < receiveNumMessages; ++i) {
-	        message1 = receiveMessage(connection2);
-	        assertNotNull(message1);
+            message1 = receiveMessage(connection2);
+            assertNotNull(message1);
             connection2.send(createAck(consumerInfo3, message1, 1, MessageAck.STANDARD_ACK_TYPE));
             ++counter;
         }
         // Ensure that 5 messages were received
         assertEquals(receiveNumMessages, counter);
         
-        Thread.sleep(2000);
+        // Let those acks percolate... This stinks but it's the only way currently
+        // because these types of internal broker actions are non-deterministic. 
+        Thread.sleep(4000);
         
         // Ensure that the queue on the remote broker is empty 
-        int messageCount4 = countMessagesInQueue(connection2, connectionInfo2, destinationInfo1);
-// Sometimes it fails here 
-        assertEquals(0, messageCount4);
+        messages = browseQueueWithJmx(remoteBroker);
+        assertEquals(0, messages.length);
         
         // Close the consumer on the remote broker 
         connection2.send(consumerInfo3.createRemoveCommand());
         
         connection1.stop();
         connection2.stop();
+    }
+    
+    protected BrokerService createBroker() throws Exception {
+        localBroker = new BrokerService(); 
+        localBroker.setBrokerName("localhost");
+        localBroker.setUseJmx(true);
+        localBroker.setPersistenceAdapter(null);
+        localBroker.setPersistent(false);
+        connector = createConnector();
+        localBroker.addConnector(connector);
+        localBroker.start();
+        localBroker.waitUntilStarted();
+        
+        localBroker.getManagementContext().setConnectorPort(2221);
+        
+        brokers.put(localBroker.getBrokerName(), localBroker);
+        
+        return localBroker;
+    }
+    
+    protected BrokerService createRemoteBroker() throws Exception {
+        remoteBroker = new BrokerService();
+        remoteBroker.setBrokerName("remotehost");
+        remoteBroker.setUseJmx(true);
+        remoteBroker.setPersistenceAdapter(null);
+        remoteBroker.setPersistent(false);
+        remoteConnector = createRemoteConnector();
+        remoteBroker.addConnector(remoteConnector);
+        remoteBroker.waitUntilStarted();
+        
+        remoteBroker.getManagementContext().setConnectorPort(2222);
+        
+        brokers.put(remoteBroker.getBrokerName(), remoteBroker);
+        
+        return remoteBroker;
+    }
+    
+    protected Transport createTransport() throws Exception {
+        Transport transport = TransportFactory.connect(connector.getServer().getConnectURI());
+        return transport;
+    }
+    
+    protected Transport createRemoteTransport() throws Exception {
+        Transport transport = TransportFactory.connect(remoteConnector.getServer().getConnectURI());
+        return transport;
+    }
+    
+    protected TransportConnector createConnector() throws Exception, IOException, URISyntaxException {
+        return new TransportConnector(TransportFactory.bind(new URI(getLocalURI())));
+    }
+    
+    protected TransportConnector createRemoteConnector() throws Exception, IOException, URISyntaxException {
+        return new TransportConnector(TransportFactory.bind(new URI(getRemoteURI())));
+    }
+    
+    protected String getRemoteURI() {
+        return "vm://remotehost";
+    }
+
+    protected String getLocalURI() {
+        return "vm://localhost";
+    }
+    
+    protected StubConnection createConnection() throws Exception {
+        Transport transport = TransportFactory.connect(connector.getServer().getConnectURI());
+        StubConnection connection = new StubConnection(transport);
+        connections.add(connection);
+        return connection;
+    }
+
+    protected StubConnection createRemoteConnection() throws Exception {
+        Transport transport = TransportFactory.connect(remoteConnector.getServer().getConnectURI());
+        StubConnection connection = new StubConnection(transport);
+        connections.add(connection);
+        return connection;
+    }
+    
+    @SuppressWarnings("unchecked")
+    private Object[] browseQueueWithJms(BrokerService broker) throws Exception {
+		Object[] messages = null;
+		Connection connection = null;
+		Session session = null;
+
+		try {
+			URI brokerUri = connector.getUri();
+			ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(brokerUri.toString());
+			connection = connectionFactory.createConnection();
+			connection.start();
+			session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+			Queue destination = session.createQueue(queueName);
+			QueueBrowser browser = session.createBrowser(destination);
+			List<Message> list = new ArrayList<Message>();
+			for (Enumeration<Message> enumn = browser.getEnumeration(); enumn.hasMoreElements();) {
+				list.add(enumn.nextElement());
+			}
+			messages = list.toArray();
+		}
+		finally {
+			if (session != null) {
+				session.close();
+			}
+			if (connection != null) {
+				connection.close();
+			}
+		}
+		LOG.info("+Browsed with JMS: " + messages.length);
+		
+		return messages;
 	}
-	
-    public static Test suite() {
-        return suite(BrokerNetworkWithStuckMessagesTest.class);
+    
+    private Object[] browseQueueWithJmx(BrokerService broker) throws Exception {
+        Hashtable<String, String> params = new Hashtable<String, String>();
+        params.put("BrokerName", broker.getBrokerName());
+        params.put("Type", "Queue");
+        params.put("Destination", queueName);
+        ObjectName queueObjectName = ObjectName.getInstance(amqDomain, params);
+        
+        ManagementContext mgmtCtx = broker.getManagementContext(); 
+        MBeanServer mbs = mgmtCtx.getMBeanServer(); 
+        Object[] messages = (Object[]) mbs.invoke(queueObjectName, "browse", new Object[0], new String[0]);
+        
+		LOG.info("+Browsed with JMX: " + messages.length);
+        
+        return messages;
+    }
+    
+    protected ConnectionInfo createConnectionInfo() throws Exception {
+        ConnectionInfo info = new ConnectionInfo();
+        info.setConnectionId(new ConnectionId("connection:" + (++idGenerator)));
+        info.setClientId(info.getConnectionId().getValue());
+        return info;
+    }
+    
+    protected SessionInfo createSessionInfo(ConnectionInfo connectionInfo) throws Exception {
+        SessionInfo info = new SessionInfo(connectionInfo, ++idGenerator);
+        return info;
+    }
+    
+    protected ProducerInfo createProducerInfo(SessionInfo sessionInfo) throws Exception {
+        ProducerInfo info = new ProducerInfo(sessionInfo, ++idGenerator);
+        return info;
+    }
+    
+    protected ConsumerInfo createConsumerInfo(SessionInfo sessionInfo, ActiveMQDestination destination) throws Exception {
+        ConsumerInfo info = new ConsumerInfo(sessionInfo, ++idGenerator);
+        info.setBrowser(false);
+        info.setDestination(destination);
+        info.setPrefetchSize(1000);
+        info.setDispatchAsync(false);
+        return info;
+    }
+    
+    protected DestinationInfo createTempDestinationInfo(ConnectionInfo connectionInfo, byte destinationType) {
+        DestinationInfo info = new DestinationInfo();
+        info.setConnectionId(connectionInfo.getConnectionId());
+        info.setOperationType(DestinationInfo.ADD_OPERATION_TYPE);
+        info.setDestination(ActiveMQDestination.createDestination(info.getConnectionId() + ":" + (++tempDestGenerator), destinationType));
+        return info;
+    }
+    
+    protected ActiveMQDestination createDestinationInfo(StubConnection connection, ConnectionInfo connectionInfo1, byte destinationType) throws Exception {
+        if ((destinationType & ActiveMQDestination.TEMP_MASK) != 0) {
+            DestinationInfo info = createTempDestinationInfo(connectionInfo1, destinationType);
+            connection.send(info);
+            return info.getDestination();
+        } else {
+            return ActiveMQDestination.createDestination(queueName, destinationType);
+        }
+    }
+    
+    protected Message createMessage(ProducerInfo producerInfo, ActiveMQDestination destination, int deliveryMode) {
+        Message message = createMessage(producerInfo, destination);
+        message.setPersistent(deliveryMode == DeliveryMode.PERSISTENT);
+        return message;
+    }
+    
+    protected Message createMessage(ProducerInfo producerInfo, ActiveMQDestination destination) {
+        ActiveMQTextMessage message = new ActiveMQTextMessage();
+        message.setMessageId(new MessageId(producerInfo, ++msgIdGenerator));
+        message.setDestination(destination);
+        message.setPersistent(false);
+        try {
+            message.setText("Test Message Payload.");
+        } catch (MessageNotWriteableException e) {
+        }
+        return message;
+    }
+
+    protected MessageAck createAck(ConsumerInfo consumerInfo, Message msg, int count, byte ackType) {
+        MessageAck ack = new MessageAck();
+        ack.setAckType(ackType);
+        ack.setConsumerId(consumerInfo.getConsumerId());
+        ack.setDestination(msg.getDestination());
+        ack.setLastMessageId(msg.getMessageId());
+        ack.setMessageCount(count);
+        return ack;
+    }
+    
+    public Message receiveMessage(StubConnection connection) throws InterruptedException {
+        return receiveMessage(connection, maxWait);
+    }
+
+    public Message receiveMessage(StubConnection connection, long timeout) throws InterruptedException {
+        while (true) {
+            Object o = connection.getDispatchQueue().poll(timeout, TimeUnit.MILLISECONDS);
+
+            if (o == null) {
+                return null;
+            }
+            if (o instanceof MessageDispatch) {
+
+                MessageDispatch dispatch = (MessageDispatch)o;
+                if (dispatch.getMessage() == null) {
+                    return null;
+                }
+                dispatch.setMessage(dispatch.getMessage().copy());
+                dispatch.getMessage().setRedeliveryCounter(dispatch.getRedeliveryCounter());
+                return dispatch.getMessage();
+            }
+        }
     }
     
 }



Mime
View raw message