activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From gtu...@apache.org
Subject git commit: https://issues.apache.org/jira/browse/AMQ-2327 - resolve. key is not to conduit proxy/proxy consumers b/c the dependencies cannot be easily resolved without more network traffic on additions to demand subs. Maintaining the order of consumer a
Date Fri, 06 Sep 2013 21:47:40 GMT
Updated Branches:
  refs/heads/trunk 5515b9be3 -> 6c5732bc5


https://issues.apache.org/jira/browse/AMQ-2327 - resolve. The key is not to conduit proxy/proxy
consumers, because the dependencies cannot be easily resolved without more network traffic on additions
to demand subs. Maintaining the order of consumer advisories fixes duplicate suppression.
Thanks for the easymock test; while brittle, it did help focus on checkPaths, which was key.


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/6c5732bc
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/6c5732bc
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/6c5732bc

Branch: refs/heads/trunk
Commit: 6c5732bc5c8fcac4590590c3016b919f4d068d88
Parents: 5515b9b
Author: gtully <gary.tully@gmail.com>
Authored: Fri Sep 6 22:45:35 2013 +0100
Committer: gtully <gary.tully@gmail.com>
Committed: Fri Sep 6 22:46:09 2013 +0100

----------------------------------------------------------------------
 .../activemq/advisory/AdvisoryBroker.java       |  37 +-
 .../apache/activemq/network/ConduitBridge.java  |  30 +-
 .../network/DemandForwardingBridgeSupport.java  |   7 +-
 .../activemq/network/DurableConduitBridge.java  |   2 +-
 .../network/NetworkBridgeConfiguration.java     |   9 +
 activemq-unit-tests/pom.xml                     |   6 +
 .../activemq/network/NetworkRouteTest.java      | 346 +++++++++++++++++++
 .../usecases/ThreeBrokerQueueNetworkTest.java   |  34 +-
 .../VerifyNetworkConsumersDisconnectTest.java   | 144 ++++++--
 9 files changed, 550 insertions(+), 65 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/6c5732bc/activemq-broker/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java
b/activemq-broker/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java
index e167c32..9dce5fc 100755
--- a/activemq-broker/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java
@@ -16,10 +16,12 @@
  */
 package org.apache.activemq.advisory;
 
+import java.util.Comparator;
 import java.util.Iterator;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentSkipListMap;
 
 import org.apache.activemq.broker.Broker;
 import org.apache.activemq.broker.BrokerFilter;
