Return-Path: X-Original-To: apmail-activemq-commits-archive@www.apache.org Delivered-To: apmail-activemq-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 052799FA5 for ; Fri, 17 May 2013 14:28:13 +0000 (UTC) Received: (qmail 30974 invoked by uid 500); 17 May 2013 14:28:13 -0000 Delivered-To: apmail-activemq-commits-archive@activemq.apache.org Received: (qmail 30869 invoked by uid 500); 17 May 2013 14:28:12 -0000 Mailing-List: contact commits-help@activemq.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@activemq.apache.org Delivered-To: mailing list commits@activemq.apache.org Received: (qmail 30848 invoked by uid 99); 17 May 2013 14:28:12 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 17 May 2013 14:28:12 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 17 May 2013 14:28:10 +0000 Received: from eris.apache.org (localhost [127.0.0.1]) by eris.apache.org (Postfix) with ESMTP id 16A182388962; Fri, 17 May 2013 14:27:49 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1483827 - in /activemq/trunk: activemq-broker/src/main/java/org/apache/activemq/network/ activemq-client/src/main/java/org/apache/activemq/ activemq-client/src/main/java/org/apache/activemq/command/ activemq-unit-tests/src/test/java/org/ap... Date: Fri, 17 May 2013 14:27:48 -0000 To: commits@activemq.apache.org From: dejanb@apache.org X-Mailer: svnmailer-1.0.8-patched Message-Id: <20130517142749.16A182388962@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: dejanb Date: Fri May 17 14:27:48 2013 New Revision: 1483827 URL: http://svn.apache.org/r1483827 Log: https://issues.apache.org/jira/browse/AMQ-4000 - initial implementation of keeping track of durable subscribers over network and unregister them appropriately Modified: activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/network/ConduitBridge.java activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/network/DemandSubscription.java activemq/trunk/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java activemq/trunk/activemq-client/src/main/java/org/apache/activemq/command/ConsumerInfo.java activemq/trunk/activemq-client/src/main/java/org/apache/activemq/command/SubscriptionInfo.java activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/DurableSubInBrokerNetworkTest.java Modified: activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/network/ConduitBridge.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/network/ConduitBridge.java?rev=1483827&r1=1483826&r2=1483827&view=diff ============================================================================== --- activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/network/ConduitBridge.java (original) +++ activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/network/ConduitBridge.java Fri May 17 14:27:48 2013 @@ -24,6 +24,7 @@ import java.util.List; import org.apache.activemq.command.BrokerId; import org.apache.activemq.command.ConsumerId; import org.apache.activemq.command.ConsumerInfo; +import org.apache.activemq.command.SubscriptionInfo; import org.apache.activemq.filter.DestinationFilter; import org.apache.activemq.transport.Transport; import org.slf4j.Logger; @@ -84,7 +85,11 @@ public class ConduitBridge extends Deman } // add the interest in the subscription if (checkPaths(info.getBrokerPath(), ds.getRemoteInfo().getBrokerPath())) { - ds.add(info.getConsumerId()); + if (!info.isDurable()) { + ds.add(info.getConsumerId()); + } else { + ds.getDurableRemoteSubs().add(new SubscriptionInfo(info.getClientId(), info.getSubscriptionName())); + } } matched = true; // continue - we want interest to any existing DemandSubscriptions Modified: activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java?rev=1483827&r1=1483826&r2=1483827&view=diff ============================================================================== --- activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java (original) +++ activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java Fri May 17 14:27:48 2013 @@ -19,10 +19,7 @@ package org.apache.activemq.network; import java.io.IOException; import java.security.GeneralSecurityException; import java.security.cert.X509Certificate; -import java.util.Arrays; -import java.util.Collection; -import java.util.List; -import java.util.Properties; +import java.util.*; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; @@ -50,32 +47,7 @@ import org.apache.activemq.broker.region import org.apache.activemq.broker.region.RegionBroker; import org.apache.activemq.broker.region.Subscription; import org.apache.activemq.broker.region.policy.PolicyEntry; -import org.apache.activemq.command.ActiveMQDestination; -import org.apache.activemq.command.ActiveMQMessage; -import org.apache.activemq.command.ActiveMQTempDestination; -import org.apache.activemq.command.ActiveMQTopic; -import org.apache.activemq.command.BrokerId; -import org.apache.activemq.command.BrokerInfo; -import org.apache.activemq.command.Command; -import org.apache.activemq.command.ConnectionError; -import org.apache.activemq.command.ConnectionId; -import org.apache.activemq.command.ConnectionInfo; -import org.apache.activemq.command.ConsumerId; -import org.apache.activemq.command.ConsumerInfo; -import org.apache.activemq.command.DataStructure; -import org.apache.activemq.command.DestinationInfo; -import org.apache.activemq.command.ExceptionResponse; -import org.apache.activemq.command.KeepAliveInfo; -import org.apache.activemq.command.Message; -import org.apache.activemq.command.MessageAck; -import org.apache.activemq.command.MessageDispatch; -import org.apache.activemq.command.NetworkBridgeFilter; -import org.apache.activemq.command.ProducerInfo; -import org.apache.activemq.command.RemoveInfo; -import org.apache.activemq.command.Response; -import org.apache.activemq.command.SessionInfo; -import org.apache.activemq.command.ShutdownInfo; -import org.apache.activemq.command.WireFormatInfo; +import org.apache.activemq.command.*; import org.apache.activemq.filter.DestinationFilter; import org.apache.activemq.filter.MessageEvaluationContext; import org.apache.activemq.security.SecurityContext; @@ -147,7 +119,7 @@ public abstract class DemandForwardingBr private final AtomicBoolean started = new AtomicBoolean(); private TransportConnection duplexInitiatingConnection; - private BrokerService brokerService = null; + protected BrokerService brokerService = null; private ObjectName mbeanObjectName; private final ExecutorService serialExecutor = Executors.newSingleThreadExecutor(); private Transport duplexInboundLocalBroker = null; @@ -818,6 +790,28 @@ public abstract class DemandForwardingBr } else if (data.getClass() == RemoveInfo.class) { ConsumerId id = (ConsumerId) ((RemoveInfo) data).getObjectId(); removeDemandSubscription(id); + } else if (data.getClass() == RemoveSubscriptionInfo.class) { + RemoveSubscriptionInfo info = ((RemoveSubscriptionInfo) data); + SubscriptionInfo subscriptionInfo = new SubscriptionInfo(info.getClientId(), info.getSubscriptionName()); + for (Iterator i = subscriptionMapByLocalId.values().iterator(); i.hasNext(); ) { + DemandSubscription ds = (DemandSubscription) i.next(); + boolean removed = ds.getDurableRemoteSubs().remove(subscriptionInfo); + if (removed) { + if (ds.getDurableRemoteSubs().isEmpty()) { + + // deactivate subscriber + RemoveInfo removeInfo = new RemoveInfo(ds.getLocalInfo().getConsumerId()); + localBroker.oneway(removeInfo); + + // remove subscriber + RemoveSubscriptionInfo sending = new RemoveSubscriptionInfo(); + sending.setClientId(localClientId); + sending.setSubscriptionName(ds.getLocalDurableSubscriber().getSubscriptionName()); + sending.setConnectionId(this.localConnectionInfo.getConnectionId()); + localBroker.oneway(sending); + } + } + } } } @@ -1180,6 +1174,9 @@ public abstract class DemandForwardingBr if (duplicateSuppressionIsRequired(sub)) { undoMapRegistration(sub); } else { + if (consumerInfo.isDurable()) { + sub.getDurableRemoteSubs().add(new SubscriptionInfo(sub.getRemoteInfo().getClientId(), consumerInfo.getSubscriptionName())); + } addSubscription(sub); consumerAdded = true; } @@ -1274,7 +1271,7 @@ public abstract class DemandForwardingBr return found; } - private final Collection getRegionSubscriptions(ActiveMQDestination dest) { + protected final Collection getRegionSubscriptions(ActiveMQDestination dest) { RegionBroker region_broker = (RegionBroker) brokerService.getRegionBroker(); Region region; Collection subs; @@ -1370,8 +1367,7 @@ public abstract class DemandForwardingBr // may need to change if we ever subscribe to a remote broker. sub.getLocalInfo().setAdditionalPredicate(sub.getNetworkBridgeFilter()); } else { - // need to ack this message if it is ignored as it is durable so - // we check before we send. see: suppressMessageDispatch() + sub.setLocalDurableSubscriber(new SubscriptionInfo(info.getClientId(), info.getSubscriptionName())); } } Modified: activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/network/DemandSubscription.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/network/DemandSubscription.java?rev=1483827&r1=1483826&r2=1483827&view=diff ============================================================================== --- activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/network/DemandSubscription.java (original) +++ activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/network/DemandSubscription.java Fri May 17 14:27:48 2013 @@ -25,6 +25,7 @@ import java.util.concurrent.atomic.Atomi import org.apache.activemq.command.ConsumerId; import org.apache.activemq.command.ConsumerInfo; import org.apache.activemq.command.NetworkBridgeFilter; +import org.apache.activemq.command.SubscriptionInfo; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -39,6 +40,8 @@ public class DemandSubscription { private final Set remoteSubsIds = new CopyOnWriteArraySet(); private final AtomicInteger dispatched = new AtomicInteger(0); private final AtomicBoolean activeWaiter = new AtomicBoolean(); + private final Set durableRemoteSubs = new CopyOnWriteArraySet(); + private SubscriptionInfo localDurableSubscriber; private NetworkBridgeFilter networkBridgeFilter; @@ -69,6 +72,10 @@ public class DemandSubscription { return remoteSubsIds.remove(id); } + public Set getDurableRemoteSubs() { + return durableRemoteSubs; + } + /** * @return true if there are no interested consumers */ @@ -138,4 +145,12 @@ public class DemandSubscription { public void setNetworkBridgeFilter(NetworkBridgeFilter networkBridgeFilter) { this.networkBridgeFilter = networkBridgeFilter; } + + public SubscriptionInfo getLocalDurableSubscriber() { + return localDurableSubscriber; + } + + public void setLocalDurableSubscriber(SubscriptionInfo localDurableSubscriber) { + this.localDurableSubscriber = localDurableSubscriber; + } } Modified: activemq/trunk/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java?rev=1483827&r1=1483826&r2=1483827&view=diff ============================================================================== --- activemq/trunk/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java (original) +++ activemq/trunk/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java Fri May 17 14:27:48 2013 @@ -214,6 +214,7 @@ public class ActiveMQMessageConsumer imp this.info = new ConsumerInfo(consumerId); this.info.setExclusive(this.session.connection.isExclusiveConsumer()); + this.info.setClientId(this.session.connection.getClientID()); this.info.setSubscriptionName(name); this.info.setPrefetchSize(prefetch); this.info.setCurrentPrefetchSize(prefetch); Modified: activemq/trunk/activemq-client/src/main/java/org/apache/activemq/command/ConsumerInfo.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-client/src/main/java/org/apache/activemq/command/ConsumerInfo.java?rev=1483827&r1=1483826&r2=1483827&view=diff ============================================================================== --- activemq/trunk/activemq-client/src/main/java/org/apache/activemq/command/ConsumerInfo.java (original) +++ activemq/trunk/activemq-client/src/main/java/org/apache/activemq/command/ConsumerInfo.java Fri May 17 14:27:48 2013 @@ -42,6 +42,7 @@ public class ConsumerInfo extends BaseCo protected boolean browser; protected boolean dispatchAsync; protected String selector; + protected String clientId; protected String subscriptionName; protected boolean noLocal; protected boolean exclusive; @@ -93,6 +94,7 @@ public class ConsumerInfo extends BaseCo info.browser = browser; info.dispatchAsync = dispatchAsync; info.selector = selector; + info.clientId = clientId; info.subscriptionName = subscriptionName; info.noLocal = noLocal; info.exclusive = exclusive; @@ -217,6 +219,19 @@ public class ConsumerInfo extends BaseCo } /** + * Used to identify the id of a client connection. + * + * @openwire:property version=1 + */ + public String getClientId() { + return clientId; + } + + public void setClientId(String clientId) { + this.clientId = clientId; + } + + /** * Used to identify the name of a durable subscription. * * @openwire:property version=1 Modified: activemq/trunk/activemq-client/src/main/java/org/apache/activemq/command/SubscriptionInfo.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-client/src/main/java/org/apache/activemq/command/SubscriptionInfo.java?rev=1483827&r1=1483826&r2=1483827&view=diff ============================================================================== --- activemq/trunk/activemq-client/src/main/java/org/apache/activemq/command/SubscriptionInfo.java (original) +++ activemq/trunk/activemq-client/src/main/java/org/apache/activemq/command/SubscriptionInfo.java Fri May 17 14:27:48 2013 @@ -34,6 +34,13 @@ public class SubscriptionInfo implements protected String subscriptionName; protected String selector; + public SubscriptionInfo() {} + + public SubscriptionInfo(String clientId, String subscriptionName) { + this.clientId = clientId; + this.subscriptionName = subscriptionName; + } + public byte getDataStructureType() { return DATA_STRUCTURE_TYPE; } Modified: activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/DurableSubInBrokerNetworkTest.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/DurableSubInBrokerNetworkTest.java?rev=1483827&r1=1483826&r2=1483827&view=diff ============================================================================== --- activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/DurableSubInBrokerNetworkTest.java (original) +++ activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/DurableSubInBrokerNetworkTest.java Fri May 17 14:27:48 2013 @@ -41,7 +41,7 @@ import org.slf4j.LoggerFactory; */ public class DurableSubInBrokerNetworkTest extends NetworkTestSupport { - private static final Logger LOG = LoggerFactory.getLogger(NetworkConnector.class); + private static final Logger LOG = LoggerFactory.getLogger(DurableSubInBrokerNetworkTest.class); // protected BrokerService localBroker; private final String subName = "Subscriber1"; private final String subName2 = "Subscriber2"; @@ -152,6 +152,16 @@ public class DurableSubInBrokerNetworkTe assertTrue("Durable subscription should still be on remote broker", foundSubInRemoteBrokerByTopicName(topicName)); + sub2.close(); + session.unsubscribe(subName2); + + Thread.sleep(100); + + assertFalse(foundSubInLocalBroker(subName2)); + + assertFalse("Durable subscription not unregistered on remote broker", + foundSubInRemoteBrokerByTopicName(topicName)); + } private boolean foundSubInRemoteBrokerByTopicName(String topicName) throws Exception {