activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From gtu...@apache.org
Subject svn commit: r1442613 - in /activemq/trunk: activemq-broker/src/main/java/org/apache/activemq/broker/ activemq-broker/src/main/java/org/apache/activemq/network/ activemq-unit-tests/src/test/java/org/apache/activemq/network/
Date Tue, 05 Feb 2013 14:48:32 GMT
Author: gtully
Date: Tue Feb  5 14:48:31 2013
New Revision: 1442613

URL: http://svn.apache.org/viewvc?rev=1442613&view=rev
Log:
https://issues.apache.org/jira/browse/AMQ-3331 - fix regression in BrokerNetworkWithStuckMessagesTest
- vm connector exposed some turnips in there w.r.t the response correlator

Modified:
    activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java
    activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
    activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/network/BrokerNetworkWithStuckMessagesTest.java

Modified: activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java?rev=1442613&r1=1442612&r2=1442613&view=diff
==============================================================================
--- activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java (original)
+++ activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java Tue Feb  5 14:48:31 2013
@@ -1327,7 +1327,11 @@ public class TransportConnection impleme
                     setDuplexNetworkConnectorId(duplexNetworkConnectorId);
                 }
                 Transport localTransport = NetworkBridgeFactory.createLocalTransport(broker);
-                Transport remoteBridgeTransport = new ResponseCorrelator(transport);
+                Transport remoteBridgeTransport = transport;
+                if (! (remoteBridgeTransport instanceof ResponseCorrelator)) {
+                    // the vm transport case is already wrapped
+                    remoteBridgeTransport = new ResponseCorrelator(remoteBridgeTransport);
+                }
                 String duplexName = localTransport.toString();
                 if (duplexName.contains("#")) {
                     duplexName = duplexName.substring(duplexName.lastIndexOf("#"));

Modified: activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java?rev=1442613&r1=1442612&r2=1442613&view=diff
==============================================================================
--- activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java (original)
+++ activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java Tue Feb  5 14:48:31 2013
@@ -603,7 +603,7 @@ public abstract class DemandForwardingBr
                 } else {
                     if (isDuplex()) {
                         if (LOG.isTraceEnabled()) {
-                            LOG.trace(configuration.getBrokerName() + " duplex command type: " + command.getCommandId());
+                            LOG.trace(configuration.getBrokerName() + " duplex command type: " + command.getDataStructureType());
                         }
                         if (command.isMessage()) {
                             final ActiveMQMessage message = (ActiveMQMessage) command;
@@ -976,6 +976,16 @@ public abstract class DemandForwardingBr
                                 + message.getDestination() + ", brokerPath: " + Arrays.toString(message.getBrokerPath()) + ", message: " + message);
                         }
 
+                        if (isDuplex() && AdvisorySupport.ADIVSORY_MESSAGE_TYPE.equals(message.getType())) {
+                            try {
+                                // never request b/c they are eventually acked async
+                                remoteBroker.oneway(message);
+                            } finally {
+                                sub.decrementOutstandingResponses();
+                            }
+                            return;
+                        }
+
                         if (message.isPersistent() || configuration.isAlwaysSyncSend()) {
 
                             // The message was not sent using async send, so we should only

Modified: activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/network/BrokerNetworkWithStuckMessagesTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/network/BrokerNetworkWithStuckMessagesTest.java?rev=1442613&r1=1442612&r2=1442613&view=diff
==============================================================================
--- activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/network/BrokerNetworkWithStuckMessagesTest.java (original)
+++ activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/network/BrokerNetworkWithStuckMessagesTest.java Tue Feb  5 14:48:31 2013
@@ -19,6 +19,7 @@ package org.apache.activemq.network;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
 
 import java.io.File;
 import java.io.IOException;
@@ -220,6 +221,7 @@ public class BrokerNetworkWithStuckMessa
         for (int i = 0; i < receiveNumMessages; ++i) {
             Message message1 = receiveMessage(connection2, 20000);
             assertNotNull(message1);
+            LOG.info("on remote, got: " + message1.getMessageId());
             connection2.send(createAck(consumerInfo2, message1, 1, MessageAck.INDIVIDUAL_ACK_TYPE));
         }
 
@@ -261,6 +263,13 @@ public class BrokerNetworkWithStuckMessa
         connection2.send(connectionInfo2.createRemoveCommand());
 
         // There should now be 5 messages stuck on the remote broker
+        assertTrue("correct stuck message count", Wait.waitFor(new Wait.Condition() {
+            @Override
+            public boolean isSatisified() throws Exception {
+                Object[] result = browseQueueWithJmx(remoteBroker);
+                return 5 == result.length;
+            }
+        }));
         messages = browseQueueWithJmx(remoteBroker);
         assertEquals(5, messages.length);
 
@@ -303,6 +312,7 @@ public class BrokerNetworkWithStuckMessa
         int counter = 1;
         for (; counter < receiveNumMessages; counter++) {
             message1 = receiveMessage(connection1);
+            LOG.info("local consume of: " + (message1 != null ? message1.getMessageId() : " null"));
             connection1.send(createAck(consumerInfo1, message1, 1, MessageAck.INDIVIDUAL_ACK_TYPE));
         }
         // Ensure that 5 messages were received



Mime
View raw message