Return-Path: X-Original-To: apmail-activemq-commits-archive@www.apache.org Delivered-To: apmail-activemq-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 8A4344CF7 for ; Fri, 1 Jul 2011 17:46:22 +0000 (UTC) Received: (qmail 43227 invoked by uid 500); 1 Jul 2011 17:46:20 -0000 Delivered-To: apmail-activemq-commits-archive@activemq.apache.org Received: (qmail 41967 invoked by uid 500); 1 Jul 2011 17:46:18 -0000 Mailing-List: contact commits-help@activemq.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@activemq.apache.org Delivered-To: mailing list commits@activemq.apache.org Received: (qmail 41911 invoked by uid 99); 1 Jul 2011 17:46:18 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 01 Jul 2011 17:46:18 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 01 Jul 2011 17:46:15 +0000 Received: from eris.apache.org (localhost [127.0.0.1]) by eris.apache.org (Postfix) with ESMTP id 3DC322388897 for ; Fri, 1 Jul 2011 17:45:55 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1142005 - in /activemq/trunk/activemq-core: ./ src/main/java/org/apache/activemq/broker/region/ src/main/java/org/apache/activemq/broker/region/policy/ src/main/java/org/apache/activemq/command/ src/main/java/org/apache/activemq/filter/ sr... Date: Fri, 01 Jul 2011 17:45:54 -0000 To: commits@activemq.apache.org From: gtully@apache.org X-Mailer: svnmailer-1.0.8 Message-Id: <20110701174555.3DC322388897@eris.apache.org> Author: gtully Date: Fri Jul 1 17:45:54 2011 New Revision: 1142005 URL: http://svn.apache.org/viewvc?rev=1142005&view=rev Log: https://issues.apache.org/jira/browse/AMQ-2484,https://issues.apache.org/jira/browse/AMQ-2324: Forwarded message cannot be distributed to the original broker, Avoid stuck messages in a network of brokers - implement destination policy that allows the network filter factory to be provided. A new implementation will allow queues to replay messages with a configurable delay once there are no local consumers. Also it allows a rate limit on the network consumer such that a statically included dest does not consume all messages if there is a slow local consumer Added: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/ConditionalNetworkBridgeFilterFactory.java (with props) activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DefaultNetworkBridgeFilterFactory.java (with props) activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/NetworkBridgeFilterFactory.java (with props) Modified: activemq/trunk/activemq-core/pom.xml activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/NetworkBridgeFilter.java activemq/trunk/activemq-core/src/main/java/org/apache/activemq/filter/MessageEvaluationContext.java activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/CompositeDemandForwardingBridge.java activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridge.java activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandSubscription.java activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsMultipleBrokersTestSupport.java activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/BrokerNetworkWithStuckMessagesTest.java activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/NetworkRestartTest.java activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/TwoBrokerQueueClientsReconnectTest.java Modified: activemq/trunk/activemq-core/pom.xml URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/pom.xml?rev=1142005&r1=1142004&r2=1142005&view=diff ============================================================================== --- activemq/trunk/activemq-core/pom.xml (original) +++ activemq/trunk/activemq-core/pom.xml Fri Jul 1 17:45:54 2011 @@ -451,8 +451,6 @@ **/MissingDataFileTest.* - **/TwoBrokerMessageNotSentToRemoteWhenNoConsumerTest.* - **/TwoBrokerQueueClientsReconnectTest.* **/QueueConsumerCloseAndReconnectTest.* **/TwoBrokerMulticastQueueTest.* Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java?rev=1142005&r1=1142004&r2=1142005&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java Fri Jul 1 17:45:54 2011 @@ -1827,29 +1827,26 @@ public class Queue extends BaseDestinati interestCount++; continue; } - if (dispatchSelector.canSelect(s, node)) { - if (!fullConsumers.contains(s)) { - if (!s.isFull()) { - if (assignMessageGroup(s, (QueueMessageReference)node)) { - // Dispatch it. - s.add(node); - target = s; - break; - } - } else { - // no further dispatch of list to a full consumer to - // avoid out of order message receipt - fullConsumers.add(s); - } + if (!fullConsumers.contains(s) && !s.isFull()) { + if (dispatchSelector.canSelect(s, node) && assignMessageGroup(s, (QueueMessageReference)node)) { + // Dispatch it. + s.add(node); + target = s; + break; } - interestCount++; } else { - // makes sure it gets dispatched again - if (!node.isDropped() && !((QueueMessageReference) node).isAcked() - && (!node.isDropped() || s.getConsumerInfo().isBrowser())) { - interestCount++; + // no further dispatch of list to a full consumer to + // avoid out of order message receipt + fullConsumers.add(s); + if (LOG.isTraceEnabled()) { + LOG.trace("Sub full " + s); } } + // make sure it gets dispatched again + if (!node.isDropped() && !((QueueMessageReference) node).isAcked() && + (!node.isDropped() || s.getConsumerInfo().isBrowser())) { + interestCount++; + } } if ((target == null && interestCount > 0) || consumers.size() == 0) { Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java?rev=1142005&r1=1142004&r2=1142005&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java Fri Jul 1 17:45:54 2011 @@ -30,6 +30,7 @@ import org.apache.activemq.broker.region import org.apache.activemq.broker.region.group.MessageGroupHashBucketFactory; import org.apache.activemq.broker.region.group.MessageGroupMapFactory; import org.apache.activemq.filter.DestinationMapEntry; +import org.apache.activemq.network.NetworkBridgeFilterFactory; import org.apache.activemq.usage.SystemUsage; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -93,6 +94,7 @@ public class PolicyEntry extends Destina private boolean gcWithNetworkConsumers; private long inactiveTimoutBeforeGC = BaseDestination.DEFAULT_INACTIVE_TIMEOUT_BEFORE_GC; private boolean reduceMemoryFootprint; + private NetworkBridgeFilterFactory networkBridgeFilterFactory; public void configure(Broker broker,Queue queue) { @@ -805,4 +807,12 @@ public class PolicyEntry extends Destina public void setReduceMemoryFootprint(boolean reduceMemoryFootprint) { this.reduceMemoryFootprint = reduceMemoryFootprint; } + + public void setNetworkBridgeFilterFactory(NetworkBridgeFilterFactory networkBridgeFilterFactory) { + this.networkBridgeFilterFactory = networkBridgeFilterFactory; + } + + public NetworkBridgeFilterFactory getNetworkBridgeFilterFactory() { + return networkBridgeFilterFactory; + } } Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/NetworkBridgeFilter.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/NetworkBridgeFilter.java?rev=1142005&r1=1142004&r2=1142005&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/NetworkBridgeFilter.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/NetworkBridgeFilter.java Fri Jul 1 17:45:54 2011 @@ -36,14 +36,14 @@ public class NetworkBridgeFilter impleme public static final byte DATA_STRUCTURE_TYPE = CommandTypes.NETWORK_BRIDGE_FILTER; static final Logger LOG = LoggerFactory.getLogger(NetworkBridgeFilter.class); - private BrokerId networkBrokerId; - private int networkTTL; + protected BrokerId networkBrokerId; + protected int networkTTL; public NetworkBridgeFilter() { } - public NetworkBridgeFilter(BrokerId remoteBrokerPath, int networkTTL) { - this.networkBrokerId = remoteBrokerPath; + public NetworkBridgeFilter(BrokerId networkBrokerId, int networkTTL) { + this.networkBrokerId = networkBrokerId; this.networkTTL = networkTTL; } @@ -62,7 +62,7 @@ public class NetworkBridgeFilter impleme // in the dispatch loop // so need to get the reference to it Message message = mec.getMessage(); - return message != null && matchesForwardingFilter(message); + return message != null && matchesForwardingFilter(message, mec); } catch (IOException e) { throw JMSExceptionSupport.create(e); } @@ -72,11 +72,11 @@ public class NetworkBridgeFilter impleme return matches(message) ? Boolean.TRUE : Boolean.FALSE; } - protected boolean matchesForwardingFilter(Message message) { + protected boolean matchesForwardingFilter(Message message, MessageEvaluationContext mec) { if (contains(message.getBrokerPath(), networkBrokerId)) { if (LOG.isTraceEnabled()) { - LOG.trace("Message all ready routed once through this broker (" + LOG.trace("Message all ready routed once through target broker (" + networkBrokerId + "), path: " + Arrays.toString(message.getBrokerPath()) + " - ignoring: " + message); } @@ -92,7 +92,6 @@ public class NetworkBridgeFilter impleme return false; } - // Don't propagate advisory messages about network subscriptions if (message.isAdvisory() && message.getDataStructure() != null && message.getDataStructure().getDataStructureType() == CommandTypes.CONSUMER_INFO) { ConsumerInfo info = (ConsumerInfo)message.getDataStructure(); hops = info.getBrokerPath() == null ? 0 : info.getBrokerPath().length; @@ -102,6 +101,12 @@ public class NetworkBridgeFilter impleme } return false; } + + if (contains(info.getBrokerPath(), networkBrokerId)) { + LOG.trace("ConsumerInfo advisory all ready routed once through target broker (" + + networkBrokerId + "), path: " + + Arrays.toString(info.getBrokerPath()) + " - ignoring: " + message); + } } return true; } Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/filter/MessageEvaluationContext.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/filter/MessageEvaluationContext.java?rev=1142005&r1=1142004&r2=1142005&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/filter/MessageEvaluationContext.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/filter/MessageEvaluationContext.java Fri Jul 1 17:45:54 2011 @@ -92,4 +92,8 @@ public class MessageEvaluationContext { dropped = false; loaded = false; } + + public MessageReference getMessageReference() { + return messageReference; + } } Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/CompositeDemandForwardingBridge.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/CompositeDemandForwardingBridge.java?rev=1142005&r1=1142004&r2=1142005&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/CompositeDemandForwardingBridge.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/CompositeDemandForwardingBridge.java Fri Jul 1 17:45:54 2011 @@ -103,10 +103,6 @@ public class CompositeDemandForwardingBr // TODO is there much we can do here? } - protected NetworkBridgeFilter createNetworkBridgeFilter(ConsumerInfo info) throws IOException { - return new NetworkBridgeFilter(getFromBrokerId(info), configuration.getNetworkTTL()); - } - protected BrokerId[] getRemoteBrokerPath() { return remoteBrokerPath; } Added: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/ConditionalNetworkBridgeFilterFactory.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/ConditionalNetworkBridgeFilterFactory.java?rev=1142005&view=auto ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/ConditionalNetworkBridgeFilterFactory.java (added) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/ConditionalNetworkBridgeFilterFactory.java Fri Jul 1 17:45:54 2011 @@ -0,0 +1,171 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.network; + +import java.util.List; +import org.apache.activemq.broker.region.Subscription; +import org.apache.activemq.command.BrokerId; +import org.apache.activemq.command.ConsumerInfo; +import org.apache.activemq.command.Message; +import org.apache.activemq.command.NetworkBridgeFilter; +import org.apache.activemq.filter.MessageEvaluationContext; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * implement conditional behaviour for queue consumers, + * allows replaying back to origin if no consumers are present on the local broker + * after a configurable delay, irrespective of the networkTTL + * Also allows rate limiting of messages through the network, useful for static includes + * + * @org.apache.xbean.XBean + */ + +public class ConditionalNetworkBridgeFilterFactory implements NetworkBridgeFilterFactory { + boolean replayWhenNoConsumers = false; + int replayDelay = 0; + int rateLimit = 0; + int rateDuration = 1000; + + @Override + public NetworkBridgeFilter create(ConsumerInfo info, BrokerId[] remoteBrokerPath, int networkTimeToLive) { + ConditionalNetworkBridgeFilter filter = new ConditionalNetworkBridgeFilter(); + filter.setNetworkBrokerId(remoteBrokerPath[0]); + filter.setNetworkTTL(networkTimeToLive); + filter.setAllowReplayWhenNoConsumers(isReplayWhenNoConsumers()); + filter.setRateLimit(getRateLimit()); + filter.setRateDuration(getRateDuration()); + filter.setReplayDelay(getReplayDelay()); + return filter; + } + + public void setReplayWhenNoConsumers(boolean replayWhenNoConsumers) { + this.replayWhenNoConsumers = replayWhenNoConsumers; + } + + public boolean isReplayWhenNoConsumers() { + return replayWhenNoConsumers; + } + + public void setRateLimit(int rateLimit) { + this.rateLimit = rateLimit; + } + + public int getRateLimit() { + return rateLimit; + } + + public int getRateDuration() { + return rateDuration; + } + + public void setRateDuration(int rateDuration) { + this.rateDuration = rateDuration; + } + + public int getReplayDelay() { + return replayDelay; + } + + public void setReplayDelay(int replayDelay) { + this.replayDelay = replayDelay; + } + + private static class ConditionalNetworkBridgeFilter extends NetworkBridgeFilter { + final static Logger LOG = LoggerFactory.getLogger(ConditionalNetworkBridgeFilter.class); + private int rateLimit; + private int rateDuration = 1000; + private boolean allowReplayWhenNoConsumers = true; + private int replayDelay = 1000; + + private int matchCount; + private long rateDurationEnd; + + @Override + protected boolean matchesForwardingFilter(Message message, final MessageEvaluationContext mec) { + boolean match = true; + if (mec.getDestination().isQueue()) { + if (contains(message.getBrokerPath(), networkBrokerId)) { + // potential replay back to origin + match = allowReplayWhenNoConsumers && hasNoLocalConsumers(message, mec) && hasNotJustArrived(message); + + if (match && LOG.isTraceEnabled()) { + LOG.trace("Replaying [" + message.getMessageId() +"] for [" + message.getDestination() +"] back to origin in the absence of a local consumer"); + } + } + + if (match && rateLimitExceeded()) { + if (LOG.isTraceEnabled()) { + LOG.trace("Throttled network consumer rejecting [" + message.getMessageId() + "] for [" + message.getDestination() + " " + matchCount + ">" + rateLimit + "/" + rateDuration); + } + match = false; + } + + } else { + // use existing logic for topics + match = super.matchesForwardingFilter(message, mec); + } + + return match; + } + + private boolean hasNotJustArrived(Message message) { + return replayDelay ==0 || (message.getBrokerInTime() + replayDelay < System.currentTimeMillis()); + } + + private boolean hasNoLocalConsumers(final Message message, final MessageEvaluationContext mec) { + List consumers = mec.getMessageReference().getRegionDestination().getConsumers(); + for (Subscription sub : consumers) { + if (!sub.getConsumerInfo().isNetworkSubscription() && !sub.getConsumerInfo().isBrowser()) { + if (LOG.isTraceEnabled()) { + LOG.trace("Not replaying [" + message.getMessageId() + "] for [" + message.getDestination() +"] to origin due to existing local consumer: " + sub.getConsumerInfo()); + } + return false; + } + } + return true; + } + + private boolean rateLimitExceeded() { + if (rateLimit == 0) { + return false; + } + + if (rateDurationEnd < System.currentTimeMillis()) { + rateDurationEnd = System.currentTimeMillis() + rateDuration; + matchCount = 0; + } + return ++matchCount > rateLimit; + } + + public void setReplayDelay(int replayDelay) { + this.replayDelay = replayDelay; + } + + public void setRateLimit(int rateLimit) { + this.rateLimit = rateLimit; + } + + public void setRateDuration(int rateDuration) { + this.rateDuration = rateDuration; + } + + public void setAllowReplayWhenNoConsumers(boolean allowReplayWhenNoConsumers) { + this.allowReplayWhenNoConsumers = allowReplayWhenNoConsumers; + } + } +} Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/ConditionalNetworkBridgeFilterFactory.java ------------------------------------------------------------------------------ svn:eol-style = native Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/ConditionalNetworkBridgeFilterFactory.java ------------------------------------------------------------------------------ svn:keywords = Rev Date Added: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DefaultNetworkBridgeFilterFactory.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DefaultNetworkBridgeFilterFactory.java?rev=1142005&view=auto ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DefaultNetworkBridgeFilterFactory.java (added) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DefaultNetworkBridgeFilterFactory.java Fri Jul 1 17:45:54 2011 @@ -0,0 +1,33 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.network; + +import org.apache.activemq.command.BrokerId; +import org.apache.activemq.command.ConsumerInfo; +import org.apache.activemq.command.NetworkBridgeFilter; + +/** + * implement default behaviour, filter that will not allow resend to origin + * based on brokerPath and which respects networkTTL + * + * @org.apache.xbean.XBean + */ +public class DefaultNetworkBridgeFilterFactory implements NetworkBridgeFilterFactory { + public NetworkBridgeFilter create(ConsumerInfo info, BrokerId[] remoteBrokerPath, int networkTimeToLive) { + return new NetworkBridgeFilter(remoteBrokerPath[0], networkTimeToLive); + } +} Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DefaultNetworkBridgeFilterFactory.java ------------------------------------------------------------------------------ svn:eol-style = native Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DefaultNetworkBridgeFilterFactory.java ------------------------------------------------------------------------------ svn:keywords = Rev Date Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridge.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridge.java?rev=1142005&r1=1142004&r2=1142005&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridge.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridge.java Fri Jul 1 17:45:54 2011 @@ -89,10 +89,6 @@ public class DemandForwardingBridge exte } } - protected NetworkBridgeFilter createNetworkBridgeFilter(ConsumerInfo info) throws IOException { - return new NetworkBridgeFilter(remoteBrokerPath[0], configuration.getNetworkTTL()); - } - protected BrokerId[] getRemoteBrokerPath() { return remoteBrokerPath; } Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java?rev=1142005&r1=1142004&r2=1142005&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java Fri Jul 1 17:45:54 2011 @@ -34,6 +34,7 @@ import org.apache.activemq.broker.Transp import org.apache.activemq.broker.region.AbstractRegion; import org.apache.activemq.broker.region.RegionBroker; import org.apache.activemq.broker.region.Subscription; +import org.apache.activemq.broker.region.policy.PolicyEntry; import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ActiveMQMessage; import org.apache.activemq.command.ActiveMQTempDestination; @@ -75,7 +76,6 @@ import org.apache.activemq.transport.tcp import org.apache.activemq.util.*; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.slf4j.MDC; /** * A useful base class for implementing demand forwarding bridges. @@ -116,6 +116,7 @@ public abstract class DemandForwardingBr protected final AtomicBoolean remoteInterupted = new AtomicBoolean(false); protected final AtomicBoolean lastConnectSucceeded = new AtomicBoolean(false); protected NetworkBridgeConfiguration configuration; + protected NetworkBridgeFilterFactory filterFactory; final AtomicLong enqueueCounter = new AtomicLong(); final AtomicLong dequeueCounter = new AtomicLong(); @@ -721,7 +722,7 @@ public abstract class DemandForwardingBr Message message = configureMessage(md); if (LOG.isDebugEnabled()) { - LOG.debug("bridging (" + configuration.getBrokerName() + " -> " + remoteBrokerName + ", consumer: " + md.getConsumerId() + ", destination " + message.getDestination() + ", brokerPath: " + Arrays.toString(message.getBrokerPath()) + ", message: " + message); + LOG.debug("bridging (" + configuration.getBrokerName() + " -> " + remoteBrokerName + ") " + message.getMessageId() + ", consumer: " + md.getConsumerId() + ", destination " + message.getDestination() + ", brokerPath: " + Arrays.toString(message.getBrokerPath()) + ", message: " + message); } if (!message.isResponseRequired()) { @@ -803,23 +804,15 @@ public abstract class DemandForwardingBr } private boolean suppressMessageDispatch(MessageDispatch md, DemandSubscription sub) throws Exception { - // See if this consumer's brokerPath tells us it came from the broker at the other end - // of the bridge. I think we should be making this decision based on the message's - // broker bread crumbs and not the consumer's? However, the message's broker bread - // crumbs are null, which is another matter. boolean suppress = false; - Object consumerInfo = md.getMessage().getDataStructure(); - if (consumerInfo != null && (consumerInfo instanceof ConsumerInfo)) { - suppress = contains(((ConsumerInfo) consumerInfo).getBrokerPath(), remoteBrokerInfo.getBrokerId()); - } - - // for durable subs, suppression via filter leaves dangling acks so we need to + // for durable subs, suppression via filter leaves dangling acks so we need to // check here and allow the ack irrespective - if (!suppress && sub.getLocalInfo().isDurable()) { + if (sub.getLocalInfo().isDurable()) { MessageEvaluationContext messageEvalContext = new MessageEvaluationContext(); messageEvalContext.setMessageReference(md.getMessage()); - suppress = !createNetworkBridgeFilter(null).matches(messageEvalContext); - } + messageEvalContext.setDestination(md.getDestination()); + suppress = !sub.getNetworkBridgeFilter().matches(messageEvalContext); + } return suppress; } @@ -1172,10 +1165,11 @@ public abstract class DemandForwardingBr subscriptionMapByLocalId.put(sub.getLocalInfo().getConsumerId(), sub); subscriptionMapByRemoteId.put(sub.getRemoteInfo().getConsumerId(), sub); + sub.setNetworkBridgeFilter(createNetworkBridgeFilter(info)); if (!info.isDurable()) { // This works for now since we use a VM connection to the local broker. // may need to change if we ever subscribe to a remote broker. - sub.getLocalInfo().setAdditionalPredicate(createNetworkBridgeFilter(info)); + sub.getLocalInfo().setAdditionalPredicate(sub.getNetworkBridgeFilter()); } else { // need to ack this message if it is ignored as it is durable so // we check before we send. see: suppressMessageDispatch() @@ -1219,7 +1213,20 @@ public abstract class DemandForwardingBr subscriptionMapByRemoteId.clear(); } - protected abstract NetworkBridgeFilter createNetworkBridgeFilter(ConsumerInfo info) throws IOException; + protected NetworkBridgeFilter createNetworkBridgeFilter(ConsumerInfo info) throws IOException { + if (filterFactory == null) { + if (brokerService != null && brokerService.getDestinationPolicy() != null) { + PolicyEntry entry = brokerService.getDestinationPolicy().getEntryFor(info.getDestination()); + if (entry != null) { + filterFactory = entry.getNetworkBridgeFilterFactory(); + } + } + if (filterFactory == null) { + filterFactory = new DefaultNetworkBridgeFilterFactory(); + } + } + return filterFactory.create(info, getRemoteBrokerPath(), configuration.getNetworkTTL() ); + } protected abstract void serviceLocalBrokerInfo(Command command) throws InterruptedException; Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandSubscription.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandSubscription.java?rev=1142005&r1=1142004&r2=1142005&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandSubscription.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandSubscription.java Fri Jul 1 17:45:54 2011 @@ -23,6 +23,7 @@ import java.util.concurrent.atomic.Atomi import org.apache.activemq.command.ConsumerId; import org.apache.activemq.command.ConsumerInfo; +import org.apache.activemq.command.NetworkBridgeFilter; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -40,6 +41,7 @@ public class DemandSubscription { private AtomicInteger dispatched = new AtomicInteger(0); private AtomicBoolean activeWaiter = new AtomicBoolean(); + private NetworkBridgeFilter networkBridgeFilter; DemandSubscription(ConsumerInfo info) { remoteInfo = info; @@ -125,4 +127,12 @@ public class DemandSubscription { } return true; } + + public NetworkBridgeFilter getNetworkBridgeFilter() { + return networkBridgeFilter; + } + + public void setNetworkBridgeFilter(NetworkBridgeFilter networkBridgeFilter) { + this.networkBridgeFilter = networkBridgeFilter; + } } Added: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/NetworkBridgeFilterFactory.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/NetworkBridgeFilterFactory.java?rev=1142005&view=auto ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/NetworkBridgeFilterFactory.java (added) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/NetworkBridgeFilterFactory.java Fri Jul 1 17:45:54 2011 @@ -0,0 +1,28 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.network; + +import org.apache.activemq.command.BrokerId; +import org.apache.activemq.command.ConsumerInfo; +import org.apache.activemq.command.NetworkBridgeFilter; + +public interface NetworkBridgeFilterFactory { + // create a dispatch filter for network consumers, default impl will not send a message back to + // its origin to prevent looping, the down side is that messages can get stuck + NetworkBridgeFilter create(ConsumerInfo info, BrokerId[] remoteBrokerPath, int networkTimeToLive); +} Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/NetworkBridgeFilterFactory.java ------------------------------------------------------------------------------ svn:eol-style = native Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/NetworkBridgeFilterFactory.java ------------------------------------------------------------------------------ svn:keywords = Rev Date Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsMultipleBrokersTestSupport.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsMultipleBrokersTestSupport.java?rev=1142005&r1=1142004&r2=1142005&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsMultipleBrokersTestSupport.java (original) +++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsMultipleBrokersTestSupport.java Fri Jul 1 17:45:54 2011 @@ -212,11 +212,15 @@ public class JmsMultipleBrokersTestSuppo protected BrokerService createBroker(URI brokerUri) throws Exception { BrokerService broker = BrokerFactory.createBroker(brokerUri); + configureBroker(broker); brokers.put(broker.getBrokerName(), new BrokerItem(broker)); return broker; } + protected void configureBroker(BrokerService broker) { + } + protected BrokerService createBroker(Resource configFile) throws Exception { BrokerFactoryBean brokerFactory = new BrokerFactoryBean(configFile); brokerFactory.afterPropertiesSet(); Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/BrokerNetworkWithStuckMessagesTest.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/BrokerNetworkWithStuckMessagesTest.java?rev=1142005&r1=1142004&r2=1142005&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/BrokerNetworkWithStuckMessagesTest.java (original) +++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/BrokerNetworkWithStuckMessagesTest.java Fri Jul 1 17:45:54 2011 @@ -39,12 +39,15 @@ import javax.management.ObjectName; import junit.framework.TestCase; import org.apache.activemq.ActiveMQConnectionFactory; + import org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.BrokerTestSupport; import org.apache.activemq.broker.StubConnection; import org.apache.activemq.broker.TransportConnector; import org.apache.activemq.broker.jmx.ManagementContext; import org.apache.activemq.broker.jmx.QueueViewMBean; +import org.apache.activemq.broker.region.policy.PolicyEntry; +import org.apache.activemq.broker.region.policy.PolicyMap; import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ActiveMQTextMessage; import org.apache.activemq.command.ConnectionId; @@ -120,7 +123,8 @@ public class BrokerNetworkWithStuckMessa NetworkBridgeConfiguration config = new NetworkBridgeConfiguration(); config.setBrokerName("local"); config.setDispatchAsync(false); - + config.setDuplex(true); + Transport localTransport = createTransport(); Transport remoteTransport = createRemoteTransport(); @@ -180,7 +184,7 @@ public class BrokerNetworkWithStuckMessa // Create a synchronous consumer on the remote broker - final StubConnection connection2 = createRemoteConnection(); + StubConnection connection2 = createRemoteConnection(); ConnectionInfo connectionInfo2 = createConnectionInfo(); SessionInfo sessionInfo2 = createSessionInfo(connectionInfo2); connection2.send(connectionInfo2); @@ -191,69 +195,100 @@ public class BrokerNetworkWithStuckMessa connection2.send(consumerInfo2); // Consume 5 of the messages from the remote broker and ack them. - // Because the prefetch size is set to 1000 in the createConsumerInfo() - // method, this will cause the messages on the local broker to be - // forwarded to the remote broker. for (int i = 0; i < receiveNumMessages; ++i) { - Message message1 = receiveMessage(connection2); + Message message1 = receiveMessage(connection2, 20000); assertNotNull(message1); - connection2.send(createAck(consumerInfo2, message1, 1, MessageAck.STANDARD_ACK_TYPE)); - - Object[] msgs1 = browseQueueWithJmx(remoteBroker); - LOG.info("Found [" + msgs1.length + "] messages with JMX"); -// assertEquals((sendNumMessages-i), msgs.length); + connection2.send(createAck(consumerInfo2, message1, 1, MessageAck.INDIVIDUAL_ACK_TYPE)); } // Ensure that there are zero messages on the local broker. This tells // us that those messages have been prefetched to the remote broker - // where the demand exists. + // where the demand exists. + Wait.waitFor(new Wait.Condition() { + @Override + public boolean isSatisified() throws Exception { + Object[] result = browseQueueWithJmx(localBroker); + return 0 == result.length; + } + }); messages = browseQueueWithJmx(localBroker); assertEquals(0, messages.length); - + + LOG.info("Closing consumer on remote"); // Close the consumer on the remote broker connection2.send(consumerInfo2.createRemoveCommand()); - - // There should now be 5 messages stuck on the remote broker + // also close connection etc.. so messages get dropped from the local consumer q + connection2.send(connectionInfo2.createRemoveCommand()); + + // There should now be 5 messages stuck on the remote broker messages = browseQueueWithJmx(remoteBroker); assertEquals(5, messages.length); - - // Create a consumer on the local broker just to confirm that it doesn't - // receive any messages + + LOG.info("Messages now stuck on remote"); + + // receive again on the origin broker ConsumerInfo consumerInfo1 = createConsumerInfo(sessionInfo1, destinationInfo1); connection1.send(consumerInfo1); - Message message1 = receiveMessage(connection1); - - ////////////////////////////////////////////////////// - // An assertNull() is done here because this is currently the correct - // behavior. This is actually the purpose of this test - to prove that - // messages are stuck on the remote broker. AMQ-2324 and AMQ-2484 aim - // to fix this situation so that messages don't get stuck. - assertNull(message1); - ////////////////////////////////////////////////////// - + LOG.info("create local consumer: " + consumerInfo1); + + Message message1 = receiveMessage(connection1, 20000); + assertNotNull("Expect to get a replay as remote consumer is gone", message1); + connection1.send(createAck(consumerInfo1, message1, 1, MessageAck.INDIVIDUAL_ACK_TYPE)); + LOG.info("acked one message on origin, waiting for all messages to percolate back"); + + Wait.waitFor(new Wait.Condition() { + @Override + public boolean isSatisified() throws Exception { + Object[] result = browseQueueWithJmx(localBroker); + return 4 == result.length; + } + }); + messages = browseQueueWithJmx(localBroker); + assertEquals(4, messages.length); + + LOG.info("checking for messages on remote again"); + // messages won't migrate back again till consumer closes + connection2 = createRemoteConnection(); + connectionInfo2 = createConnectionInfo(); + sessionInfo2 = createSessionInfo(connectionInfo2); + connection2.send(connectionInfo2); + connection2.send(sessionInfo2); ConsumerInfo consumerInfo3 = createConsumerInfo(sessionInfo2, destinationInfo2); connection2.send(consumerInfo3); - - // Consume the last 5 messages from the remote broker and ack them just + message1 = receiveMessage(connection2, 20000); + assertNull("Messages have migrated back: " + message1, message1); + + // Consume the last 4 messages from the local broker and ack them just // to clean up the queue. - int counter = 0; - for (int i = 0; i < receiveNumMessages; ++i) { - message1 = receiveMessage(connection2); - assertNotNull(message1); - connection2.send(createAck(consumerInfo3, message1, 1, MessageAck.STANDARD_ACK_TYPE)); - ++counter; + int counter = 1; + for (; counter < receiveNumMessages; counter++) { + message1 = receiveMessage(connection1); + connection1.send(createAck(consumerInfo1, message1, 1, MessageAck.INDIVIDUAL_ACK_TYPE)); } // Ensure that 5 messages were received assertEquals(receiveNumMessages, counter); - - // Let those acks percolate... This stinks but it's the only way currently - // because these types of internal broker actions are non-deterministic. - Thread.sleep(4000); - - // Ensure that the queue on the remote broker is empty + + // verify all messages consumed + Wait.waitFor(new Wait.Condition() { + @Override + public boolean isSatisified() throws Exception { + Object[] result = browseQueueWithJmx(remoteBroker); + return 0 == result.length; + } + }); messages = browseQueueWithJmx(remoteBroker); assertEquals(0, messages.length); - + + Wait.waitFor(new Wait.Condition() { + @Override + public boolean isSatisified() throws Exception { + Object[] result = browseQueueWithJmx(localBroker); + return 0 == result.length; + } + }); + messages = browseQueueWithJmx(localBroker); + assertEquals(0, messages.length); + // Close the consumer on the remote broker connection2.send(consumerInfo3.createRemoveCommand()); @@ -269,6 +304,7 @@ public class BrokerNetworkWithStuckMessa localBroker.setPersistent(false); connector = createConnector(); localBroker.addConnector(connector); + configureBroker(localBroker); localBroker.start(); localBroker.waitUntilStarted(); @@ -278,7 +314,18 @@ public class BrokerNetworkWithStuckMessa return localBroker; } - + + private void configureBroker(BrokerService broker) { + PolicyMap policyMap = new PolicyMap(); + PolicyEntry defaultEntry = new PolicyEntry(); + defaultEntry.setExpireMessagesPeriod(0); + ConditionalNetworkBridgeFilterFactory filterFactory = new ConditionalNetworkBridgeFilterFactory(); + filterFactory.setReplayWhenNoConsumers(true); + defaultEntry.setNetworkBridgeFilterFactory(filterFactory); + policyMap.setDefaultEntry(defaultEntry); + broker.setDestinationPolicy(policyMap); + } + protected BrokerService createRemoteBroker() throws Exception { remoteBroker = new BrokerService(); remoteBroker.setBrokerName("remotehost"); @@ -287,6 +334,8 @@ public class BrokerNetworkWithStuckMessa remoteBroker.setPersistent(false); remoteConnector = createRemoteConnector(); remoteBroker.addConnector(remoteConnector); + configureBroker(remoteBroker); + remoteBroker.start(); remoteBroker.waitUntilStarted(); remoteBroker.getManagementContext().setConnectorPort(2222); Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/NetworkRestartTest.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/NetworkRestartTest.java?rev=1142005&r1=1142004&r2=1142005&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/NetworkRestartTest.java (original) +++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/NetworkRestartTest.java Fri Jul 1 17:45:54 2011 @@ -53,6 +53,9 @@ public class NetworkRestartTest extends // restart connector + // wait for ack back to localbroker with concurrent store and dispatch, dispatch occurs first + Thread.sleep(1000); + NetworkConnector connector = localBroker.getNetworkConnectorByName("networkConnector"); LOG.info("Stopping connector"); @@ -83,6 +86,9 @@ public class NetworkRestartTest extends // restart connector + // wait for ack back to localbroker with concurrent store and dispatch, dispatch occurs first + Thread.sleep(1000); + NetworkConnector connector = localBroker.getNetworkConnectorByName("networkConnector"); LOG.info("Removing connector"); Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/TwoBrokerQueueClientsReconnectTest.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/TwoBrokerQueueClientsReconnectTest.java?rev=1142005&r1=1142004&r2=1142005&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/TwoBrokerQueueClientsReconnectTest.java (original) +++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/TwoBrokerQueueClientsReconnectTest.java Fri Jul 1 17:45:54 2011 @@ -17,6 +17,8 @@ package org.apache.activemq.usecases; import java.net.URI; +import java.util.Collection; +import java.util.Iterator; import javax.jms.Connection; import javax.jms.Destination; @@ -27,6 +29,11 @@ import javax.jms.Session; import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.ActiveMQPrefetchPolicy; import org.apache.activemq.JmsMultipleBrokersTestSupport; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.region.policy.PolicyEntry; +import org.apache.activemq.broker.region.policy.PolicyMap; +import org.apache.activemq.network.ConditionalNetworkBridgeFilterFactory; +import org.apache.activemq.network.NetworkConnector; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -36,6 +43,7 @@ import org.slf4j.LoggerFactory; public class TwoBrokerQueueClientsReconnectTest extends JmsMultipleBrokersTestSupport { protected static final int MESSAGE_COUNT = 100; // Best if a factor of 100 protected static final int PREFETCH_COUNT = 1; + protected static final int NETWORK_PREFETCH = 1; private static final Logger LOG = LoggerFactory.getLogger(TwoBrokerQueueClientsReconnectTest.class); @@ -161,6 +169,9 @@ public class TwoBrokerQueueClientsReconn } public void doTwoClientsReceiveOneClientDisconnects() throws Exception { + // ensure all message do not flow across the network too quickly + applyRateLimitNetworkFilter(0.8 * MESSAGE_COUNT); + // Bridge brokers bridgeBrokers(broker1, broker2); bridgeBrokers(broker2, broker1); @@ -181,14 +192,14 @@ public class TwoBrokerQueueClientsReconn // Always send messages to broker A sendMessages("BrokerA", dest, MESSAGE_COUNT); - // Let each client receive 20% of the messages - 40% total + LOG.info("Let each client receive 20% of the messages - 40% total"); msgsClient1 += receiveExactMessages(client1, (int)(MESSAGE_COUNT * 0.20)); msgsClient2 += receiveExactMessages(client2, (int)(MESSAGE_COUNT * 0.20)); // Disconnect the first client client1.close(); - // Let the second client receive the rest of the messages + LOG.info("Let the second client receive the rest of the messages"); msgsClient2 += receiveAllMessages(client2); client2.close(); @@ -214,6 +225,9 @@ public class TwoBrokerQueueClientsReconn } public void doTwoClientsReceiveOneClientReconnects() throws Exception { + // ensure all message do not flow across the network too quickly + applyRateLimitNetworkFilter(0.2 * MESSAGE_COUNT); + // Bridge brokers bridgeBrokers(broker1, broker2); bridgeBrokers(broker2, broker1); @@ -238,22 +252,31 @@ public class TwoBrokerQueueClientsReconn msgsClient1 += receiveExactMessages(client1, (int)(MESSAGE_COUNT * 0.20)); msgsClient2 += receiveExactMessages(client2, (int)(MESSAGE_COUNT * 0.20)); - // Disconnect the first client + LOG.info("msgsClient1=" + msgsClient1); + LOG.info("msgsClient2=" + msgsClient2); + + Thread.sleep(1000); + LOG.info("Disconnect the first client"); client1.close(); - // Let the second client receive 20% more of the total messages + LOG.info("Let the second client receive 20% more of the total messages"); msgsClient2 += receiveExactMessages(client2, (int)(MESSAGE_COUNT * 0.20)); + LOG.info("msgsClient2=" + msgsClient2); + // Create another client for broker 1 client1 = createConsumer(broker1, dest); - Thread.sleep(500); + Thread.sleep(1000); // Let each client receive 20% of the messages - 40% total msgsClient1 += receiveExactMessages(client1, (int)(MESSAGE_COUNT * 0.20)); client1.close(); + LOG.info("new consumer addition, msgsClient1=" + msgsClient1); + Thread.sleep(2000); msgsClient2 += receiveExactMessages(client2, (int)(MESSAGE_COUNT * 0.20)); client2.close(); + LOG.info("msgsClient2=" + msgsClient2); // First client should have received 40 messages assertEquals("Client for " + broker1 + " should have received 40% of the messages.", (int)(MESSAGE_COUNT * 0.40), msgsClient1); @@ -262,7 +285,23 @@ public class TwoBrokerQueueClientsReconn assertEquals("Client for " + broker2 + " should have received 60% of the messages.", (int)(MESSAGE_COUNT * 0.60), msgsClient2); } + private void applyRateLimitNetworkFilter(double rateLimit) { + ConditionalNetworkBridgeFilterFactory filterFactory = new ConditionalNetworkBridgeFilterFactory(); + filterFactory.setReplayWhenNoConsumers(true); + filterFactory.setRateLimit((int) rateLimit); + filterFactory.setRateDuration(1000); + + Collection brokerList = brokers.values(); + for (Iterator i = brokerList.iterator(); i.hasNext();) { + BrokerService broker = i.next().broker; + broker.getDestinationPolicy().getDefaultEntry().setNetworkBridgeFilterFactory(filterFactory); + } + } + public void testTwoClientsReceiveTwoClientReconnects() throws Exception { + // ensure all message do not flow across the network too quickly + applyRateLimitNetworkFilter(0.5 * MESSAGE_COUNT); + broker1 = "BrokerA"; broker2 = "BrokerB"; @@ -290,19 +329,18 @@ public class TwoBrokerQueueClientsReconn msgsClient1 += receiveExactMessages(client1, (int)(MESSAGE_COUNT * 0.20)); msgsClient2 += receiveExactMessages(client2, (int)(MESSAGE_COUNT * 0.20)); - // Disconnect both clients + LOG.info("Disconnect both clients"); client1.close(); client2.close(); - // Create another two clients for each broker - client1 = createConsumer(broker1, dest); - client2 = createConsumer(broker2, dest); - Thread.sleep(500); - // Let each client receive 30% more of the total messages - 60% total + LOG.info("Serially create another two clients for each broker and consume in turn"); + client1 = createConsumer(broker1, dest); msgsClient1 += receiveExactMessages(client1, (int)(MESSAGE_COUNT * 0.30)); client1.close(); + // the close will allow replay or the replay of the remaining messages + client2 = createConsumer(broker2, dest); msgsClient2 += receiveExactMessages(client2, (int)(MESSAGE_COUNT * 0.30)); client2.close(); @@ -317,7 +355,7 @@ public class TwoBrokerQueueClientsReconn Message msg; int i; for (i = 0; i < msgCount; i++) { - msg = consumer.receive(1000); + msg = consumer.receive(4000); if (msg == null) { LOG.error("Consumer failed to receive exactly " + msgCount + " messages. Actual messages received is: " + i); break; @@ -348,6 +386,21 @@ public class TwoBrokerQueueClientsReconn return sess.createConsumer(dest); } + protected void configureBroker(BrokerService broker) { + PolicyMap policyMap = new PolicyMap(); + PolicyEntry defaultEntry = new PolicyEntry(); + defaultEntry.setEnableAudit(false); + policyMap.setDefaultEntry(defaultEntry); + broker.setDestinationPolicy(policyMap); + } + + protected NetworkConnector bridgeBrokers(BrokerService localBroker, BrokerService remoteBroker, boolean dynamicOnly, int networkTTL, boolean conduit, boolean failover) throws Exception { + NetworkConnector nc = super.bridgeBrokers(localBroker,remoteBroker, dynamicOnly, networkTTL, conduit, failover); + nc.setPrefetchSize(NETWORK_PREFETCH); + nc.setDecreaseNetworkConsumerPriority(true); + return nc; + } + public void setUp() throws Exception { super.setAutoFail(true); super.setUp();