activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From bsny...@apache.org
Subject svn commit: r881174 - in /activemq/trunk: activemq-core/pom.xml activemq-core/src/test/java/org/apache/activemq/network/BrokerNetworkWithStuckMessagesTest.java pom.xml
Date Tue, 17 Nov 2009 06:43:17 GMT
Author: bsnyder
Date: Tue Nov 17 06:43:17 2009
New Revision: 881174

URL: http://svn.apache.org/viewvc?rev=881174&view=rev
Log:
Changes to the test for AMQ-2324 and AMQ-2484; trying to get it to pass consistently

Modified:
    activemq/trunk/activemq-core/pom.xml
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/BrokerNetworkWithStuckMessagesTest.java
    activemq/trunk/pom.xml

Modified: activemq/trunk/activemq-core/pom.xml
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/pom.xml?rev=881174&r1=881173&r2=881174&view=diff
==============================================================================
--- activemq/trunk/activemq-core/pom.xml (original)
+++ activemq/trunk/activemq-core/pom.xml Tue Nov 17 06:43:17 2009
@@ -213,6 +213,11 @@
       <scope>test</scope>
     </dependency>    
     <dependency>
+      <groupId>commons-io</groupId>
+      <artifactId>commons-io</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
       <groupId>org.apache.activemq</groupId>
       <artifactId>activemq-jmdns_1.0</artifactId>
       <optional>true</optional>

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/BrokerNetworkWithStuckMessagesTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/BrokerNetworkWithStuckMessagesTest.java?rev=881174&r1=881173&r2=881174&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/BrokerNetworkWithStuckMessagesTest.java
(original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/BrokerNetworkWithStuckMessagesTest.java
Tue Nov 17 06:43:17 2009
@@ -1,17 +1,25 @@
 package org.apache.activemq.network;
 
+import java.io.File;
+import java.io.IOException;
+
 import javax.jms.DeliveryMode;
 
 import junit.framework.Test;
 
 import org.apache.activemq.broker.StubConnection;
 import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.Command;
 import org.apache.activemq.command.ConnectionInfo;
 import org.apache.activemq.command.ConsumerInfo;
 import org.apache.activemq.command.Message;
 import org.apache.activemq.command.MessageAck;
 import org.apache.activemq.command.ProducerInfo;
 import org.apache.activemq.command.SessionInfo;
+import org.apache.activemq.transport.Transport;
+import org.apache.activemq.transport.TransportListener;
+import org.apache.activemq.util.Wait;
+import org.apache.commons.io.FileUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
@@ -29,17 +37,67 @@
         NetworkBridgeConfiguration config = new NetworkBridgeConfiguration();
         config.setBrokerName("local");
         config.setDispatchAsync(false);
-        bridge = new DemandForwardingBridge(config, createTransport(), createRemoteTransport());
+        
+        Transport localTransport = createTransport(); 
+        localTransport.setTransportListener(new TransportListener() {
+        	Command command = null;
+			public void onCommand(Object o) {
+				this.command = (Command) o;
+				LOG.info("Command from [" + command.getFrom() + "] to [" + command.getTo() + "]");
+			}
+
+			public void onException(IOException error) {
+				LOG.info("Command from [" + command.getFrom() + "] to [" + command.getTo() + "]");
+				LOG.info("Exception: " + error);
+			}
+
+			public void transportInterupted() {
+				LOG.info("Interruption on local transport");
+			}
+
+			public void transportResumed() {
+				LOG.info("Resumption on local transport");
+			}
+        });
+        
+        Transport remoteTransport = createRemoteTransport();
+        remoteTransport.setTransportListener(new TransportListener() {
+        	Command command = null;
+			public void onCommand(Object o) {
+				this.command = (Command) o;
+				LOG.info("Command from [" + command.getFrom() + "] to [" + command.getTo() + "]");
+			}
+
+			public void onException(IOException error) {
+				LOG.info("Command from [" + command.getFrom() + "] to [" + command.getTo() + "]");
+				LOG.info("Exception: " + error);
+			}
+
+			public void transportInterupted() {
+				LOG.info("Interruption on remote transport");
+			}
+
+			public void transportResumed() {
+				LOG.info("Resumption on remote transport");
+			}
+        });
+        
+        bridge = new DemandForwardingBridge(config, localTransport, remoteTransport);
         bridge.setBrokerService(broker);
         bridge.start();
         
         // Enable JMX support on the local and remote brokers 
-        broker.setUseJmx(true);
-        remoteBroker.setUseJmx(true);
+//        broker.setUseJmx(true);
+//        remoteBroker.setUseJmx(true);
         
-        // Set the names of teh local and remote brokers 
-        broker.setBrokerName("local");
-        remoteBroker.setBrokerName("remote");
+        // Make sure persistence is disabled 
+        broker.setPersistent(false);
+        broker.setPersistenceAdapter(null);
+        remoteBroker.setPersistent(false);
+        remoteBroker.setPersistenceAdapter(null);
+        
+        // Remove the activemq-data directory from the creation of the remote broker
+        FileUtils.deleteDirectory(new File("activemq-data"));
     }
 	
 	protected void tearDown() throws Exception {
@@ -66,31 +124,42 @@
         
         for (int i = 0; i < sendNumMessages; ++i) {
         	destinationInfo1 = createDestinationInfo(connection1, connectionInfo1, ActiveMQDestination.QUEUE_TYPE);
-	        connection1.send(createMessage(producerInfo, destinationInfo1, DeliveryMode.NON_PERSISTENT));
+//	        connection1.send(createMessage(producerInfo, destinationInfo1, DeliveryMode.NON_PERSISTENT));
+	        connection1.request(createMessage(producerInfo, destinationInfo1, DeliveryMode.NON_PERSISTENT));
         }
         
         // Ensure that there are 10 messages on the local broker 
-        assertTrue(countMessagesInQueue(connection1, connectionInfo1, destinationInfo1) ==
10);
+        int messageCount1 = countMessagesInQueue(connection1, connectionInfo1, destinationInfo1);
+        assertEquals(10, messageCount1);
         
         
         // Create a consumer on the remote broker 
-        StubConnection connection2 = createRemoteConnection();
+        final StubConnection connection2 = createRemoteConnection();
         ConnectionInfo connectionInfo2 = createConnectionInfo();
         SessionInfo sessionInfo2 = createSessionInfo(connectionInfo2);
         connection2.send(connectionInfo2);
         connection2.send(sessionInfo2);
         ActiveMQDestination destinationInfo2 = 
         	createDestinationInfo(connection2, connectionInfo2, ActiveMQDestination.QUEUE_TYPE);
-        ConsumerInfo consumerInfo2 = createConsumerInfo(sessionInfo2, destinationInfo2);
+        final ConsumerInfo consumerInfo2 = createConsumerInfo(sessionInfo2, destinationInfo2);
         connection2.send(consumerInfo2);
         
         // Consume 5 of the messages from the remote broker and ack them. 
-        // Because the prefetch size is set to 1000, this will cause the 
-        // messages on the local broker to be forwarded to the remote broker. 
+        // Because the prefetch size is set to 1000 in the createConsumerInfo() 
+        // method, this will cause the messages on the local broker to be 
+        // forwarded to the remote broker. 
         for (int i = 0; i < receiveNumMessages; ++i) {
-	        Message message1 = receiveMessage(connection2);
-	        assertNotNull(message1);
-            connection2.send(createAck(consumerInfo2, message1, 1, MessageAck.STANDARD_ACK_TYPE));
+        	assertTrue("Message " + i + " was not received", Wait.waitFor(new Wait.Condition()
{
+                public boolean isSatisified() throws Exception {
+			        Message message1 = receiveMessage(connection2);
+			        assertNotNull(message1);
+		            connection2.send(createAck(consumerInfo2, message1, 1, MessageAck.STANDARD_ACK_TYPE));
+                    return message1 != null;
+                }            
+            }));
+//	        Message message1 = receiveMessage(connection2);
+//	        assertNotNull(message1);
+//          connection2.send(createAck(consumerInfo2, message1, 1, MessageAck.STANDARD_ACK_TYPE));
         }
         
         // Close the consumer on the remote broker 
@@ -99,10 +168,13 @@
         // Ensure that there are zero messages on the local broker. This tells 
         // us that those messages have been prefetched to the remote broker 
         // where the demand exists. 
-        assertTrue(countMessagesInQueue(connection1, connectionInfo1, destinationInfo1) ==
0);
+        int messageCount2 = countMessagesInQueue(connection1, connectionInfo1, destinationInfo1);
+// Sometimes it fails here 
+        assertEquals(0, messageCount2);
         
         // There should now be 5 messages stuck on the remote broker 
-        assertTrue(countMessagesInQueue(connection2, connectionInfo2, destinationInfo1) ==
5);
+        int messageCount3 = countMessagesInQueue(connection2, connectionInfo2, destinationInfo2);
+        assertEquals(5, messageCount3);
         
         // Create a consumer on the local broker just to confirm that it doesn't 
         // receive any messages  
@@ -113,27 +185,38 @@
 		//////////////////////////////////////////////////////
         // An assertNull() is done here because this is currently the correct 
         // behavior. This is actually the purpose of this test - to prove that 
-        // messages are stuck on the remote broker. AMQ-2324 aims to fix this 
-        // situation so that messages don't get stuck. 
+        // messages are stuck on the remote broker. AMQ-2324 and AMQ-2484 aim 
+        // to fix this situation so that messages don't get stuck. 
         assertNull(message1);
 		//////////////////////////////////////////////////////
         
-        consumerInfo2 = createConsumerInfo(sessionInfo2, destinationInfo2);
-        connection2.send(consumerInfo2);
+        ConsumerInfo consumerInfo3 = createConsumerInfo(sessionInfo2, destinationInfo2);
+        connection2.send(consumerInfo3);
         
         // Consume the last 5 messages from the remote broker and ack them just 
         // to clean up the queue. 
+        int counter = 0;
         for (int i = 0; i < receiveNumMessages; ++i) {
 	        message1 = receiveMessage(connection2);
 	        assertNotNull(message1);
-            connection2.send(createAck(consumerInfo2, message1, 1, MessageAck.STANDARD_ACK_TYPE));
+            connection2.send(createAck(consumerInfo3, message1, 1, MessageAck.STANDARD_ACK_TYPE));
+            ++counter;
         }
+        // Ensure that 5 messages were received
+        assertEquals(receiveNumMessages, counter);
         
-        // Close the consumer on the remote broker 
-        connection2.send(consumerInfo2.createRemoveCommand());
+        Thread.sleep(2000);
         
         // Ensure that the queue on the remote broker is empty 
-        assertTrue(countMessagesInQueue(connection2, connectionInfo2, destinationInfo2) ==
0);
+        int messageCount4 = countMessagesInQueue(connection2, connectionInfo2, destinationInfo1);
+// Sometimes it fails here 
+        assertEquals(0, messageCount4);
+        
+        // Close the consumer on the remote broker 
+        connection2.send(consumerInfo3.createRemoveCommand());
+        
+        connection1.stop();
+        connection2.stop();
 	}
 	
     public static Test suite() {

Modified: activemq/trunk/pom.xml
URL: http://svn.apache.org/viewvc/activemq/trunk/pom.xml?rev=881174&r1=881173&r2=881174&view=diff
==============================================================================
--- activemq/trunk/pom.xml (original)
+++ activemq/trunk/pom.xml Tue Nov 17 06:43:17 2009
@@ -50,6 +50,7 @@
     <openjpa-version>1.2.0</openjpa-version>
     <commons-dbcp-version>1.2.2</commons-dbcp-version>
     <commons-httpclient-version>3.1</commons-httpclient-version>
+    <commons-io-version>1.4</commons-io-version>
     <commons-logging-version>1.1</commons-logging-version>
     <commons-pool-version>1.4</commons-pool-version>
     <commons-primitives-version>1.0</commons-primitives-version>
@@ -815,6 +816,12 @@
       </dependency>
 
       <dependency>
+        <groupId>commons-io</groupId>
+        <artifactId>commons-io</artifactId>
+        <version>${commons-io-version}</version>
+      </dependency>
+
+      <dependency>
         <groupId>org.apache.ant</groupId>
         <artifactId>ant</artifactId>
         <version>${ant-version}</version>



Mime
View raw message