activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From gtu...@apache.org
Subject activemq git commit: https://issues.apache.org/jira/browse/AMQ-5639 - the duplex case needed work. All advisories were being acked async in duplex mode, that code needed to be more selective to forward advisories that dont terminate at the bridge. Fix an
Date Fri, 26 Jun 2015 13:54:47 GMT
Repository: activemq
Updated Branches:
  refs/heads/master 13c471cc1 -> 002ade79b


https://issues.apache.org/jira/browse/AMQ-5639 - the duplex case needed work. All advisories
were being acked async in duplex mode, that code needed to be more selective to forward advisories
that dont terminate at the bridge. Fix and test


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/002ade79
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/002ade79
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/002ade79

Branch: refs/heads/master
Commit: 002ade79b01db228377c24438bae246690f57b7d
Parents: 13c471c
Author: gtully <gary.tully@gmail.com>
Authored: Fri Jun 26 14:54:11 2015 +0100
Committer: gtully <gary.tully@gmail.com>
Committed: Fri Jun 26 14:54:29 2015 +0100

----------------------------------------------------------------------
 .../network/DemandForwardingBridgeSupport.java  |  5 +-
 .../activemq/command/NetworkBridgeFilter.java   |  4 +-
 .../usecases/AdvisoryViaNetworkTest.java        | 77 ++++++++++++++++++++
 3 files changed, 81 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/002ade79/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
