activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From gtu...@apache.org
Subject activemq git commit: https://issues.apache.org/jira/browse/AMQ-5830 - ensure duplex inbound connection sets network=true flag, fix and test
Date Tue, 09 Jun 2015 11:30:25 GMT
Repository: activemq
Updated Branches:
  refs/heads/master 062b0c2c0 -> 310090904


https://issues.apache.org/jira/browse/AMQ-5830 - ensure duplex inbound connection sets network=true
flag, fix and test


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/31009090
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/31009090
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/31009090

Branch: refs/heads/master
Commit: 3100909041b8cf114773f0d0bf60d8032732186f
Parents: 062b0c2
Author: gtully <gary.tully@gmail.com>
Authored: Tue Jun 9 11:29:20 2015 +0100
Committer: gtully <gary.tully@gmail.com>
Committed: Tue Jun 9 12:19:49 2015 +0100

----------------------------------------------------------------------
 .../apache/activemq/broker/BrokerService.java   |   7 +-
 .../network/DemandForwardingBridgeSupport.java  |   3 +
 .../activemq/network/NetworkBridgeFactory.java  |  14 --
 .../activemq/transport/vm/VMTransport.java      |   9 -
 .../NetworkBridgeProducerFlowControlTest.java   | 230 ++++++++++++++++++-
 5 files changed, 232 insertions(+), 31 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/31009090/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java b/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java
