activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From gtu...@apache.org
Subject svn commit: r1142005 - in /activemq/trunk/activemq-core: ./ src/main/java/org/apache/activemq/broker/region/ src/main/java/org/apache/activemq/broker/region/policy/ src/main/java/org/apache/activemq/command/ src/main/java/org/apache/activemq/filter/ sr...
Date Fri, 01 Jul 2011 17:45:54 GMT
Author: gtully
Date: Fri Jul  1 17:45:54 2011
New Revision: 1142005

URL: http://svn.apache.org/viewvc?rev=1142005&view=rev
Log:
https://issues.apache.org/jira/browse/AMQ-2484, https://issues.apache.org/jira/browse/AMQ-2324: Forwarded message cannot be distributed to the original broker; avoid stuck messages in a network of brokers. Implement a destination policy that allows the network bridge filter factory to be provided. A new implementation allows queues to replay messages back to the originating broker, after a configurable delay, once there are no local consumers. It also allows a rate limit on the network consumer, so that a statically included destination does not consume all messages when there is a slow local consumer.

Added:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/ConditionalNetworkBridgeFilterFactory.java   (with props)
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DefaultNetworkBridgeFilterFactory.java   (with props)
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/NetworkBridgeFilterFactory.java   (with props)
Modified:
    activemq/trunk/activemq-core/pom.xml
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/NetworkBridgeFilter.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/filter/MessageEvaluationContext.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/CompositeDemandForwardingBridge.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridge.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandSubscription.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsMultipleBrokersTestSupport.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/BrokerNetworkWithStuckMessagesTest.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/NetworkRestartTest.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/TwoBrokerQueueClientsReconnectTest.java

Modified: activemq/trunk/activemq-core/pom.xml
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/pom.xml?rev=1142005&r1=1142004&r2=1142005&view=diff
==============================================================================
--- activemq/trunk/activemq-core/pom.xml (original)
+++ activemq/trunk/activemq-core/pom.xml Fri Jul  1 17:45:54 2011
@@ -451,8 +451,6 @@
             <exclude>**/MissingDataFileTest.*</exclude>
 
             <!-- m2 tests failing since move from assembly  -->
-            <exclude>**/TwoBrokerMessageNotSentToRemoteWhenNoConsumerTest.*</exclude>
-            <exclude>**/TwoBrokerQueueClientsReconnectTest.*</exclude>
             <exclude>**/QueueConsumerCloseAndReconnectTest.*</exclude>
             <exclude>**/TwoBrokerMulticastQueueTest.*</exclude>
 

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java?rev=1142005&r1=1142004&r2=1142005&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java Fri Jul  1 17:45:54 2011
@@ -1827,29 +1827,26 @@ public class Queue extends BaseDestinati
                     interestCount++;
                     continue;
                 }