b/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
index 8ba1d98..8e08f95 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
@@ -619,8 +619,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge,
Br
                         LOG.trace("{} duplex command type: {}", configuration.getBrokerName(),
command.getDataStructureType());
                         if (command.isMessage()) {
                             final ActiveMQMessage message = (ActiveMQMessage) command;
-                            if (AdvisorySupport.isConsumerAdvisoryTopic(message.getDestination())
-                                    || AdvisorySupport.isDestinationAdvisoryTopic(message.getDestination()))
{
+                            if (NetworkBridgeFilter.isAdvisoryInterpretedByNetworkBridge(message))
{
                                 serviceRemoteConsumerAdvisory(message.getDataStructure());
                                 ackAdvisory(message);
                             } else {
@@ -989,7 +988,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge,
Br
                                 configuration.getBrokerName(), remoteBrokerName, (LOG.isTraceEnabled()
? message : message.getMessageId()), md.getConsumerId(), message.getDestination(), Arrays.toString(message.getBrokerPath()),
message
                         });
 
-                        if (isDuplex() && AdvisorySupport.ADIVSORY_MESSAGE_TYPE.equals(message.getType()))
{
+                        if (isDuplex() && NetworkBridgeFilter.isAdvisoryInterpretedByNetworkBridge(message))
{
                             try {
                                 // never request b/c they are eventually acked async
                                 remoteBroker.oneway(message);

http://git-wip-us.apache.org/repos/asf/activemq/blob/002ade79/activemq-client/src/main/java/org/apache/activemq/command/NetworkBridgeFilter.java
----------------------------------------------------------------------
diff --git a/activemq-client/src/main/java/org/apache/activemq/command/NetworkBridgeFilter.java
b/activemq-client/src/main/java/org/apache/activemq/command/NetworkBridgeFilter.java
index af0c09e..245c098 100644
--- a/activemq-client/src/main/java/org/apache/activemq/command/NetworkBridgeFilter.java
+++ b/activemq-client/src/main/java/org/apache/activemq/command/NetworkBridgeFilter.java
@@ -97,7 +97,7 @@ public class NetworkBridgeFilter implements DataStructure, BooleanExpression
{
         }
 
         if (message.isAdvisory()) {
-            if (consumerInfo != null && consumerInfo.isNetworkSubscription() &&
advisoryIsInterpretedByNetworkBridge(message)) {
+            if (consumerInfo != null && consumerInfo.isNetworkSubscription() &&
isAdvisoryInterpretedByNetworkBridge(message)) {
                 // they will be interpreted by the bridge leading to dup commands
                 if (LOG.isTraceEnabled()) {
                     LOG.trace("not propagating advisory to network sub: " + consumerInfo.getConsumerId()
+ ", message: "+ message);
@@ -124,7 +124,7 @@ public class NetworkBridgeFilter implements DataStructure, BooleanExpression
{
         return true;
     }
 
-    private boolean advisoryIsInterpretedByNetworkBridge(Message message) {
+    public static boolean isAdvisoryInterpretedByNetworkBridge(Message message) {
         return AdvisorySupport.isConsumerAdvisoryTopic(message.getDestination()) || AdvisorySupport.isTempDestinationAdvisoryTopic(message.getDestination());
     }
 

http://git-wip-us.apache.org/repos/asf/activemq/blob/002ade79/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/AdvisoryViaNetworkTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/AdvisoryViaNetworkTest.java
b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/AdvisoryViaNetworkTest.java
index aa7d6ee..ab61709 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/AdvisoryViaNetworkTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/AdvisoryViaNetworkTest.java
@@ -17,10 +17,16 @@
 package org.apache.activemq.usecases;
 
 import java.net.URI;
+import java.util.Arrays;
 import javax.jms.MessageConsumer;
 import org.apache.activemq.JmsMultipleBrokersTestSupport;
 import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.region.DestinationInterceptor;
 import org.apache.activemq.broker.region.RegionBroker;
+import org.apache.activemq.broker.region.virtual.CompositeTopic;
+import org.apache.activemq.broker.region.virtual.VirtualDestination;
+import org.apache.activemq.broker.region.virtual.VirtualDestinationInterceptor;
+import org.apache.activemq.command.ActiveMQQueue;
 import org.apache.activemq.command.ActiveMQTopic;
 import org.apache.activemq.command.BrokerInfo;
 import org.apache.activemq.network.NetworkConnector;
@@ -71,6 +77,33 @@ public class AdvisoryViaNetworkTest extends JmsMultipleBrokersTestSupport
{
     }
 
 
+    public void testAdvisoryForwardingDuplexNC() throws Exception {
+        ActiveMQTopic advisoryTopic = new ActiveMQTopic("ActiveMQ.Advisory.Producer.Topic.FOO");
+
+        createBroker("A");
+        createBroker("B");
+        NetworkConnector networkBridge = bridgeBrokers("A", "B");
+        networkBridge.addStaticallyIncludedDestination(advisoryTopic);
+        networkBridge.setDuplex(true);
+        startAllBrokers();
+        verifyPeerBrokerInfo(brokers.get("A"), 1);
+
+
+        MessageConsumer consumerA = createConsumer("A", advisoryTopic);
+        MessageConsumer consumerB = createConsumer("B", advisoryTopic);
+
+        this.sendMessages("A", new ActiveMQTopic("FOO"), 1);
+
+        MessageIdList messagesA = getConsumerMessages("A", consumerA);
+        MessageIdList messagesB = getConsumerMessages("B", consumerB);
+
+        LOG.info("consumerA = " + messagesA);
+        LOG.info("consumerB = " + messagesB);
+
+        messagesA.assertMessagesReceived(2);
+        messagesB.assertMessagesReceived(2);
+    }
+
     public void testBridgeRelevantAdvisoryNotAvailable() throws Exception {
         ActiveMQTopic advisoryTopic = new ActiveMQTopic("ActiveMQ.Advisory.Consumer.Topic.FOO");
         createBroker("A");
@@ -97,6 +130,50 @@ public class AdvisoryViaNetworkTest extends JmsMultipleBrokersTestSupport
{
         messagesB.assertMessagesReceived(0);
     }
 
+    public void testAdvisoryViaVirtualDest() throws Exception {
+        ActiveMQQueue advisoryQueue = new ActiveMQQueue("advQ");
+        createBroker("A");
+
+        // convert advisories into advQ that cross the network bridge
+        CompositeTopic compositeTopic = new CompositeTopic();
+        compositeTopic.setName("ActiveMQ.Advisory.Connection");
+        compositeTopic.setForwardOnly(false);
+        compositeTopic.setForwardTo(Arrays.asList(advisoryQueue));
+        VirtualDestinationInterceptor virtualDestinationInterceptor = new VirtualDestinationInterceptor();
+        virtualDestinationInterceptor.setVirtualDestinations(new VirtualDestination[]{compositeTopic});
+        brokers.get("A").broker.setDestinationInterceptors(new DestinationInterceptor[]{virtualDestinationInterceptor});
+
+        createBroker("B");
+        NetworkConnector networkBridge = bridgeBrokers("A", "B");
+        networkBridge.setDuplex(true);
+        networkBridge.setPrefetchSize(1); // so advisories are acked immediately b/c we check
inflight count below
+
+        startAllBrokers();
+        verifyPeerBrokerInfo(brokers.get("A"), 1);
+        verifyPeerBrokerInfo(brokers.get("B"), 1);
+
+        MessageConsumer consumerB = createConsumer("B", advisoryQueue);
+
+        // to make a connection on A
+        createConsumer("A", new ActiveMQTopic("FOO"));
+
+        MessageIdList messagesB = getConsumerMessages("B", consumerB);
+
+        messagesB.waitForMessagesToArrive(2);
+
+        assertTrue("deq and inflight as expected", Wait.waitFor(new Wait.Condition() {
+            @Override
+            public boolean isSatisified() throws Exception {
+                RegionBroker regionBroker = (RegionBroker) brokers.get("A").broker.getRegionBroker();
+                LOG.info("A Deq:" + regionBroker.getDestinationStatistics().getDequeues().getCount());
+                LOG.info("A Inflight:" + regionBroker.getDestinationStatistics().getInflight().getCount());
+                return regionBroker.getDestinationStatistics().getDequeues().getCount() >
2
+                        && regionBroker.getDestinationStatistics().getInflight().getCount()
== 0;
+            }
+        }));
+
+    }
+
     private void verifyPeerBrokerInfo(BrokerItem brokerItem, final int max) throws Exception
{
         final BrokerService broker = brokerItem.broker;
         final RegionBroker regionBroker = (RegionBroker) broker.getRegionBroker();


Mime
View raw message