@@ -52,7 +54,34 @@ public class AdvisoryBroker extends BrokerFilter {
     private static final IdGenerator ID_GENERATOR = new IdGenerator();
 
     protected final ConcurrentHashMap<ConnectionId, ConnectionInfo> connections = new
ConcurrentHashMap<ConnectionId, ConnectionInfo>();
-    protected final ConcurrentHashMap<ConsumerId, ConsumerInfo> consumers = new ConcurrentHashMap<ConsumerId,
ConsumerInfo>();
+    class ConsumerIdKey {
+        final ConsumerId delegate;
+        final long creationTime = System.currentTimeMillis();
+        ConsumerIdKey(ConsumerId id) {
+            delegate = id;
+        }
+
+        @Override
+        public boolean equals(Object other) {
+            return delegate.equals(other);
+        }
+
+        @Override
+        public int hashCode() {
+            return delegate.hashCode();
+        }
+    }
+    // replay consumer advisory messages in the order in which they arrive - allows duplicate
suppression in
+    // mesh networks with ttl>1
+    protected final Map<ConsumerIdKey, ConsumerInfo> consumers = new ConcurrentSkipListMap<ConsumerIdKey,
ConsumerInfo>(
+            new Comparator<ConsumerIdKey>() {
+                @Override
+                public int compare(ConsumerIdKey o1, ConsumerIdKey o2) {
+                    return (o1.creationTime < o2.creationTime ? -1 : (o1.delegate==o2.delegate
? 0 : 1));
+                }
+            }
+    );
+
     protected final ConcurrentHashMap<ProducerId, ProducerInfo> producers = new ConcurrentHashMap<ProducerId,
ProducerInfo>();
     protected final ConcurrentHashMap<ActiveMQDestination, DestinationInfo> destinations
= new ConcurrentHashMap<ActiveMQDestination, DestinationInfo>();
     protected final ConcurrentHashMap<BrokerInfo, ActiveMQMessage> networkBridges =
new ConcurrentHashMap<BrokerInfo, ActiveMQMessage>();
@@ -84,7 +113,7 @@ public class AdvisoryBroker extends BrokerFilter {
         // Don't advise advisory topics.
         if (!AdvisorySupport.isAdvisoryTopic(info.getDestination())) {
             ActiveMQTopic topic = AdvisorySupport.getConsumerAdvisoryTopic(info.getDestination());
-            consumers.put(info.getConsumerId(), info);
+            consumers.put(new ConsumerIdKey(info.getConsumerId()), info);
             fireConsumerAdvisory(context, info.getDestination(), topic, info);
         } else {
             // We need to replay all the previously collected state objects
@@ -247,7 +276,7 @@ public class AdvisoryBroker extends BrokerFilter {
         ActiveMQDestination dest = info.getDestination();
         if (!AdvisorySupport.isAdvisoryTopic(dest)) {
             ActiveMQTopic topic = AdvisorySupport.getConsumerAdvisoryTopic(dest);
-            consumers.remove(info.getConsumerId());
+            consumers.remove(new ConsumerIdKey(info.getConsumerId()));
             if (!dest.isTemporary() || destinations.containsKey(dest)) {
                 fireConsumerAdvisory(context,dest, topic, info.createRemoveCommand());
             }
@@ -575,7 +604,7 @@ public class AdvisoryBroker extends BrokerFilter {
         return connections;
     }
 
-    public Map<ConsumerId, ConsumerInfo> getAdvisoryConsumers() {
+    public Map<ConsumerIdKey, ConsumerInfo> getAdvisoryConsumers() {
         return consumers;
     }
 

http://git-wip-us.apache.org/repos/asf/activemq/blob/6c5732bc/activemq-broker/src/main/java/org/apache/activemq/network/ConduitBridge.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/network/ConduitBridge.java
b/activemq-broker/src/main/java/org/apache/activemq/network/ConduitBridge.java
index 683c91e..2d1904c 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/network/ConduitBridge.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/network/ConduitBridge.java
@@ -57,38 +57,24 @@ public class ConduitBridge extends DemandForwardingBridge {
         return doCreateDemandSubscription(info);
     }
 
-    protected boolean checkPaths(BrokerId[] first, BrokerId[] second) {
-        if (first == null || second == null) {
-            return true;
-        }
-        if (Arrays.equals(first, second)) {
-            return true;
-        }
-
-        if (first[0].equals(second[0]) && first[first.length - 1].equals(second[second.length
- 1])) {
-            return false;
-        } else {
-            return true;
-        }
-    }
-
     protected boolean addToAlreadyInterestedConsumers(ConsumerInfo info) {
         // search through existing subscriptions and see if we have a match
+        if (info.isNetworkSubscription()) {
+            return false;
+        }
         boolean matched = false;
 
         for (DemandSubscription ds : subscriptionMapByLocalId.values()) {
             DestinationFilter filter = DestinationFilter.parseFilter(ds.getLocalInfo().getDestination());
-            if (filter.matches(info.getDestination())) {
+            if (!ds.getRemoteInfo().isNetworkSubscription() && filter.matches(info.getDestination()))
{
                 if (LOG.isDebugEnabled()) {
                     LOG.debug(configuration.getBrokerName() + " " + info + " with ids" +
info.getNetworkConsumerIds() + " matched (add interest) " + ds);
                 }
                 // add the interest in the subscription
-                if (checkPaths(info.getBrokerPath(), ds.getRemoteInfo().getBrokerPath()))
{
-                    if (!info.isDurable()) {
-                        ds.add(info.getConsumerId());
-                    } else {
-                       ds.getDurableRemoteSubs().add(new SubscriptionInfo(info.getClientId(),
info.getSubscriptionName()));
-                    }
+                if (!info.isDurable()) {
+                    ds.add(info.getConsumerId());
+                } else {
+                    ds.getDurableRemoteSubs().add(new SubscriptionInfo(info.getClientId(),
info.getSubscriptionName()));
                 }
                 matched = true;
                 // continue - we want interest to any existing DemandSubscriptions

http://git-wip-us.apache.org/repos/asf/activemq/blob/6c5732bc/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
b/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
index 5fbca0a..56da994 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
@@ -75,7 +75,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge,
Br
     protected static final String DURABLE_SUB_PREFIX = "NC-DS_";
     protected final Transport localBroker;
     protected final Transport remoteBroker;
-    protected final IdGenerator idGenerator = new IdGenerator();
+    protected IdGenerator idGenerator;
     protected final LongSequenceGenerator consumerIdGenerator = new LongSequenceGenerator();
     protected ConnectionInfo localConnectionInfo;
     protected ConnectionInfo remoteConnectionInfo;
@@ -354,6 +354,11 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge,
Br
             // Fill in the remote broker's information now.
             remoteBrokerPath[0] = remoteBrokerId;
             remoteBrokerName = remoteBrokerInfo.getBrokerName();
+            if (configuration.isUseBrokerNamesAsIdSeed()) {
+                idGenerator = new IdGenerator(brokerService.getBrokerName() + "->" + remoteBrokerName);
+            } else {
+                idGenerator = new IdGenerator();
+            }
         } catch (Throwable e) {
             serviceLocalException(e);
         }

http://git-wip-us.apache.org/repos/asf/activemq/blob/6c5732bc/activemq-broker/src/main/java/org/apache/activemq/network/DurableConduitBridge.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/network/DurableConduitBridge.java
b/activemq-broker/src/main/java/org/apache/activemq/network/DurableConduitBridge.java
index a531906..7001a8d 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/network/DurableConduitBridge.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/network/DurableConduitBridge.java
@@ -33,7 +33,7 @@ public class DurableConduitBridge extends ConduitBridge {
     private static final Logger LOG = LoggerFactory.getLogger(DurableConduitBridge.class);
 
     public String toString() {
-        return "DurableConduitBridge";
+        return "DurableConduitBridge:" + configuration.getBrokerName() + "->" + getRemoteBrokerName();
     }
     /**
      * Constructor

http://git-wip-us.apache.org/repos/asf/activemq/blob/6c5732bc/activemq-broker/src/main/java/org/apache/activemq/network/NetworkBridgeConfiguration.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/network/NetworkBridgeConfiguration.java
b/activemq-broker/src/main/java/org/apache/activemq/network/NetworkBridgeConfiguration.java
index ac90828..808754f 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/network/NetworkBridgeConfiguration.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/network/NetworkBridgeConfiguration.java
@@ -61,6 +61,7 @@ public class NetworkBridgeConfiguration {
     private boolean staticBridge = false;
     private boolean useCompression = false;
     private boolean advisoryForFailedForward = false;
+    private boolean useBrokerNamesAsIdSeed = true;
 
     /**
      * @return the conduitSubscriptions
@@ -415,4 +416,12 @@ public class NetworkBridgeConfiguration {
     public int getMessageTTL() {
         return messageTTL;
     }
+
+    public boolean isUseBrokerNamesAsIdSeed() {
+        return useBrokerNamesAsIdSeed;
+    }
+
+    public void setUseBrokerNameAsIdSees(boolean val) {
+        useBrokerNamesAsIdSeed = val;
+    }
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/6c5732bc/activemq-unit-tests/pom.xml
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/pom.xml b/activemq-unit-tests/pom.xml
index b6ed174..6e1c5b9 100755
--- a/activemq-unit-tests/pom.xml
+++ b/activemq-unit-tests/pom.xml
@@ -289,6 +289,12 @@
       <scope>test</scope>
     </dependency>
     <dependency>
+      <groupId>org.easymock</groupId>
+      <artifactId>easymock</artifactId>
+      <version>3.2</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
       <groupId>org.apache.ftpserver</groupId>
       <artifactId>ftpserver-core</artifactId>
       <version>${ftpserver-version}</version>

http://git-wip-us.apache.org/repos/asf/activemq/blob/6c5732bc/activemq-unit-tests/src/test/java/org/apache/activemq/network/NetworkRouteTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/network/NetworkRouteTest.java
b/activemq-unit-tests/src/test/java/org/apache/activemq/network/NetworkRouteTest.java
new file mode 100644
index 0000000..4cd97b0
--- /dev/null
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/network/NetworkRouteTest.java
@@ -0,0 +1,346 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.network;
+
+import java.util.Arrays;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import org.apache.activemq.advisory.AdvisorySupport;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.command.ActiveMQMessage;
+import org.apache.activemq.command.ActiveMQTopic;
+import org.apache.activemq.command.BrokerId;
+import org.apache.activemq.command.BrokerInfo;
+import org.apache.activemq.command.ConnectionId;
+import org.apache.activemq.command.ConsumerId;
+import org.apache.activemq.command.ConsumerInfo;
+import org.apache.activemq.command.MessageAck;
+import org.apache.activemq.command.MessageDispatch;
+import org.apache.activemq.command.RemoveInfo;
+import org.apache.activemq.command.Response;
+import org.apache.activemq.command.SessionId;
+import org.apache.activemq.transport.FutureResponse;
+import org.apache.activemq.transport.ResponseCallback;
+import org.apache.activemq.transport.Transport;
+import org.apache.activemq.transport.TransportListener;
+import org.easymock.EasyMock;
+import org.easymock.IAnswer;
+import org.easymock.IMocksControl;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class NetworkRouteTest {
+    private IMocksControl control;
+    private BrokerService brokerService;
+    private Transport localBroker;
+    private Transport remoteBroker;
+    private TransportListener localListener;
+    private TransportListener remoteListener;
+    private MessageDispatch msgDispatch;
+    private ActiveMQMessage path1Msg;
+    private ActiveMQMessage path2Msg;
+    private ActiveMQMessage removePath1Msg;
+    private ActiveMQMessage removePath2Msg;
+
+    // this sort of mockery is very brittle but it is fast!
+
+    @Test
+    public void verifyNoRemoveOnOneConduitRemove() throws Exception {
+        EasyMock.expect(localBroker.request(EasyMock.isA(ConsumerInfo.class))).andReturn(null);
+        control.replay();
+
+        remoteListener.onCommand(path2Msg);
+        remoteListener.onCommand(path1Msg);
+
+        remoteListener.onCommand(removePath2Msg);
+        control.verify();
+    }
+
+    @Test
+    public void addAndRemoveOppositeOrder() throws Exception {
+        // from (1)
+        localBroker.request(EasyMock.isA(ConsumerInfo.class));
+        ArgHolder localConsumer = ArgHolder.holdArgsForLastObjectCall();
+        // from (2a)
+        remoteBroker.asyncRequest(EasyMock.isA(ActiveMQMessage.class), EasyMock.isA(ResponseCallback.class));
+        ArgHolder firstMessageFuture = ArgHolder.holdArgsForLastFutureRequestCall();
+        localBroker.oneway(EasyMock.isA(MessageAck.class));
+        // from (2b)
+        remoteBroker.asyncRequest(EasyMock.isA(ActiveMQMessage.class), EasyMock.isA(ResponseCallback.class));
+        ArgHolder secondMessageFuture = ArgHolder.holdArgsForLastFutureRequestCall();
+
+        localBroker.oneway(EasyMock.isA(MessageAck.class));
+        // from (3)
+        localBroker.oneway(EasyMock.isA(RemoveInfo.class));
+        ExpectationWaiter waitForRemove = ExpectationWaiter.waiterForLastVoidCall();
+        control.replay();
+
+        // (1) send advisory of path 1
+        remoteListener.onCommand(path1Msg);
+        msgDispatch.setConsumerId(((ConsumerInfo) localConsumer.arguments[0]).getConsumerId());
+        // send advisory of path 2, doesn't send a ConsumerInfo to localBroker
+        remoteListener.onCommand(path2Msg);
+        // (2a) send a message
+        localListener.onCommand(msgDispatch);
+        ResponseCallback callback = (ResponseCallback) firstMessageFuture.arguments[1];
+        FutureResponse response = new FutureResponse(callback);
+        response.set(new Response());
+
+        // send advisory of path 2 remove, doesn't send a RemoveInfo to localBroker
+        remoteListener.onCommand(removePath2Msg);
+        // (2b) send a message
+        localListener.onCommand(msgDispatch);
+        callback = (ResponseCallback) secondMessageFuture.arguments[1];
+        response = new FutureResponse(callback);
+        response.set(new Response());
+
+        // (3) send advisory of path 1 remove, sends a RemoveInfo to localBroker
+        remoteListener.onCommand(removePath1Msg);
+        waitForRemove.assertHappens(5, TimeUnit.SECONDS);
+        // send a message, does not send message as in 2a and 2b
+        localListener.onCommand(msgDispatch);
+
+        control.verify();
+    }
+
+    @Test
+    public void addAndRemoveSameOrder() throws Exception {
+        // from (1)
+        localBroker.request(EasyMock.isA(ConsumerInfo.class));
+        ArgHolder localConsumer = ArgHolder.holdArgsForLastObjectCall();
+
+        // from (2a)
+        remoteBroker.asyncRequest(EasyMock.isA(ActiveMQMessage.class), EasyMock.isA(ResponseCallback.class));
+        ArgHolder firstMessageFuture = ArgHolder.holdArgsForLastFutureRequestCall();
+
+        localBroker.oneway(EasyMock.isA(MessageAck.class));
+
+        // from (2b)
+        remoteBroker.asyncRequest(EasyMock.isA(ActiveMQMessage.class), EasyMock.isA(ResponseCallback.class));
+        ArgHolder secondMessageFuture = ArgHolder.holdArgsForLastFutureRequestCall();
+
+        localBroker.oneway(EasyMock.isA(MessageAck.class));
+
+        // from (3)
+        localBroker.oneway(EasyMock.isA(RemoveInfo.class));
+        ExpectationWaiter waitForRemove = ExpectationWaiter.waiterForLastVoidCall();
+        control.replay();
+
+        // (1) send advisory of path 1
+        remoteListener.onCommand(path1Msg);
+        msgDispatch.setConsumerId(((ConsumerInfo) localConsumer.arguments[0]).getConsumerId());
+        // send advisory of path 2, doesn't send a ConsumerInfo to localBroker
+        remoteListener.onCommand(path2Msg);
+        // (2a) send a message
+        localListener.onCommand(msgDispatch);
+        ResponseCallback callback = (ResponseCallback) firstMessageFuture.arguments[1];
+        FutureResponse response = new FutureResponse(callback);
+        response.set(new Response());
+
+        // send advisory of path 1 remove, shouldn't send a RemoveInfo to localBroker
+        remoteListener.onCommand(removePath1Msg);
+        // (2b) send a message, should send the message as in 2a
+        localListener.onCommand(msgDispatch);
+        callback = (ResponseCallback) secondMessageFuture.arguments[1];
+        response = new FutureResponse(callback);
+        response.set(new Response());
+
+        // (3) send advisory of path 2 remove, should send a RemoveInfo to localBroker
+        remoteListener.onCommand(removePath2Msg);
+        waitForRemove.assertHappens(5, TimeUnit.SECONDS);
+        // send a message, does not send message as in 2a
+        localListener.onCommand(msgDispatch);
+
+        control.verify();
+    }
+
+    @Before
+    public void before() throws Exception {
+        control = EasyMock.createControl();
+        localBroker = control.createMock(Transport.class);
+        remoteBroker = control.createMock(Transport.class);
+
+        NetworkBridgeConfiguration configuration = new NetworkBridgeConfiguration();
+        brokerService = new BrokerService();
+        BrokerInfo remoteBrokerInfo = new BrokerInfo();
+
+        configuration.setDuplex(true);
+        configuration.setNetworkTTL(5);
+        brokerService.setBrokerId("broker-1");
+        brokerService.setPersistent(false);
+        brokerService.setUseJmx(false);
+        brokerService.start();
+        brokerService.waitUntilStarted();
+        remoteBrokerInfo.setBrokerId(new BrokerId("remote-broker-id"));
+        remoteBrokerInfo.setBrokerName("remote-broker-name");
+
+        localBroker.setTransportListener(EasyMock.isA(TransportListener.class));
+        ArgHolder localListenerRef = ArgHolder.holdArgsForLastVoidCall();
+
+        remoteBroker.setTransportListener(EasyMock.isA(TransportListener.class));
+        ArgHolder remoteListenerRef = ArgHolder.holdArgsForLastVoidCall();
+        localBroker.start();
+        remoteBroker.start();
+
+        remoteBroker.oneway(EasyMock.isA(Object.class));
+        EasyMock.expectLastCall().times(4);
+        remoteBroker.oneway(EasyMock.isA(Object.class));
+        ExpectationWaiter remoteInitWaiter = ExpectationWaiter.waiterForLastVoidCall();
+
+        localBroker.oneway(remoteBrokerInfo);
+        EasyMock.expect(localBroker.request(EasyMock.isA(Object.class)))
+                .andReturn(null);
+        localBroker.oneway(EasyMock.isA(Object.class));
+        ExpectationWaiter localInitWaiter = ExpectationWaiter.waiterForLastVoidCall();
+
+        control.replay();
+
+        DurableConduitBridge bridge = new DurableConduitBridge(configuration, localBroker,
remoteBroker);
+        bridge.setBrokerService(brokerService);
+        bridge.start();
+
+        localListener = (TransportListener) localListenerRef.getArguments()[0];
+        Assert.assertNotNull(localListener);
+        remoteListener = (TransportListener) remoteListenerRef.getArguments()[0];
+        Assert.assertNotNull(remoteListener);
+
+        remoteListener.onCommand(remoteBrokerInfo);
+
+        remoteInitWaiter.assertHappens(5, TimeUnit.SECONDS);
+        localInitWaiter.assertHappens(5, TimeUnit.SECONDS);
+
+        control.verify();
+        control.reset();
+
+        ActiveMQMessage msg = new ActiveMQMessage();
+        msg.setDestination(new ActiveMQTopic("test"));
+        msgDispatch = new MessageDispatch();
+        msgDispatch.setMessage(msg);
+
+        ConsumerInfo path1 = new ConsumerInfo();
+        path1.setDestination(msg.getDestination());
+        path1.setConsumerId(new ConsumerId(new SessionId(new ConnectionId("conn-id-1"), 1),
3));
+        path1.setBrokerPath(new BrokerId[]{
+                new BrokerId("remote-broker-id"),
+                new BrokerId("server(1)-broker-id"),
+        });
+        path1Msg = new ActiveMQMessage();
+        path1Msg.setDestination(AdvisorySupport.getConsumerAdvisoryTopic(path1.getDestination()));
+        path1Msg.setDataStructure(path1);
+
+        ConsumerInfo path2 = new ConsumerInfo();
+        path2.setDestination(path1.getDestination());
+        path2.setConsumerId(new ConsumerId(new SessionId(new ConnectionId("conn-id-2"), 2),
4));
+        path2.setBrokerPath(new BrokerId[]{
+                new BrokerId("remote-broker-id"),
+                new BrokerId("server(2)-broker-id"),
+                new BrokerId("server(1)-broker-id"),
+        });
+        path2Msg = new ActiveMQMessage();
+        path2Msg.setDestination(path1Msg.getDestination());
+        path2Msg.setDataStructure(path2);
+
+        RemoveInfo removePath1 = new RemoveInfo(path1.getConsumerId());
+        RemoveInfo removePath2 = new RemoveInfo(path2.getConsumerId());
+
+        removePath1Msg = new ActiveMQMessage();
+        removePath1Msg.setDestination(path1Msg.getDestination());
+        removePath1Msg.setDataStructure(removePath1);
+
+        removePath2Msg = new ActiveMQMessage();
+        removePath2Msg.setDestination(path1Msg.getDestination());
+        removePath2Msg.setDataStructure(removePath2);
+    }
+
+    @After
+    public void after() throws Exception {
+        control.reset();
+        brokerService.stop();
+        brokerService.waitUntilStopped();
+    }
+
+    private static class ArgHolder {
+        public Object[] arguments;
+
+        public static ArgHolder holdArgsForLastVoidCall() {
+            final ArgHolder holder = new ArgHolder();
+            EasyMock.expectLastCall().andAnswer(new IAnswer<Object>() {
+                @Override
+                public Object answer() throws Throwable {
+                    Object[] args = EasyMock.getCurrentArguments();
+                    holder.arguments = Arrays.copyOf(args, args.length);
+                    return null;
+                }
+            });
+            return holder;
+        }
+
+        public static ArgHolder holdArgsForLastObjectCall() {
+            final ArgHolder holder = new ArgHolder();
+            EasyMock.expect(new Object()).andAnswer(new IAnswer<Object>() {
+                @Override
+                public Object answer() throws Throwable {
+                    Object[] args = EasyMock.getCurrentArguments();
+                    holder.arguments = Arrays.copyOf(args, args.length);
+                    return null;
+                }
+            });
+            return holder;
+        }
+
+        public static ArgHolder holdArgsForLastFutureRequestCall() {
+            final ArgHolder holder = new ArgHolder();
+            EasyMock.expect(new FutureResponse(null)).andAnswer(new IAnswer<FutureResponse>()
{
+                @Override
+                public FutureResponse answer() throws Throwable {
+                    Object[] args = EasyMock.getCurrentArguments();
+                    holder.arguments = Arrays.copyOf(args, args.length);
+                    return null;
+                }
+            });
+
+            return holder;
+        }
+
+        public Object[] getArguments() {
+            Assert.assertNotNull(arguments);
+            return arguments;
+        }
+    }
+
+    private static class ExpectationWaiter {
+        private CountDownLatch latch = new CountDownLatch(1);
+
+        public static ExpectationWaiter waiterForLastVoidCall() {
+            final ExpectationWaiter waiter = new ExpectationWaiter();
+            EasyMock.expectLastCall().andAnswer(new IAnswer<Object>() {
+                @Override
+                public Object answer() throws Throwable {
+                    waiter.latch.countDown();
+                    return null;
+                }
+            });
+            return waiter;
+        }
+
+        public void assertHappens(long timeout, TimeUnit unit) throws InterruptedException
{
+            Assert.assertTrue(latch.await(timeout, unit));
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/activemq/blob/6c5732bc/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/ThreeBrokerQueueNetworkTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/ThreeBrokerQueueNetworkTest.java
b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/ThreeBrokerQueueNetworkTest.java
index c3ccf08..2e91d4c 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/ThreeBrokerQueueNetworkTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/ThreeBrokerQueueNetworkTest.java
@@ -571,8 +571,8 @@ public class ThreeBrokerQueueNetworkTest extends JmsMultipleBrokersTestSupport
{
 
 
     public void testDuplicateQueueSubs() throws Exception {
-    	
-    	createBroker("BrokerD");
+
+        configureBroker(createBroker("BrokerD"));
         
         bridgeAllBrokers("default", 3, false);
         startAllBrokers();
@@ -596,7 +596,7 @@ public class ThreeBrokerQueueNetworkTest extends JmsMultipleBrokersTestSupport
{
         for (Iterator<BrokerItem> i = brokerList.iterator(); i.hasNext();) {
             BrokerService broker = i.next().broker;
             if (!brokerName.equals(broker.getBrokerName())) {
-                verifyConsumerCount(broker, 3, dest);
+                verifyConsumerCount(broker, 5, dest);
                 verifyConsumePriority(broker, ConsumerInfo.NORMAL_PRIORITY, dest);
             }
         }
@@ -605,7 +605,14 @@ public class ThreeBrokerQueueNetworkTest extends JmsMultipleBrokersTestSupport
{
         
         // wait for advisories
         Thread.sleep(2000);
-        
+
+        for (Iterator<BrokerItem> i = brokerList.iterator(); i.hasNext();) {
+            BrokerService broker = i.next().broker;
+            if (!brokerName.equals(broker.getBrokerName())) {
+                logConsumerCount(broker, 0, dest);
+            }
+        }
+
         for (Iterator<BrokerItem> i = brokerList.iterator(); i.hasNext();) {
             BrokerService broker = i.next().broker;
             verifyConsumerCount(broker, 0, dest);
@@ -620,9 +627,21 @@ public class ThreeBrokerQueueNetworkTest extends JmsMultipleBrokersTestSupport {
             }
         });
         Queue internalQueue = (Queue) regionBroker.getDestinations(ActiveMQDestination.transform(dest)).iterator().next();
+        LOG.info("Verify: consumer count on " + broker.getBrokerName() + " matches:" + count + ", val:" + internalQueue.getConsumers().size());
         assertEquals("consumer count on " + broker.getBrokerName() + " matches for q: " + internalQueue, count, internalQueue.getConsumers().size());      
     }
 
+    private void logConsumerCount(BrokerService broker, int count, final Destination dest) throws Exception {
+        final RegionBroker regionBroker = (RegionBroker) broker.getRegionBroker();
+        waitFor(new Condition() {
+            public boolean isSatisified() throws Exception {
+                return !regionBroker.getDestinations(ActiveMQDestination.transform(dest)).isEmpty();
+            }
+        });
+        Queue internalQueue = (Queue) regionBroker.getDestinations(ActiveMQDestination.transform(dest)).iterator().next();
+        LOG.info("Verify: consumer count on " + broker.getBrokerName() + " matches:" + count + ", val:" + internalQueue.getConsumers().size());
+    }
+
     private void verifyConsumePriority(BrokerService broker, byte expectedPriority, Destination dest) throws Exception {
         RegionBroker regionBroker = (RegionBroker) broker.getRegionBroker();
         Queue internalQueue = (Queue) regionBroker.getDestinations(ActiveMQDestination.transform(dest)).iterator().next();
@@ -630,7 +649,12 @@ public class ThreeBrokerQueueNetworkTest extends JmsMultipleBrokersTestSupport {
             assertEquals("consumer on " + broker.getBrokerName() + " matches priority: " + internalQueue, expectedPriority, consumer.getConsumerInfo().getPriority());      
         }
     }
-    
+
+    @Override
+    public void configureBroker(BrokerService brokerService) {
+        brokerService.setBrokerId(brokerService.getBrokerName());
+    }
+
     @Override
     public void setUp() throws Exception {
         super.setAutoFail(true);

http://git-wip-us.apache.org/repos/asf/activemq/blob/6c5732bc/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/VerifyNetworkConsumersDisconnectTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/VerifyNetworkConsumersDisconnectTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/VerifyNetworkConsumersDisconnectTest.java
index 3ebc4b1..9eeb28c 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/VerifyNetworkConsumersDisconnectTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/VerifyNetworkConsumersDisconnectTest.java
@@ -18,14 +18,16 @@ package org.apache.activemq.usecases;
 
 import java.lang.Thread.UncaughtExceptionHandler;
 import java.net.URI;
-import java.util.Arrays;
 import java.util.HashMap;
 import java.util.LinkedHashMap;
+import java.util.LinkedList;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 import javax.jms.Destination;
 import javax.jms.MessageConsumer;
+import javax.management.ObjectName;
 import org.apache.activemq.JmsMultipleBrokersTestSupport;
 import org.apache.activemq.broker.BrokerService;
 import org.apache.activemq.broker.jmx.ManagementContext;
@@ -39,26 +41,24 @@ import org.slf4j.LoggerFactory;
 
 public class VerifyNetworkConsumersDisconnectTest extends JmsMultipleBrokersTestSupport implements UncaughtExceptionHandler {
     public static final int BROKER_COUNT = 3;
-    public static final int CONSUMER_COUNT = 1;
+    public static final int CONSUMER_COUNT = 5;
     public static final int MESSAGE_COUNT = 0;
     public static final boolean DUPLEX = false;
     public static final boolean CONDUIT = true;
 
-    // NETWORK_TTL=4 is problematic for consumer/demand propagation
-    // needs setConsumerTTL=1 to override
-    public static final int NETWORK_TTL = 4;
+    public static final int NETWORK_TTL = 6;
     private static final Logger LOG = LoggerFactory.getLogger(VerifyNetworkConsumersDisconnectTest.class);
     public static final int TIMEOUT = 30000;
 
     protected Map<String, MessageConsumer> consumerMap;
-    Map<Thread, Throwable> unhandeledExceptions = new HashMap<Thread, Throwable>();
+    Map<Thread, Throwable> unhandledExceptions = new HashMap<Thread, Throwable>();
 
-    private void assertNoUnhandeledExceptions() {
-        for( Entry<Thread, Throwable> e: unhandeledExceptions.entrySet()) {
+    private void assertNoUnhandledExceptions() {
+        for( Entry<Thread, Throwable> e: unhandledExceptions.entrySet()) {
             LOG.error("Thread:" + e.getKey() + " Had unexpected: " + e.getValue());
         }
-        assertTrue("There are no unhandelled exceptions, see: log for detail on: " + unhandeledExceptions,
-                unhandeledExceptions.isEmpty());
+        assertTrue("There are no unhandled exceptions, see: log for detail on: " + unhandledExceptions,
+                unhandledExceptions.isEmpty());
     }
 
     public NetworkConnector bridge(String from, String to) throws Exception {
@@ -66,14 +66,17 @@ public class VerifyNetworkConsumersDisconnectTest extends JmsMultipleBrokersTest
         networkConnector.setSuppressDuplicateQueueSubscriptions(true);
         networkConnector.setDecreaseNetworkConsumerPriority(true);
         networkConnector.setDuplex(DUPLEX);
-        // infinite ttl for messages in a mesh
-        networkConnector.setMessageTTL(-1);
-        // one hop for consumers in a mesh
-        networkConnector.setConsumerTTL(1);
         return networkConnector;
     }
 
-    public void testQueueAllConnected() throws Exception {
+    /*why conduit proxy proxy consumers gets us in a knot w.r.t removal
+    DC-7 for CA-9, add DB-15, remove CA-9, add CB-8
+    CB-8 add DC-7
+    CB-8 - why not dead?
+    CB-8 for BA-6, add BD-15, remove BA-6
+    BD-15 for DA-11, add DC-7
+    */
+    public void testConsumerOnEachBroker() throws Exception {
         bridge("Broker0", "Broker1");
         if (!DUPLEX) bridge("Broker1", "Broker0");
 
@@ -81,7 +84,10 @@ public class VerifyNetworkConsumersDisconnectTest extends JmsMultipleBrokersTest
         if (!DUPLEX) bridge("Broker2", "Broker1");
 
         startAllBrokers();
-        this.waitForBridgeFormation();
+        waitForBridgeFormation(brokers.get("Broker0").broker, 1, 0);
+        waitForBridgeFormation(brokers.get("Broker2").broker, 1, 0);
+        waitForBridgeFormation(brokers.get("Broker1").broker, 1, 0);
+        waitForBridgeFormation(brokers.get("Broker1").broker, 1, 1);
 
         Destination dest = createDestination("TEST.FOO", false);
 
@@ -90,14 +96,14 @@ public class VerifyNetworkConsumersDisconnectTest extends JmsMultipleBrokersTest
             consumerMap.put("Consumer:" + i + ":0", createConsumer("Broker" + i, dest));
         }
 
-        assertExactConsumersConnect("Broker0", dest, 2, TIMEOUT);
-        assertExactConsumersConnect("Broker2", dest, 2, TIMEOUT);
+        assertExactConsumersConnect("Broker0", 3, 1, TIMEOUT);
+        assertExactConsumersConnect("Broker2", 3, 1, TIMEOUT);
         // piggy in the middle
-        assertExactConsumersConnect("Broker1", dest, 3, TIMEOUT);
+        assertExactConsumersConnect("Broker1", 3, 1, TIMEOUT);
 
-        assertNoUnhandeledExceptions();
+        assertNoUnhandledExceptions();
 
-        LOG.info("Complate the mesh - 0->2");
+        LOG.info("Complete the mesh - 0->2");
 
         // shorter route
         NetworkConnector nc = bridge("Broker0", "Broker2");
@@ -106,7 +112,7 @@ public class VerifyNetworkConsumersDisconnectTest extends JmsMultipleBrokersTest
 
 
         if (!DUPLEX) {
-            LOG.info("... complate the mesh - 2->0");
+            LOG.info("... complete the mesh - 2->0");
             nc = bridge("Broker2", "Broker0");
             nc.setBrokerName("Broker2");
             nc.start();
@@ -114,7 +120,7 @@ public class VerifyNetworkConsumersDisconnectTest extends JmsMultipleBrokersTest
 
         // wait for consumers to get propagated
         for (int i = 0; i < BROKER_COUNT; i++) {
-        	assertExactConsumersConnect("Broker" + i, dest, 3, TIMEOUT);
+        	assertExactConsumersConnect("Broker" + i, 3, 1, TIMEOUT);
         }
 
         // reverse order close
@@ -126,24 +132,98 @@ public class VerifyNetworkConsumersDisconnectTest extends JmsMultipleBrokersTest
 
         LOG.info("Check for no consumers..");
         for (int i = 0; i < BROKER_COUNT; i++) {
-        	assertExactConsumersConnect("Broker" + i, dest, 0, TIMEOUT);
+        	assertExactConsumersConnect("Broker" + i, 0, 0, TIMEOUT);
         }
 
     }
 
-    protected void assertExactConsumersConnect(final String brokerName, Destination destination, final int count, long timeout) throws Exception {
+    public void testXConsumerOnEachBroker() throws Exception {
+        bridge("Broker0", "Broker1");
+        if (!DUPLEX) bridge("Broker1", "Broker0");
+
+        bridge("Broker1", "Broker2");
+        if (!DUPLEX) bridge("Broker2", "Broker1");
+
+        startAllBrokers();
+
+        waitForBridgeFormation(brokers.get("Broker0").broker, 1, 0);
+        waitForBridgeFormation(brokers.get("Broker2").broker, 1, 0);
+        waitForBridgeFormation(brokers.get("Broker1").broker, 1, 0);
+        waitForBridgeFormation(brokers.get("Broker1").broker, 1, 1);
+
+        Destination dest = createDestination("TEST.FOO", false);
+
+        // Setup consumers
+        for (int i = 0; i < BROKER_COUNT; i++) {
+            for (int j=0; j< CONSUMER_COUNT; j++)
+            consumerMap.put("Consumer:" + i + ":" + j, createConsumer("Broker" + i, dest));
+        }
+
+        for (int i = 0; i < BROKER_COUNT; i++) {
+            assertExactConsumersConnect("Broker" + i, CONSUMER_COUNT + (BROKER_COUNT -1), 1, TIMEOUT);
+        }
+
+        assertNoUnhandledExceptions();
+
+        LOG.info("Complete the mesh - 0->2");
+
+        // shorter route
+        NetworkConnector nc = bridge("Broker0", "Broker2");
+        nc.setBrokerName("Broker0");
+        nc.start();
+
+        waitForBridgeFormation(brokers.get("Broker0").broker, 1, 1);
+
+        if (!DUPLEX) {
+            LOG.info("... complete the mesh - 2->0");
+            nc = bridge("Broker2", "Broker0");
+            nc.setBrokerName("Broker2");
+            nc.start();
+        }
+
+        waitForBridgeFormation(brokers.get("Broker2").broker, 1, 1);
+
+        for (int i = 0; i < BROKER_COUNT; i++) {
+            assertExactConsumersConnect("Broker" + i, CONSUMER_COUNT + (BROKER_COUNT -1), 1, TIMEOUT);
+        }
+
+        // reverse order close
+        for (int i=0; i<CONSUMER_COUNT; i++) {
+            consumerMap.get("Consumer:" + 2 + ":" + i).close();
+            TimeUnit.SECONDS.sleep(1);
+            consumerMap.get("Consumer:" + 1 + ":" + i).close();
+            TimeUnit.SECONDS.sleep(1);
+            consumerMap.get("Consumer:" + 0 + ":" + i).close();
+        }
+
+        LOG.info("Check for no consumers..");
+        for (int i = 0; i < BROKER_COUNT; i++) {
+        	assertExactConsumersConnect("Broker" + i, 0, 0, TIMEOUT);
+        }
+
+    }
+
+    protected void assertExactConsumersConnect(final String brokerName, final int count, final int numChecks, long timeout) throws Exception {
         final ManagementContext context = brokers.get(brokerName).broker.getManagementContext();
-        assertTrue("Excepected consumers count: " + count + " on: " + brokerName, Wait.waitFor(new Wait.Condition() {
+        final AtomicInteger stability = new AtomicInteger(0);
+        assertTrue("Expected consumers count: " + count + " on: " + brokerName, Wait.waitFor(new Wait.Condition() {
             @Override
             public boolean isSatisified() throws Exception {
                 try {
                     QueueViewMBean queueViewMBean = (QueueViewMBean) context.newProxyInstance(brokers.get(brokerName).broker.getAdminView().getQueues()[0], QueueViewMBean.class, false);
                     long currentCount = queueViewMBean.getConsumerCount();
                     LOG.info("On " + brokerName + " current consumer count for " + queueViewMBean + ", " + currentCount);
-                    if (count != currentCount) {
-                        LOG.info("Sub IDs: " + Arrays.asList(queueViewMBean.getSubscriptions()));
+                    LinkedList<String> consumerIds = new LinkedList<String>();
+                    for (ObjectName objectName : queueViewMBean.getSubscriptions()) {
+                        consumerIds.add(objectName.getKeyProperty("consumerId"));
+                    }
+                    LOG.info("Sub IDs: " + consumerIds);
+                    if (currentCount == count) {
+                        stability.incrementAndGet();
+                    } else {
+                        stability.set(0);
                     }
-                    return currentCount == count;
+                    return stability.get() > numChecks;
                 } catch (Exception e) {
                     LOG.warn(": ", e);
                     return false;
@@ -156,7 +236,7 @@ public class VerifyNetworkConsumersDisconnectTest extends JmsMultipleBrokersTest
         super.setAutoFail(true);
         super.setUp();
 
-        unhandeledExceptions.clear();
+        unhandledExceptions.clear();
         Thread.setDefaultUncaughtExceptionHandler(this);
         
         // Setup n brokers
@@ -177,8 +257,8 @@ public class VerifyNetworkConsumersDisconnectTest extends JmsMultipleBrokersTest
     }
 
     public void uncaughtException(Thread t, Throwable e) {
-        synchronized(unhandeledExceptions) {
-            unhandeledExceptions.put(t,e);
+        synchronized(unhandledExceptions) {
+            unhandledExceptions.put(t, e);
         }
     }
 }


Mime
View raw message