-                if (dispatchSelector.canSelect(s, node)) {
-                    if (!fullConsumers.contains(s)) {
-                        if (!s.isFull()) {
-                            if (assignMessageGroup(s, (QueueMessageReference)node)) {
-                                // Dispatch it.
-                                s.add(node);
-                                target = s;
-                                break;
-                            }
-                        } else {
-                            // no further dispatch of list to a full consumer to
-                            // avoid out of order message receipt
-                            fullConsumers.add(s);
-                        }
+                if (!fullConsumers.contains(s) && !s.isFull()) {
+                    if (dispatchSelector.canSelect(s, node) && assignMessageGroup(s, (QueueMessageReference)node)) {
+                        // Dispatch it.
+                        s.add(node);
+                        target = s;
+                        break;
                     }
-                    interestCount++;
                 } else {
-                    // makes sure it gets dispatched again
-                    if (!node.isDropped() && !((QueueMessageReference) node).isAcked()
-                            && (!node.isDropped() || s.getConsumerInfo().isBrowser())) {
-                        interestCount++;
+                    // no further dispatch of list to a full consumer to
+                    // avoid out of order message receipt
+                    fullConsumers.add(s);
+                    if (LOG.isTraceEnabled()) {
+                        LOG.trace("Sub full " + s);
                     }
                 }
+                // make sure it gets dispatched again
+                if (!node.isDropped() && !((QueueMessageReference) node).isAcked() &&
+                        (!node.isDropped() || s.getConsumerInfo().isBrowser())) {
+                    interestCount++;
+                }
             }
 
             if ((target == null && interestCount > 0) || consumers.size() == 0) {

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java?rev=1142005&r1=1142004&r2=1142005&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java Fri Jul  1 17:45:54 2011
@@ -30,6 +30,7 @@ import org.apache.activemq.broker.region
 import org.apache.activemq.broker.region.group.MessageGroupHashBucketFactory;
 import org.apache.activemq.broker.region.group.MessageGroupMapFactory;
 import org.apache.activemq.filter.DestinationMapEntry;
+import org.apache.activemq.network.NetworkBridgeFilterFactory;
 import org.apache.activemq.usage.SystemUsage;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -93,6 +94,7 @@ public class PolicyEntry extends Destina
     private boolean gcWithNetworkConsumers;
     private long inactiveTimoutBeforeGC = BaseDestination.DEFAULT_INACTIVE_TIMEOUT_BEFORE_GC;
     private boolean reduceMemoryFootprint;
+    private NetworkBridgeFilterFactory networkBridgeFilterFactory;
 
 
     public void configure(Broker broker,Queue queue) {
@@ -805,4 +807,12 @@ public class PolicyEntry extends Destina
     public void setReduceMemoryFootprint(boolean reduceMemoryFootprint) {
         this.reduceMemoryFootprint = reduceMemoryFootprint;
     }
+
+    public void setNetworkBridgeFilterFactory(NetworkBridgeFilterFactory networkBridgeFilterFactory) {
+        this.networkBridgeFilterFactory = networkBridgeFilterFactory;
+    }
+
+    public NetworkBridgeFilterFactory getNetworkBridgeFilterFactory() {
+        return networkBridgeFilterFactory;
+    }
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/NetworkBridgeFilter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/NetworkBridgeFilter.java?rev=1142005&r1=1142004&r2=1142005&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/NetworkBridgeFilter.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/NetworkBridgeFilter.java Fri Jul  1 17:45:54 2011
@@ -36,14 +36,14 @@ public class NetworkBridgeFilter impleme
     public static final byte DATA_STRUCTURE_TYPE = CommandTypes.NETWORK_BRIDGE_FILTER;
     static final Logger LOG = LoggerFactory.getLogger(NetworkBridgeFilter.class);
 
-    private BrokerId networkBrokerId;
-    private int networkTTL;
+    protected BrokerId networkBrokerId;
+    protected int networkTTL;
 
     public NetworkBridgeFilter() {
     }
 
-    public NetworkBridgeFilter(BrokerId remoteBrokerPath, int networkTTL) {
-        this.networkBrokerId = remoteBrokerPath;
+    public NetworkBridgeFilter(BrokerId networkBrokerId, int networkTTL) {
+        this.networkBrokerId = networkBrokerId;
         this.networkTTL = networkTTL;
     }
 
@@ -62,7 +62,7 @@ public class NetworkBridgeFilter impleme
             // in the dispatch loop
             // so need to get the reference to it
             Message message = mec.getMessage();
-            return message != null && matchesForwardingFilter(message);
+            return message != null && matchesForwardingFilter(message, mec);
         } catch (IOException e) {
             throw JMSExceptionSupport.create(e);
         }
@@ -72,11 +72,11 @@ public class NetworkBridgeFilter impleme
         return matches(message) ? Boolean.TRUE : Boolean.FALSE;
     }
 
-    protected boolean matchesForwardingFilter(Message message) {
+    protected boolean matchesForwardingFilter(Message message, MessageEvaluationContext mec) {
 
         if (contains(message.getBrokerPath(), networkBrokerId)) {
             if (LOG.isTraceEnabled()) {
-                LOG.trace("Message all ready routed once through this broker ("
+                LOG.trace("Message all ready routed once through target broker ("
                         + networkBrokerId + "), path: "
                         + Arrays.toString(message.getBrokerPath()) + " - ignoring: " + message);
             }
@@ -92,7 +92,6 @@ public class NetworkBridgeFilter impleme
             return false;
         }
 
-        // Don't propagate advisory messages about network subscriptions
         if (message.isAdvisory() && message.getDataStructure() != null && message.getDataStructure().getDataStructureType() == CommandTypes.CONSUMER_INFO) {
             ConsumerInfo info = (ConsumerInfo)message.getDataStructure();
             hops = info.getBrokerPath() == null ? 0 : info.getBrokerPath().length;
@@ -102,6 +101,12 @@ public class NetworkBridgeFilter impleme
                 }
                 return false;
             }
+
+            if (contains(info.getBrokerPath(), networkBrokerId)) {
+                LOG.trace("ConsumerInfo advisory all ready routed once through target broker ("
+                        + networkBrokerId + "), path: "
+                        + Arrays.toString(info.getBrokerPath()) + " - ignoring: " + message);
+            }
         }
         return true;
     }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/filter/MessageEvaluationContext.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/filter/MessageEvaluationContext.java?rev=1142005&r1=1142004&r2=1142005&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/filter/MessageEvaluationContext.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/filter/MessageEvaluationContext.java Fri Jul  1 17:45:54 2011
@@ -92,4 +92,8 @@ public class MessageEvaluationContext {
         dropped = false;
         loaded = false;
     }
+
+    public MessageReference getMessageReference() {
+        return messageReference;
+    }
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/CompositeDemandForwardingBridge.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/CompositeDemandForwardingBridge.java?rev=1142005&r1=1142004&r2=1142005&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/CompositeDemandForwardingBridge.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/CompositeDemandForwardingBridge.java Fri Jul  1 17:45:54 2011
@@ -103,10 +103,6 @@ public class CompositeDemandForwardingBr
         // TODO is there much we can do here?
     }
 
-    protected NetworkBridgeFilter createNetworkBridgeFilter(ConsumerInfo info) throws IOException {
-        return new NetworkBridgeFilter(getFromBrokerId(info), configuration.getNetworkTTL());
-    }
-
     protected BrokerId[] getRemoteBrokerPath() {
         return remoteBrokerPath;
     }

Added: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/ConditionalNetworkBridgeFilterFactory.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/ConditionalNetworkBridgeFilterFactory.java?rev=1142005&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/ConditionalNetworkBridgeFilterFactory.java (added)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/ConditionalNetworkBridgeFilterFactory.java Fri Jul  1 17:45:54 2011
@@ -0,0 +1,171 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.network;
+
+import java.util.List;
+import org.apache.activemq.broker.region.Subscription;
+import org.apache.activemq.command.BrokerId;
+import org.apache.activemq.command.ConsumerInfo;
+import org.apache.activemq.command.Message;
+import org.apache.activemq.command.NetworkBridgeFilter;
+import org.apache.activemq.filter.MessageEvaluationContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * implement conditional behaviour for queue consumers,
+ * allows replaying back to origin if no consumers are present on the local broker
+ * after a configurable delay, irrespective of the networkTTL
+ * Also allows rate limiting of messages through the network, useful for static includes
+ *
+ *  @org.apache.xbean.XBean
+ */
+
+public class ConditionalNetworkBridgeFilterFactory implements NetworkBridgeFilterFactory {
+    boolean replayWhenNoConsumers = false;
+    int replayDelay = 0;
+    int rateLimit = 0;
+    int rateDuration = 1000;
+
+    @Override
+    public NetworkBridgeFilter create(ConsumerInfo info, BrokerId[] remoteBrokerPath, int networkTimeToLive) {
+        ConditionalNetworkBridgeFilter filter = new ConditionalNetworkBridgeFilter();
+        filter.setNetworkBrokerId(remoteBrokerPath[0]);
+        filter.setNetworkTTL(networkTimeToLive);
+        filter.setAllowReplayWhenNoConsumers(isReplayWhenNoConsumers());
+        filter.setRateLimit(getRateLimit());
+        filter.setRateDuration(getRateDuration());
+        filter.setReplayDelay(getReplayDelay());
+        return filter;
+    }
+
+    public void setReplayWhenNoConsumers(boolean replayWhenNoConsumers) {
+        this.replayWhenNoConsumers = replayWhenNoConsumers;
+    }
+
+    public boolean isReplayWhenNoConsumers() {
+        return replayWhenNoConsumers;
+    }
+
+    public void setRateLimit(int rateLimit) {
+        this.rateLimit = rateLimit;
+    }
+
+    public int getRateLimit() {
+        return rateLimit;
+    }
+
+    public int getRateDuration() {
+        return rateDuration;
+    }
+
+    public void setRateDuration(int rateDuration) {
+        this.rateDuration = rateDuration;
+    }
+
+    public int getReplayDelay() {
+        return replayDelay;
+    }
+
+    public void setReplayDelay(int replayDelay) {
+        this.replayDelay = replayDelay;
+    }
+
+    private static class ConditionalNetworkBridgeFilter extends NetworkBridgeFilter {
+        final static Logger LOG = LoggerFactory.getLogger(ConditionalNetworkBridgeFilter.class);
+        private int rateLimit;
+        private int rateDuration = 1000;
+        private boolean allowReplayWhenNoConsumers = true;
+        private int replayDelay = 1000;
+
+        private int matchCount;
+        private long rateDurationEnd;
+
+        @Override
+        protected boolean matchesForwardingFilter(Message message, final MessageEvaluationContext mec) {
+            boolean match = true;
+            if (mec.getDestination().isQueue()) {
+                if (contains(message.getBrokerPath(), networkBrokerId)) {
+                    // potential replay back to origin
+                    match = allowReplayWhenNoConsumers && hasNoLocalConsumers(message, mec) && hasNotJustArrived(message);
+
+                    if (match && LOG.isTraceEnabled()) {
+                        LOG.trace("Replaying  [" + message.getMessageId() +"] for [" + message.getDestination() +"] back to origin in the absence of a local consumer");
+                    }
+                }
+
+                if (match && rateLimitExceeded()) {
+                    if (LOG.isTraceEnabled()) {
+                        LOG.trace("Throttled network consumer rejecting [" + message.getMessageId() + "] for [" + message.getDestination() + " " + matchCount + ">" + rateLimit  + "/" + rateDuration);
+                    }
+                    match = false;
+                }
+
+            } else {
+                // use existing logic for topics
+                match = super.matchesForwardingFilter(message, mec);
+            }
+
+            return match;
+        }
+
+        private boolean hasNotJustArrived(Message message) {
+            return replayDelay ==0 || (message.getBrokerInTime() + replayDelay < System.currentTimeMillis());
+        }
+
+        private boolean hasNoLocalConsumers(final Message message, final MessageEvaluationContext mec) {
+            List<Subscription> consumers = mec.getMessageReference().getRegionDestination().getConsumers();
+            for (Subscription sub : consumers) {
+                if (!sub.getConsumerInfo().isNetworkSubscription() && !sub.getConsumerInfo().isBrowser()) {
+                    if (LOG.isTraceEnabled()) {
+                        LOG.trace("Not replaying [" + message.getMessageId() + "] for [" + message.getDestination() +"] to origin due to existing local consumer: " + sub.getConsumerInfo());
+                    }
+                    return false;
+                }
+            }
+            return true;
+        }
+
+        private boolean rateLimitExceeded() {
+            if (rateLimit == 0) {
+                return false;
+            }
+
+            if (rateDurationEnd < System.currentTimeMillis()) {
+                rateDurationEnd = System.currentTimeMillis() + rateDuration;
+                matchCount = 0;
+            }
+            return ++matchCount > rateLimit;
+        }
+
+        public void setReplayDelay(int replayDelay) {
+            this.replayDelay = replayDelay;
+        }
+
+        public void setRateLimit(int rateLimit) {
+            this.rateLimit = rateLimit;
+        }
+
+        public void setRateDuration(int rateDuration) {
+            this.rateDuration = rateDuration;
+        }
+
+        public void setAllowReplayWhenNoConsumers(boolean allowReplayWhenNoConsumers) {
+            this.allowReplayWhenNoConsumers = allowReplayWhenNoConsumers;
+        }
+    }
+}

Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/ConditionalNetworkBridgeFilterFactory.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/ConditionalNetworkBridgeFilterFactory.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DefaultNetworkBridgeFilterFactory.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DefaultNetworkBridgeFilterFactory.java?rev=1142005&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DefaultNetworkBridgeFilterFactory.java (added)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DefaultNetworkBridgeFilterFactory.java Fri Jul  1 17:45:54 2011
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.network;
+
+import org.apache.activemq.command.BrokerId;
+import org.apache.activemq.command.ConsumerInfo;
+import org.apache.activemq.command.NetworkBridgeFilter;
+
+/**
+ * implement default behaviour, filter that will not allow resend to origin
+ * based on brokerPath and which respects networkTTL
+ *
+ *  @org.apache.xbean.XBean
+ */
+public class DefaultNetworkBridgeFilterFactory implements NetworkBridgeFilterFactory {
+    public NetworkBridgeFilter create(ConsumerInfo info, BrokerId[] remoteBrokerPath, int networkTimeToLive) {
+        return new NetworkBridgeFilter(remoteBrokerPath[0], networkTimeToLive);
+    }
+}

Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DefaultNetworkBridgeFilterFactory.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DefaultNetworkBridgeFilterFactory.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridge.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridge.java?rev=1142005&r1=1142004&r2=1142005&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridge.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridge.java Fri Jul  1 17:45:54 2011
@@ -89,10 +89,6 @@ public class DemandForwardingBridge exte
         }
     }
 