index a2a04a0..d285c74 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java
@@ -404,11 +404,7 @@ public class BrokerService implements Service {
      */
     public NetworkConnector addNetworkConnector(NetworkConnector connector) throws Exception
{
         connector.setBrokerService(this);
-        URI uri = getVmConnectorURI();
-        Map<String, String> map = new HashMap<String, String>(URISupport.parseParameters(uri));
-        map.put("network", "true");
-        uri = URISupport.createURIWithQuery(uri, URISupport.createQueryString(map));
-        connector.setLocalUri(uri);
+        connector.setLocalUri(getVmConnectorURI());
         // Set a connection filter so that the connector does not establish loop
         // back connections.
         connector.setConnectionFilter(new ConnectionFilter() {
@@ -2499,7 +2495,6 @@ public class BrokerService implements Service {
         this.slave = false;
         URI uri = getVmConnectorURI();
         Map<String, String> map = new HashMap<String, String>(URISupport.parseParameters(uri));
-        map.put("network", "true");
         map.put("async", "false");
         uri = URISupport.createURIWithQuery(uri, URISupport.createQueryString(map));
 

http://git-wip-us.apache.org/repos/asf/activemq/blob/31009090/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
b/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
index 1b77e73..8ba1d98 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
@@ -475,6 +475,9 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge,
Br
                     if (configuration.isDuplex()) {
                         // separate in-bound channel for forwards so we don't
                         // contend with out-bound dispatch on same connection
+                        remoteBrokerInfo.setNetworkConnection(true);
+                        duplexInboundLocalBroker.oneway(remoteBrokerInfo);
+
                         ConnectionInfo duplexLocalConnectionInfo = new ConnectionInfo();
                         duplexLocalConnectionInfo.setConnectionId(new ConnectionId(idGenerator.generateId()));
                         duplexLocalConnectionInfo.setClientId(configuration.getName() + "_"
+ remoteBrokerName + "_inbound_duplex_"

http://git-wip-us.apache.org/repos/asf/activemq/blob/31009090/activemq-broker/src/main/java/org/apache/activemq/network/NetworkBridgeFactory.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/network/NetworkBridgeFactory.java
b/activemq-broker/src/main/java/org/apache/activemq/network/NetworkBridgeFactory.java
index 41a9d9e..0e938ae 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/network/NetworkBridgeFactory.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/network/NetworkBridgeFactory.java
@@ -32,19 +32,6 @@ public final class NetworkBridgeFactory {
 
     private NetworkBridgeFactory() {
     }
-    
-    /**
-     * Create a network bridge
-     * 
-     * @param config
-     * @param localTransport
-     * @param remoteTransport
-     * @return the NetworkBridge
-     */
-    public static DemandForwardingBridge createBridge(NetworkBridgeConfiguration config,
-                                                      Transport localTransport, Transport
remoteTransport) {
-        return createBridge(config, localTransport, remoteTransport, null);
-    }
 
     /**
      * create a network bridge
@@ -74,7 +61,6 @@ public final class NetworkBridgeFactory {
     public static Transport createLocalTransport(Broker broker) throws Exception {
         URI uri = broker.getVmConnectorURI();
         HashMap<String, String> map = new HashMap<String, String>(URISupport.parseParameters(uri));
-        map.put("network", "true");
         map.put("async", "true");
         map.put("create", "false"); // we don't want a vm connect during shutdown to trigger
a broker create
         uri = URISupport.createURIWithQuery(uri, URISupport.createQueryString(map));

http://git-wip-us.apache.org/repos/asf/activemq/blob/31009090/activemq-broker/src/main/java/org/apache/activemq/transport/vm/VMTransport.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/transport/vm/VMTransport.java
b/activemq-broker/src/main/java/org/apache/activemq/transport/vm/VMTransport.java
index 6e6726d..92c9c51 100755
--- a/activemq-broker/src/main/java/org/apache/activemq/transport/vm/VMTransport.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/transport/vm/VMTransport.java
@@ -49,7 +49,6 @@ public class VMTransport implements Transport, Task {
     protected VMTransport peer;
     protected TransportListener transportListener;
     protected boolean marshal;
-    protected boolean network;
     protected boolean async = true;
     protected int asyncQueueDepth = 2000;
     protected final URI location;
@@ -358,14 +357,6 @@ public class VMTransport implements Transport, Task {
         this.marshal = marshal;
     }
 
-    public boolean isNetwork() {
-        return network;
-    }
-
-    public void setNetwork(boolean network) {
-        this.network = network;
-    }
-
     @Override
     public String toString() {
         return location + "#" + id;

http://git-wip-us.apache.org/repos/asf/activemq/blob/31009090/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/NetworkBridgeProducerFlowControlTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/NetworkBridgeProducerFlowControlTest.java
b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/NetworkBridgeProducerFlowControlTest.java
index e950b7d..4e1501d 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/NetworkBridgeProducerFlowControlTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/NetworkBridgeProducerFlowControlTest.java
@@ -17,6 +17,7 @@
 
 package org.apache.activemq.usecases;
 
+import java.io.IOException;
 import java.net.URI;
 import java.util.Vector;
 import java.util.concurrent.CountDownLatch;
@@ -31,7 +32,10 @@ import org.apache.activemq.broker.region.policy.PolicyMap;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.ActiveMQQueue;
 import org.apache.activemq.command.ActiveMQTopic;
+import org.apache.activemq.command.DiscoveryEvent;
+import org.apache.activemq.network.DiscoveryNetworkConnector;
 import org.apache.activemq.network.NetworkConnector;
+import org.apache.activemq.transport.discovery.simple.SimpleDiscoveryAgent;
 import org.apache.activemq.util.MessageIdList;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -242,7 +246,7 @@ public class NetworkBridgeProducerFlowControlTest extends
         // Verify the behaviour as described in the description of this class.
         if (networkIsAlwaysSendSync) {
             Assert
-                    .assertTrue(fastConsumerTime.get() < slowConsumerTime.get() / 10);
+                    .assertTrue(fastConsumerTime.get() < slowConsumerTime.get() / 20);
 
         } else {
             Assert.assertEquals(persistentTestMessages,
@@ -384,4 +388,226 @@ public class NetworkBridgeProducerFlowControlTest extends
         // Verify the behaviour as described in the description of this class.
         Assert.assertTrue(fastConsumerTime.get() < slowConsumerTime.get() / 10);
     }
-}
\ No newline at end of file
+
+    public void testSendFailIfNoSpaceReverseDoesNotBlockQueueNetwork() throws Exception {
+        final int NUM_MESSAGES = 100;
+        final long TEST_MESSAGE_SIZE = 1024;
+        final long SLOW_CONSUMER_DELAY_MILLIS = 100;
+
+        final ActiveMQQueue slowDestination = new ActiveMQQueue(
+            NetworkBridgeProducerFlowControlTest.class.getSimpleName()
+                    + ".slow.shared?consumer.prefetchSize=1");
+
+        final ActiveMQQueue fastDestination = new ActiveMQQueue(
+            NetworkBridgeProducerFlowControlTest.class.getSimpleName()
+                    + ".fast.shared?consumer.prefetchSize=1");
+
+
+        // Start a local and a remote broker.
+        BrokerService localBroker = createBroker(new URI("broker:(tcp://localhost:0"
+                + ")?brokerName=broker0&persistent=false&useJmx=true"));
+        createBroker(new URI(
+                "broker:(tcp://localhost:0"
+                        + ")?brokerName=broker1&persistent=false&useJmx=true"));
+        localBroker.getSystemUsage().setSendFailIfNoSpace(true);
+
+        // Set a policy on the local broker that limits the maximum size of the
+        // slow shared queue.
+        PolicyEntry policyEntry = new PolicyEntry();
+        policyEntry.setMemoryLimit(5 * TEST_MESSAGE_SIZE);
+        PolicyMap policyMap = new PolicyMap();
+        policyMap.put(slowDestination, policyEntry);
+        localBroker.setDestinationPolicy(policyMap);
+
+        // Create an outbound bridge from the local broker to the remote broker.
+        // The bridge is configured with the remoteDispatchType enhancement.
+        NetworkConnector nc = bridgeBrokers("broker0", "broker1");
+        nc.setAlwaysSyncSend(true);
+        nc.setPrefetchSize(1);
+        nc.setDuplex(true);
+
+        startAllBrokers();
+        waitForBridgeFormation();
+
+        // Start two asynchronous consumers on the local broker, one for each
+        // of the two shared queues, and keep track of how long it takes for
+        // each of the consumers to receive all the messages.
+        final CountDownLatch fastConsumerLatch = new CountDownLatch(
+                NUM_MESSAGES);
+        final CountDownLatch slowConsumerLatch = new CountDownLatch(
+                NUM_MESSAGES);
+
+        final long startTimeMillis = System.currentTimeMillis();
+        final AtomicLong fastConsumerTime = new AtomicLong();
+        final AtomicLong slowConsumerTime = new AtomicLong();
+
+        Thread fastWaitThread = new Thread() {
+            @Override
+            public void run() {
+                try {
+                    fastConsumerLatch.await();
+                    fastConsumerTime.set(System.currentTimeMillis()
+                            - startTimeMillis);
+                } catch (InterruptedException ex) {
+                    exceptions.add(ex);
+                    Assert.fail(ex.getMessage());
+                }
+            }
+        };
+
+        Thread slowWaitThread = new Thread() {
+            @Override
+            public void run() {
+                try {
+                    slowConsumerLatch.await();
+                    slowConsumerTime.set(System.currentTimeMillis()
+                            - startTimeMillis);
+                } catch (InterruptedException ex) {
+                    exceptions.add(ex);
+                    Assert.fail(ex.getMessage());
+                }
+            }
+        };
+
+        fastWaitThread.start();
+        slowWaitThread.start();
+
+        createConsumer("broker0", fastDestination, fastConsumerLatch);
+        MessageConsumer slowConsumer = createConsumer("broker0",
+                slowDestination, slowConsumerLatch);
+        MessageIdList messageIdList = brokers.get("broker0").consumers
+                .get(slowConsumer);
+        messageIdList.setProcessingDelay(SLOW_CONSUMER_DELAY_MILLIS);
+
+        // Send the test messages to the local broker's shared queues. The
+        // messages are either persistent or non-persistent to demonstrate the
+        // difference between synchronous and asynchronous dispatch.
+        persistentDelivery = false;
+        sendMessages("broker1", fastDestination, NUM_MESSAGES);
+        sendMessages("broker1", slowDestination, NUM_MESSAGES);
+
+        fastWaitThread.join(TimeUnit.SECONDS.toMillis(60));
+        slowWaitThread.join(TimeUnit.SECONDS.toMillis(60));
+
+        assertTrue("no exceptions on the wait threads:" + exceptions,
+                exceptions.isEmpty());
+
+        LOG.info("Fast consumer duration (ms): " + fastConsumerTime.get());
+        LOG.info("Slow consumer duration (ms): " + slowConsumerTime.get());
+
+        assertTrue("fast time set", fastConsumerTime.get() > 0);
+        assertTrue("slow time set", slowConsumerTime.get() > 0);
+
+        // Verify the behaviour as described in the description of this class.
+        Assert.assertTrue(fastConsumerTime.get() < slowConsumerTime.get() / 10);
+    }
+
+
+    /**
+     * create a duplex network bridge from broker0 to broker1
+     * add a topic consumer on broker0
+     * set the setSendFailIfNoSpace() on the local broker.
+     * create a SimpleDiscoveryAgent impl that tracks a network reconnect
+     *
+     * producer connects to broker1 and messages should be sent across the network to broker0
+     *
+     * Ensure broker0 will not send the  javax.jms.ResourceAllocationException (when broker0
runs out of space).
+     * If the javax.jms.ResourceAllocationException is sent across the wire it will force
the network connector
+     * to shutdown
+     *
+     *
+     * @throws Exception
+     */
+
+    public void testDuplexSendFailIfNoSpaceDoesNotBlockNetwork() throws Exception {
+
+        // Consumer prefetch is disabled for broker1's consumers.
+        final ActiveMQTopic destination = new ActiveMQTopic(
+                NetworkBridgeProducerFlowControlTest.class.getSimpleName()
+                        + ".duplexTest?consumer.prefetchSize=1");
+
+        final int NUM_MESSAGES = 100;
+        final long TEST_MESSAGE_SIZE = 1024;
+        final long SLOW_CONSUMER_DELAY_MILLIS = 100;
+
+        // Start a local and a remote broker.
+        BrokerService localBroker = createBroker(new URI("broker:(tcp://localhost:0"
+                + ")?brokerName=broker0&persistent=false&useJmx=true"));
+
+        BrokerService remoteBroker = createBroker(new URI(
+                "broker:(tcp://localhost:0"
+                        + ")?brokerName=broker1&persistent=false&useJmx=true"));
+
+        localBroker.getSystemUsage().setSendFailIfNoSpace(true);
+
+        // Set a policy on the remote broker that limits the maximum size of the
+        // slow shared queue.
+        PolicyEntry policyEntry = new PolicyEntry();
+        policyEntry.setMemoryLimit(5 * TEST_MESSAGE_SIZE);
+        PolicyMap policyMap = new PolicyMap();
+        policyMap.put(destination, policyEntry);
+        localBroker.setDestinationPolicy(policyMap);
+
+        // Create a duplex network bridge from the local broker to the remote broker
+        // create a SimpleDiscoveryAgent impl that tracks a reconnect
+        DiscoveryNetworkConnector discoveryNetworkConnector =  (DiscoveryNetworkConnector)bridgeBrokers("broker0",
"broker1");
+        URI originURI = discoveryNetworkConnector.getUri();
+        discoveryNetworkConnector.setAlwaysSyncSend(true);
+        discoveryNetworkConnector.setPrefetchSize(1);
+        discoveryNetworkConnector.setDuplex(true);
+
+        DummySimpleDiscoveryAgent dummySimpleDiscoveryAgent = new DummySimpleDiscoveryAgent();
+        dummySimpleDiscoveryAgent.setServices(originURI.toString().substring(8,originURI.toString().lastIndexOf(')')));
+
+        discoveryNetworkConnector.setDiscoveryAgent(dummySimpleDiscoveryAgent);
+
+        startAllBrokers();
+        waitForBridgeFormation();
+
+
+        final CountDownLatch consumerLatch = new CountDownLatch(
+                NUM_MESSAGES);
+
+
+        //createConsumer("broker0", fastDestination, fastConsumerLatch);
+
+        MessageConsumer consumer = createConsumer("broker0",
+                destination, consumerLatch);
+
+        MessageIdList messageIdList = brokers.get("broker0").consumers
+                .get(consumer);
+
+        messageIdList.setProcessingDelay(SLOW_CONSUMER_DELAY_MILLIS);
+
+        // Send the test messages to the local broker's shared queues. The
+        // messages are either persistent or non-persistent to demonstrate the
+        // difference between synchronous and asynchronous dispatch.
+        persistentDelivery = false;
+        sendMessages("broker1", destination, NUM_MESSAGES);
+
+        //wait for 5 seconds for the consumer to complete
+        consumerLatch.await(5, TimeUnit.SECONDS);
+
+        assertFalse("dummySimpleDiscoveryAgent.serviceFail has been invoked - should not
have been",
+                dummySimpleDiscoveryAgent.isServiceFailed);
+
+    }
+
+    /**
+     * When the network connector fails it records the failure and delegates to real SimpleDiscoveryAgent
+     */
+    class DummySimpleDiscoveryAgent extends SimpleDiscoveryAgent {
+
+        boolean isServiceFailed = false;
+
+        public void serviceFailed(DiscoveryEvent devent) throws IOException {
+
+            //should never get in here
+            LOG.info("!!!!! DummySimpleDiscoveryAgent.serviceFailed() invoked with event:"+devent+"!!!!!!");
+            isServiceFailed = true;
+            super.serviceFailed(devent);
+
+        }
+
+    }
+}


Mime
View raw message