-    protected NetworkBridgeFilter createNetworkBridgeFilter(ConsumerInfo info) throws IOException {
-        return new NetworkBridgeFilter(remoteBrokerPath[0], configuration.getNetworkTTL());
-    }
-
     protected BrokerId[] getRemoteBrokerPath() {
         return remoteBrokerPath;
     }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java?rev=1142005&r1=1142004&r2=1142005&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java Fri Jul  1 17:45:54 2011
@@ -34,6 +34,7 @@ import org.apache.activemq.broker.Transp
 import org.apache.activemq.broker.region.AbstractRegion;
 import org.apache.activemq.broker.region.RegionBroker;
 import org.apache.activemq.broker.region.Subscription;
+import org.apache.activemq.broker.region.policy.PolicyEntry;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.ActiveMQMessage;
 import org.apache.activemq.command.ActiveMQTempDestination;
@@ -75,7 +76,6 @@ import org.apache.activemq.transport.tcp
 import org.apache.activemq.util.*;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.slf4j.MDC;
 
 /**
  * A useful base class for implementing demand forwarding bridges.
@@ -116,6 +116,7 @@ public abstract class DemandForwardingBr
     protected final AtomicBoolean remoteInterupted = new AtomicBoolean(false);
     protected final AtomicBoolean lastConnectSucceeded = new AtomicBoolean(false);
     protected NetworkBridgeConfiguration configuration;
+    protected NetworkBridgeFilterFactory filterFactory;
 
     final AtomicLong enqueueCounter = new AtomicLong();
     final AtomicLong dequeueCounter = new AtomicLong();
@@ -721,7 +722,7 @@ public abstract class DemandForwardingBr
                         
                         Message message = configureMessage(md);
                         if (LOG.isDebugEnabled()) {
-                            LOG.debug("bridging (" + configuration.getBrokerName() + " -> " + remoteBrokerName + ", consumer: " + md.getConsumerId() + ", destination " + message.getDestination() + ", brokerPath: " + Arrays.toString(message.getBrokerPath()) + ", message: " + message);
+                            LOG.debug("bridging (" + configuration.getBrokerName() + " -> " + remoteBrokerName + ") " + message.getMessageId() + ", consumer: " + md.getConsumerId() + ", destination " + message.getDestination() + ", brokerPath: " + Arrays.toString(message.getBrokerPath()) + ", message: " + message);
                         }
                         
                         if (!message.isResponseRequired()) {
@@ -803,23 +804,15 @@ public abstract class DemandForwardingBr
     }
 
     private boolean suppressMessageDispatch(MessageDispatch md, DemandSubscription sub) throws Exception {
-        // See if this consumer's brokerPath tells us it came from the broker at the other end
-        // of the bridge. I think we should be making this decision based on the message's
-        // broker bread crumbs and not the consumer's? However, the message's broker bread
-        // crumbs are null, which is another matter.   
         boolean suppress = false;
-        Object consumerInfo = md.getMessage().getDataStructure();
-        if (consumerInfo != null && (consumerInfo instanceof ConsumerInfo)) {
-            suppress = contains(((ConsumerInfo) consumerInfo).getBrokerPath(), remoteBrokerInfo.getBrokerId());
-        }
-        
-        // for durable subs, suppression via filter leaves dangling acks so we need to 
+        // for durable subs, suppression via filter leaves dangling acks so we need to
         // check here and allow the ack irrespective
-        if (!suppress && sub.getLocalInfo().isDurable()) {
+        if (sub.getLocalInfo().isDurable()) {
             MessageEvaluationContext messageEvalContext = new MessageEvaluationContext();
             messageEvalContext.setMessageReference(md.getMessage());
-            suppress = !createNetworkBridgeFilter(null).matches(messageEvalContext);
-        }  
+            messageEvalContext.setDestination(md.getDestination());
+            suppress = !sub.getNetworkBridgeFilter().matches(messageEvalContext);
+        }
         return suppress;
     }
 
@@ -1172,10 +1165,11 @@ public abstract class DemandForwardingBr
         subscriptionMapByLocalId.put(sub.getLocalInfo().getConsumerId(), sub);
         subscriptionMapByRemoteId.put(sub.getRemoteInfo().getConsumerId(), sub);
 
+        sub.setNetworkBridgeFilter(createNetworkBridgeFilter(info));
         if (!info.isDurable()) {
             // This works for now since we use a VM connection to the local broker.
             // may need to change if we ever subscribe to a remote broker.
-            sub.getLocalInfo().setAdditionalPredicate(createNetworkBridgeFilter(info));
+            sub.getLocalInfo().setAdditionalPredicate(sub.getNetworkBridgeFilter());
         } else  {
             // need to ack this message if it is ignored as it is durable so
             // we check before we send. see: suppressMessageDispatch()
@@ -1219,7 +1213,20 @@ public abstract class DemandForwardingBr
         subscriptionMapByRemoteId.clear();
     }
 
-    protected abstract NetworkBridgeFilter createNetworkBridgeFilter(ConsumerInfo info) throws IOException;
+    protected NetworkBridgeFilter createNetworkBridgeFilter(ConsumerInfo info) throws IOException {
+        if (filterFactory == null)  {
+            if (brokerService != null && brokerService.getDestinationPolicy() != null) {
+                PolicyEntry entry = brokerService.getDestinationPolicy().getEntryFor(info.getDestination());
+                if (entry != null) {
+                    filterFactory = entry.getNetworkBridgeFilterFactory();
+                }
+            }
+            if (filterFactory == null) {
+                filterFactory = new DefaultNetworkBridgeFilterFactory();
+            }
+        }
+         return filterFactory.create(info, getRemoteBrokerPath(), configuration.getNetworkTTL() );
+    }
 
     protected abstract void serviceLocalBrokerInfo(Command command) throws InterruptedException;
 

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandSubscription.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandSubscription.java?rev=1142005&r1=1142004&r2=1142005&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandSubscription.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandSubscription.java Fri Jul  1 17:45:54 2011
@@ -23,6 +23,7 @@ import java.util.concurrent.atomic.Atomi
 
 import org.apache.activemq.command.ConsumerId;
 import org.apache.activemq.command.ConsumerInfo;
+import org.apache.activemq.command.NetworkBridgeFilter;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -40,6 +41,7 @@ public class DemandSubscription {
 
     private AtomicInteger dispatched = new AtomicInteger(0);
     private AtomicBoolean activeWaiter = new AtomicBoolean();
+    private NetworkBridgeFilter networkBridgeFilter;
 
     DemandSubscription(ConsumerInfo info) {
         remoteInfo = info;
@@ -125,4 +127,12 @@ public class DemandSubscription {
         }
         return true;
     }
+
+    public NetworkBridgeFilter getNetworkBridgeFilter() {
+        return networkBridgeFilter;
+    }
+
+    public void setNetworkBridgeFilter(NetworkBridgeFilter networkBridgeFilter) {
+        this.networkBridgeFilter = networkBridgeFilter;
+    }
 }

Added: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/NetworkBridgeFilterFactory.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/NetworkBridgeFilterFactory.java?rev=1142005&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/NetworkBridgeFilterFactory.java (added)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/NetworkBridgeFilterFactory.java Fri Jul  1 17:45:54 2011
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.network;
+
+import org.apache.activemq.command.BrokerId;
+import org.apache.activemq.command.ConsumerInfo;
+import org.apache.activemq.command.NetworkBridgeFilter;
+
+public interface NetworkBridgeFilterFactory {
+    // Create a dispatch filter for network consumers. The default implementation will not send a
+    // message back to its origin, to prevent looping; the downside is that messages can get stuck.
+    NetworkBridgeFilter create(ConsumerInfo info, BrokerId[] remoteBrokerPath, int networkTimeToLive);
+}

Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/NetworkBridgeFilterFactory.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/NetworkBridgeFilterFactory.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsMultipleBrokersTestSupport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsMultipleBrokersTestSupport.java?rev=1142005&r1=1142004&r2=1142005&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsMultipleBrokersTestSupport.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsMultipleBrokersTestSupport.java Fri Jul  1 17:45:54 2011
@@ -212,11 +212,15 @@ public class JmsMultipleBrokersTestSuppo
 
     protected BrokerService createBroker(URI brokerUri) throws Exception {
         BrokerService broker = BrokerFactory.createBroker(brokerUri);
+        configureBroker(broker);
         brokers.put(broker.getBrokerName(), new BrokerItem(broker));
 
         return broker;
     }
 
+    protected void configureBroker(BrokerService broker) {
+    }
+
     protected BrokerService createBroker(Resource configFile) throws Exception {
         BrokerFactoryBean brokerFactory = new BrokerFactoryBean(configFile);
         brokerFactory.afterPropertiesSet();

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/BrokerNetworkWithStuckMessagesTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/BrokerNetworkWithStuckMessagesTest.java?rev=1142005&r1=1142004&r2=1142005&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/BrokerNetworkWithStuckMessagesTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/BrokerNetworkWithStuckMessagesTest.java Fri Jul  1 17:45:54 2011
@@ -39,12 +39,15 @@ import javax.management.ObjectName;
 import junit.framework.TestCase;
 
 import org.apache.activemq.ActiveMQConnectionFactory;
+
 import org.apache.activemq.broker.BrokerService;
 import org.apache.activemq.broker.BrokerTestSupport;
 import org.apache.activemq.broker.StubConnection;
 import org.apache.activemq.broker.TransportConnector;
 import org.apache.activemq.broker.jmx.ManagementContext;
 import org.apache.activemq.broker.jmx.QueueViewMBean;
+import org.apache.activemq.broker.region.policy.PolicyEntry;
+import org.apache.activemq.broker.region.policy.PolicyMap;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.ActiveMQTextMessage;
 import org.apache.activemq.command.ConnectionId;
@@ -120,7 +123,8 @@ public class BrokerNetworkWithStuckMessa
         NetworkBridgeConfiguration config = new NetworkBridgeConfiguration();
         config.setBrokerName("local");
         config.setDispatchAsync(false);
-        
+        config.setDuplex(true);
+
         Transport localTransport = createTransport(); 
         Transport remoteTransport = createRemoteTransport();
         
@@ -180,7 +184,7 @@ public class BrokerNetworkWithStuckMessa
         
         
         // Create a synchronous consumer on the remote broker 
-        final StubConnection connection2 = createRemoteConnection();
+        StubConnection connection2 = createRemoteConnection();
         ConnectionInfo connectionInfo2 = createConnectionInfo();
         SessionInfo sessionInfo2 = createSessionInfo(connectionInfo2);
         connection2.send(connectionInfo2);
@@ -191,69 +195,100 @@ public class BrokerNetworkWithStuckMessa
         connection2.send(consumerInfo2);
         
         // Consume 5 of the messages from the remote broker and ack them. 
-        // Because the prefetch size is set to 1000 in the createConsumerInfo() 
-        // method, this will cause the messages on the local broker to be 
-        // forwarded to the remote broker. 
         for (int i = 0; i < receiveNumMessages; ++i) {
-            Message message1 = receiveMessage(connection2);
+            Message message1 = receiveMessage(connection2, 20000);
             assertNotNull(message1);
-            connection2.send(createAck(consumerInfo2, message1, 1, MessageAck.STANDARD_ACK_TYPE));
-            
-            Object[] msgs1 = browseQueueWithJmx(remoteBroker);
-            LOG.info("Found [" + msgs1.length + "] messages with JMX");
-//            assertEquals((sendNumMessages-i), msgs.length);
+            connection2.send(createAck(consumerInfo2, message1, 1, MessageAck.INDIVIDUAL_ACK_TYPE));
         }
         
         // Ensure that there are zero messages on the local broker. This tells 
         // us that those messages have been prefetched to the remote broker 
-        // where the demand exists. 
+        // where the demand exists.
+        Wait.waitFor(new Wait.Condition() {
+            @Override
+            public boolean isSatisified() throws Exception {
+                Object[] result = browseQueueWithJmx(localBroker);
+               return 0 == result.length;
+            }
+        });
         messages = browseQueueWithJmx(localBroker);
         assertEquals(0, messages.length);
-        
+
+        LOG.info("Closing consumer on remote");
         // Close the consumer on the remote broker 
         connection2.send(consumerInfo2.createRemoveCommand());
-        
-        // There should now be 5 messages stuck on the remote broker 
+        // also close the connection etc. so messages get dropped from the local consumer queue
+        connection2.send(connectionInfo2.createRemoveCommand());
+
+        // There should now be 5 messages stuck on the remote broker
         messages = browseQueueWithJmx(remoteBroker);
         assertEquals(5, messages.length);
-        
-        // Create a consumer on the local broker just to confirm that it doesn't 
-        // receive any messages  
+
+         LOG.info("Messages now stuck on remote");
+
+        // receive again on the origin broker
         ConsumerInfo consumerInfo1 = createConsumerInfo(sessionInfo1, destinationInfo1);
         connection1.send(consumerInfo1);
-        Message message1 = receiveMessage(connection1);
-        
-        //////////////////////////////////////////////////////
-        // An assertNull() is done here because this is currently the correct 
-        // behavior. This is actually the purpose of this test - to prove that 
-        // messages are stuck on the remote broker. AMQ-2324 and AMQ-2484 aim 
-        // to fix this situation so that messages don't get stuck. 
-        assertNull(message1);
-        //////////////////////////////////////////////////////
-        
+        LOG.info("create local consumer: " + consumerInfo1);
+
+        Message message1 = receiveMessage(connection1, 20000);
+        assertNotNull("Expect to get a replay as remote consumer is gone", message1);
+        connection1.send(createAck(consumerInfo1, message1, 1, MessageAck.INDIVIDUAL_ACK_TYPE));
+        LOG.info("acked one message on origin, waiting for all messages to percolate back");
+
+        Wait.waitFor(new Wait.Condition() {
+            @Override
+            public boolean isSatisified() throws Exception {
+                Object[] result = browseQueueWithJmx(localBroker);
+               return 4 == result.length;
+            }
+        });
+        messages = browseQueueWithJmx(localBroker);
+        assertEquals(4, messages.length);
+
+        LOG.info("checking for messages on remote again");
+        // messages won't migrate back again till consumer closes
+        connection2 = createRemoteConnection();
+        connectionInfo2 = createConnectionInfo();
+        sessionInfo2 = createSessionInfo(connectionInfo2);
+        connection2.send(connectionInfo2);
+        connection2.send(sessionInfo2);
         ConsumerInfo consumerInfo3 = createConsumerInfo(sessionInfo2, destinationInfo2);
         connection2.send(consumerInfo3);
-        
-        // Consume the last 5 messages from the remote broker and ack them just 
+        message1 = receiveMessage(connection2, 20000);
+        assertNull("Messages have migrated back: " + message1, message1);
+
+        // Consume the last 4 messages from the local broker and ack them just
         // to clean up the queue. 
-        int counter = 0;
-        for (int i = 0; i < receiveNumMessages; ++i) {
-            message1 = receiveMessage(connection2);
-            assertNotNull(message1);
-            connection2.send(createAck(consumerInfo3, message1, 1, MessageAck.STANDARD_ACK_TYPE));
-            ++counter;
+        int counter = 1;
+        for (; counter < receiveNumMessages; counter++) {
+            message1 = receiveMessage(connection1);
+            connection1.send(createAck(consumerInfo1, message1, 1, MessageAck.INDIVIDUAL_ACK_TYPE));
         }
         // Ensure that 5 messages were received
         assertEquals(receiveNumMessages, counter);
-        
-        // Let those acks percolate... This stinks but it's the only way currently
-        // because these types of internal broker actions are non-deterministic. 
-        Thread.sleep(4000);
-        
-        // Ensure that the queue on the remote broker is empty 
+
+        // verify all messages consumed
+        Wait.waitFor(new Wait.Condition() {
+            @Override
+            public boolean isSatisified() throws Exception {
+                Object[] result = browseQueueWithJmx(remoteBroker);
+               return 0 == result.length;
+            }
+        });
         messages = browseQueueWithJmx(remoteBroker);
         assertEquals(0, messages.length);
-        
+
+        Wait.waitFor(new Wait.Condition() {
+            @Override
+            public boolean isSatisified() throws Exception {
+                Object[] result = browseQueueWithJmx(localBroker);
+               return 0 == result.length;
+            }
+        });
+        messages = browseQueueWithJmx(localBroker);
+        assertEquals(0, messages.length);
+
         // Close the consumer on the remote broker 
         connection2.send(consumerInfo3.createRemoveCommand());
         
@@ -269,6 +304,7 @@ public class BrokerNetworkWithStuckMessa
         localBroker.setPersistent(false);
         connector = createConnector();
         localBroker.addConnector(connector);
+        configureBroker(localBroker);
         localBroker.start();
         localBroker.waitUntilStarted();
         
@@ -278,7 +314,18 @@ public class BrokerNetworkWithStuckMessa
         
         return localBroker;
     }
-    
+
+    private void configureBroker(BrokerService broker) {
+        PolicyMap policyMap = new PolicyMap();
+        PolicyEntry defaultEntry = new PolicyEntry();
+        defaultEntry.setExpireMessagesPeriod(0);
+        ConditionalNetworkBridgeFilterFactory filterFactory = new ConditionalNetworkBridgeFilterFactory();
+        filterFactory.setReplayWhenNoConsumers(true);
+        defaultEntry.setNetworkBridgeFilterFactory(filterFactory);
+        policyMap.setDefaultEntry(defaultEntry);
+        broker.setDestinationPolicy(policyMap);
+    }
+
     protected BrokerService createRemoteBroker() throws Exception {
         remoteBroker = new BrokerService();
         remoteBroker.setBrokerName("remotehost");
@@ -287,6 +334,8 @@ public class BrokerNetworkWithStuckMessa
         remoteBroker.setPersistent(false);
         remoteConnector = createRemoteConnector();
         remoteBroker.addConnector(remoteConnector);
+        configureBroker(remoteBroker);
+        remoteBroker.start();
         remoteBroker.waitUntilStarted();
         
         remoteBroker.getManagementContext().setConnectorPort(2222);

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/NetworkRestartTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/NetworkRestartTest.java?rev=1142005&r1=1142004&r2=1142005&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/NetworkRestartTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/NetworkRestartTest.java Fri Jul  1 17:45:54 2011
@@ -53,6 +53,9 @@ public class NetworkRestartTest extends 
 
         // restart connector
 
+        // wait for ack back to localbroker with concurrent store and dispatch, dispatch occurs first
+        Thread.sleep(1000);
+
         NetworkConnector connector = localBroker.getNetworkConnectorByName("networkConnector");
 
         LOG.info("Stopping connector");
@@ -83,6 +86,9 @@ public class NetworkRestartTest extends 
 
         // restart connector
 
+        // wait for ack back to localbroker with concurrent store and dispatch, dispatch occurs first
+        Thread.sleep(1000);
+
         NetworkConnector connector = localBroker.getNetworkConnectorByName("networkConnector");
 
         LOG.info("Removing connector");

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/TwoBrokerQueueClientsReconnectTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/TwoBrokerQueueClientsReconnectTest.java?rev=1142005&r1=1142004&r2=1142005&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/TwoBrokerQueueClientsReconnectTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/TwoBrokerQueueClientsReconnectTest.java Fri Jul  1 17:45:54 2011
@@ -17,6 +17,8 @@
 package org.apache.activemq.usecases;
 
 import java.net.URI;
+import java.util.Collection;
+import java.util.Iterator;
 
 import javax.jms.Connection;
 import javax.jms.Destination;
@@ -27,6 +29,11 @@ import javax.jms.Session;
 import org.apache.activemq.ActiveMQConnectionFactory;
 import org.apache.activemq.ActiveMQPrefetchPolicy;
 import org.apache.activemq.JmsMultipleBrokersTestSupport;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.region.policy.PolicyEntry;
+import org.apache.activemq.broker.region.policy.PolicyMap;
+import org.apache.activemq.network.ConditionalNetworkBridgeFilterFactory;
+import org.apache.activemq.network.NetworkConnector;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -36,6 +43,7 @@ import org.slf4j.LoggerFactory;
 public class TwoBrokerQueueClientsReconnectTest extends JmsMultipleBrokersTestSupport {
     protected static final int MESSAGE_COUNT = 100; // Best if a factor of 100
     protected static final int PREFETCH_COUNT = 1;
+    protected static final int NETWORK_PREFETCH = 1;
     private static final Logger LOG = LoggerFactory.getLogger(TwoBrokerQueueClientsReconnectTest.class);
 
 
@@ -161,6 +169,9 @@ public class TwoBrokerQueueClientsReconn
     }
 
     public void doTwoClientsReceiveOneClientDisconnects() throws Exception {
+        // ensure the messages do not all flow across the network too quickly
+        applyRateLimitNetworkFilter(0.8 * MESSAGE_COUNT);
+
         // Bridge brokers
         bridgeBrokers(broker1, broker2);
         bridgeBrokers(broker2, broker1);
@@ -181,14 +192,14 @@ public class TwoBrokerQueueClientsReconn
         // Always send messages to broker A
         sendMessages("BrokerA", dest, MESSAGE_COUNT);
 
-        // Let each client receive 20% of the messages - 40% total
+        LOG.info("Let each client receive 20% of the messages - 40% total");
         msgsClient1 += receiveExactMessages(client1, (int)(MESSAGE_COUNT * 0.20));
         msgsClient2 += receiveExactMessages(client2, (int)(MESSAGE_COUNT * 0.20));
 
         // Disconnect the first client
         client1.close();
 
-        // Let the second client receive the rest of the messages
+        LOG.info("Let the second client receive the rest of the messages");
         msgsClient2 += receiveAllMessages(client2);
         client2.close();
 
@@ -214,6 +225,9 @@ public class TwoBrokerQueueClientsReconn
     }
 
     public void doTwoClientsReceiveOneClientReconnects() throws Exception {
+        // ensure the messages do not all flow across the network too quickly
+        applyRateLimitNetworkFilter(0.2 * MESSAGE_COUNT);
+
         // Bridge brokers
         bridgeBrokers(broker1, broker2);
         bridgeBrokers(broker2, broker1);
@@ -238,22 +252,31 @@ public class TwoBrokerQueueClientsReconn
         msgsClient1 += receiveExactMessages(client1, (int)(MESSAGE_COUNT * 0.20));
         msgsClient2 += receiveExactMessages(client2, (int)(MESSAGE_COUNT * 0.20));
 
-        // Disconnect the first client
+        LOG.info("msgsClient1=" + msgsClient1);
+        LOG.info("msgsClient2=" + msgsClient2);
+
+        Thread.sleep(1000);
+        LOG.info("Disconnect the first client");
         client1.close();
 
-        // Let the second client receive 20% more of the total messages
+        LOG.info("Let the second client receive 20% more of the total messages");
         msgsClient2 += receiveExactMessages(client2, (int)(MESSAGE_COUNT * 0.20));
 
+        LOG.info("msgsClient2=" + msgsClient2);
+
         // Create another client for broker 1
         client1 = createConsumer(broker1, dest);
-        Thread.sleep(500);
+        Thread.sleep(1000);
 
         // Let each client receive 20% of the messages - 40% total
         msgsClient1 += receiveExactMessages(client1, (int)(MESSAGE_COUNT * 0.20));
         client1.close();
+        LOG.info("new consumer addition, msgsClient1=" + msgsClient1);
 
+        Thread.sleep(2000);
         msgsClient2 += receiveExactMessages(client2, (int)(MESSAGE_COUNT * 0.20));
         client2.close();
+        LOG.info("msgsClient2=" + msgsClient2);
 
         // First client should have received 40 messages
         assertEquals("Client for " + broker1 + " should have received 40% of the messages.", (int)(MESSAGE_COUNT * 0.40), msgsClient1);
@@ -262,7 +285,23 @@ public class TwoBrokerQueueClientsReconn
         assertEquals("Client for " + broker2 + " should have received 60% of the messages.", (int)(MESSAGE_COUNT * 0.60), msgsClient2);
     }
 
+    private void applyRateLimitNetworkFilter(double rateLimit) {
+        ConditionalNetworkBridgeFilterFactory filterFactory = new ConditionalNetworkBridgeFilterFactory();
+        filterFactory.setReplayWhenNoConsumers(true);
+        filterFactory.setRateLimit((int) rateLimit);
+        filterFactory.setRateDuration(1000);
+
+        Collection<BrokerItem> brokerList = brokers.values();
+        for (Iterator<BrokerItem> i = brokerList.iterator(); i.hasNext();) {
+            BrokerService broker = i.next().broker;
+            broker.getDestinationPolicy().getDefaultEntry().setNetworkBridgeFilterFactory(filterFactory);
+        }
+    }
+
     public void testTwoClientsReceiveTwoClientReconnects() throws Exception {
+        // ensure the messages do not all flow across the network too quickly
+        applyRateLimitNetworkFilter(0.5 * MESSAGE_COUNT);
+
         broker1 = "BrokerA";
         broker2 = "BrokerB";
 
@@ -290,19 +329,18 @@ public class TwoBrokerQueueClientsReconn
         msgsClient1 += receiveExactMessages(client1, (int)(MESSAGE_COUNT * 0.20));
         msgsClient2 += receiveExactMessages(client2, (int)(MESSAGE_COUNT * 0.20));
 
-        // Disconnect both clients
+        LOG.info("Disconnect both clients");
         client1.close();
         client2.close();
 
-        // Create another two clients for each broker
-        client1 = createConsumer(broker1, dest);
-        client2 = createConsumer(broker2, dest);
-        Thread.sleep(500);
-
         // Let each client receive 30% more of the total messages - 60% total
+        LOG.info("Serially create another two clients for each broker and consume in turn");
+        client1 = createConsumer(broker1, dest);
         msgsClient1 += receiveExactMessages(client1, (int)(MESSAGE_COUNT * 0.30));
         client1.close();
 
+        // the close will allow replay of the remaining messages
+        client2 = createConsumer(broker2, dest);
         msgsClient2 += receiveExactMessages(client2, (int)(MESSAGE_COUNT * 0.30));
         client2.close();
 
@@ -317,7 +355,7 @@ public class TwoBrokerQueueClientsReconn
         Message msg;
         int i;
         for (i = 0; i < msgCount; i++) {
-            msg = consumer.receive(1000);
+            msg = consumer.receive(4000);
             if (msg == null) {
                 LOG.error("Consumer failed to receive exactly " + msgCount + " messages. Actual messages received is: " + i);
                 break;
@@ -348,6 +386,21 @@ public class TwoBrokerQueueClientsReconn
         return sess.createConsumer(dest);
     }
 
+    protected void configureBroker(BrokerService broker) {
+        PolicyMap policyMap = new PolicyMap();
+        PolicyEntry defaultEntry = new PolicyEntry();
+        defaultEntry.setEnableAudit(false);
+        policyMap.setDefaultEntry(defaultEntry);
+        broker.setDestinationPolicy(policyMap);
+    }
+
+    protected NetworkConnector bridgeBrokers(BrokerService localBroker, BrokerService remoteBroker, boolean dynamicOnly, int networkTTL, boolean conduit, boolean failover) throws Exception {
+        NetworkConnector nc = super.bridgeBrokers(localBroker,remoteBroker, dynamicOnly, networkTTL, conduit, failover);
+        nc.setPrefetchSize(NETWORK_PREFETCH);
+        nc.setDecreaseNetworkConsumerPriority(true);
+        return nc;
+    }
+
     public void setUp() throws Exception {
         super.setAutoFail(true);
         super.setUp();



Mime
View raw message