Return-Path: Delivered-To: apmail-geronimo-activemq-commits-archive@www.apache.org Received: (qmail 11822 invoked from network); 28 Mar 2006 22:11:00 -0000 Received: from hermes.apache.org (HELO mail.apache.org) (209.237.227.199) by minotaur.apache.org with SMTP; 28 Mar 2006 22:11:00 -0000 Received: (qmail 11207 invoked by uid 500); 28 Mar 2006 22:11:00 -0000 Delivered-To: apmail-geronimo-activemq-commits-archive@geronimo.apache.org Received: (qmail 11169 invoked by uid 500); 28 Mar 2006 22:11:00 -0000 Mailing-List: contact activemq-commits-help@geronimo.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: activemq-dev@geronimo.apache.org Delivered-To: mailing list activemq-commits@geronimo.apache.org Received: (qmail 11160 invoked by uid 99); 28 Mar 2006 22:11:00 -0000 Received: from asf.osuosl.org (HELO asf.osuosl.org) (140.211.166.49) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 28 Mar 2006 14:11:00 -0800 X-ASF-Spam-Status: No, hits=-8.6 required=10.0 tests=ALL_TRUSTED,INFO_TLD,NO_REAL_NAME X-Spam-Check-By: apache.org Received: from [209.237.227.194] (HELO minotaur.apache.org) (209.237.227.194) by apache.org (qpsmtpd/0.29) with SMTP; Tue, 28 Mar 2006 14:10:59 -0800 Received: (qmail 11689 invoked by uid 65534); 28 Mar 2006 22:10:37 -0000 Message-ID: <20060328221037.11688.qmail@minotaur.apache.org> Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r389614 - in /incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker: BrokerService.java region/DurableTopicSubscription.java region/RegionBroker.java region/Topic.java region/TopicRegion.java Date: Tue, 28 Mar 2006 22:10:36 -0000 To: activemq-commits@geronimo.apache.org From: chirino@apache.org X-Mailer: svnmailer-1.0.7 X-Virus-Checked: Checked by ClamAV on apache.org X-Spam-Rating: minotaur.apache.org 1.6.2 0/1000/N Author: chirino Date: Tue Mar 28 14:10:34 2006 New Revision: 389614 URL: http://svn.apache.org/viewcvs?rev=389614&view=rev Log: Working on https://issues.apache.org/activemq/browse/AMQ-669 Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicRegion.java Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java?rev=389614&r1=389613&r2=389614&view=diff ============================================================================== --- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java (original) +++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java Tue Mar 28 14:10:34 2006 @@ -116,6 +116,8 @@ private AtomicBoolean started = new AtomicBoolean(false); private BrokerPlugin[] plugins; + private boolean keepDurableSubsActive; + /** * Adds a new transport connector for the given bind address * @@ -908,6 +910,7 @@ else { regionBroker = new RegionBroker(this,getTaskRunnerFactory(), getMemoryManager(), getPersistenceAdapter()); } + regionBroker.setKeepDurableSubsActive(keepDurableSubsActive); regionBroker.setBrokerName(getBrokerName()); return regionBroker; } @@ -1120,5 +1123,13 @@ */ public void setShutdownOnMasterFailure(boolean shutdownOnMasterFailure){ this.shutdownOnMasterFailure=shutdownOnMasterFailure; + } + + public boolean isKeepDurableSubsActive() { + return keepDurableSubsActive; + } + + public void setKeepDurableSubsActive(boolean keepDurableSubsActive) { + this.keepDurableSubsActive = keepDurableSubsActive; } } Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java?rev=389614&r1=389613&r2=389614&view=diff ============================================================================== --- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java (original) +++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java Tue Mar 28 14:10:34 2006 @@ -36,10 +36,12 @@ private final ConcurrentHashMap redeliveredMessages = new ConcurrentHashMap(); private final ConcurrentHashMap destinations = new ConcurrentHashMap(); private final SubscriptionKey subscriptionKey; + private final boolean keepDurableSubsActive; private boolean active=false; - public DurableTopicSubscription(Broker broker,ConnectionContext context, ConsumerInfo info) throws InvalidSelectorException { + public DurableTopicSubscription(Broker broker,ConnectionContext context, ConsumerInfo info, boolean keepDurableSubsActive) throws InvalidSelectorException { super(broker,context, info); + this.keepDurableSubsActive = keepDurableSubsActive; subscriptionKey = new SubscriptionKey(context.getClientId(), info.getSubcriptionName()); } @@ -57,10 +59,13 @@ synchronized public void add(ConnectionContext context, Destination destination) throws Exception { super.add(context, destination); destinations.put(destination.getActiveMQDestination(), destination); - if( active ) { + if( active || keepDurableSubsActive ) { Topic topic = (Topic) destination; topic.activate(context, this); } + if( !isFull() ) { + dispatchMatched(); + } } synchronized public void activate(ConnectionContext context, ConsumerInfo info) throws Exception { @@ -68,21 +73,25 @@ this.active = true; this.context = context; this.info = info; - for (Iterator iter = destinations.values().iterator(); iter.hasNext();) { - Topic topic = (Topic) iter.next(); - topic.activate(context, this); + if( !keepDurableSubsActive ) { + for (Iterator iter = destinations.values().iterator(); iter.hasNext();) { + Topic topic = (Topic) iter.next(); + topic.activate(context, this); + } } - if( !isFull() ) { + if( !isFull() ) { dispatchMatched(); } } } - synchronized public void deactivate() throws Exception { + synchronized public void deactivate(boolean keepDurableSubsActive) throws Exception { active=false; - for (Iterator iter = destinations.values().iterator(); iter.hasNext();) { - Topic topic = (Topic) iter.next(); - topic.deactivate(context, this); + if( !keepDurableSubsActive ) { + for (Iterator iter = destinations.values().iterator(); iter.hasNext();) { + Topic topic = (Topic) iter.next(); + topic.deactivate(context, this); + } } for (Iterator iter = dispatched.iterator(); iter.hasNext();) { @@ -115,7 +124,7 @@ } synchronized public void add(MessageReference node) throws Exception { - if( !active ) { + if( !active && !keepDurableSubsActive ) { return; } node = new IndirectMessageReference(node.getRegionDestination(), (Message) node); @@ -123,14 +132,14 @@ node.decrementReferenceCount(); } - public int getPendingQueueSize(){ - if (active){ + public int getPendingQueueSize() { + if( active || keepDurableSubsActive ) { return super.getPendingQueueSize(); } //TODO: need to get from store return 0; } - + public void setSelector(String selector) throws InvalidSelectorException { throw new UnsupportedOperationException("You cannot dynamically change the selector for durable topic subscriptions"); } Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java?rev=389614&r1=389613&r2=389614&view=diff ============================================================================== --- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java (original) +++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java Tue Mar 28 14:10:34 2006 @@ -71,6 +71,7 @@ private final Region tempTopicRegion; private BrokerService brokerService; private boolean stopped = false; + private boolean keepDurableSubsActive=false; protected final DestinationStatistics destinationStatistics = new DestinationStatistics(); @@ -125,6 +126,7 @@ public void start() throws Exception { + ((TopicRegion)topicRegion).setKeepDurableSubsActive(keepDurableSubsActive); queueRegion.start(); topicRegion.start(); tempQueueRegion.start(); @@ -477,6 +479,14 @@ ss.stop(topicRegion); ss.stop(tempQueueRegion); ss.stop(tempTopicRegion); + } + + public boolean isKeepDurableSubsActive() { + return keepDurableSubsActive; + } + + public void setKeepDurableSubsActive(boolean keepDurableSubsActive) { + this.keepDurableSubsActive = keepDurableSubsActive; } Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java?rev=389614&r1=389613&r2=389614&view=diff ============================================================================== --- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java (original) +++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java Tue Mar 28 14:10:34 2006 @@ -127,13 +127,7 @@ } sub.remove(context, this); } - - public void addInactiveSubscription(ConnectionContext context, DurableTopicSubscription sub) throws Exception { - sub.add(context, this); - destinationStatistics.getConsumers().increment(); - durableSubcribers.put(sub.getSubscriptionKey(), sub); - } - + public void deleteSubscription(ConnectionContext context, SubscriptionKey key) throws IOException { if (store != null) { store.deleteSubscription(key.clientId, key.subscriptionName); Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicRegion.java URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicRegion.java?rev=389614&r1=389613&r2=389614&view=diff ============================================================================== --- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicRegion.java (original) +++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicRegion.java Tue Mar 28 14:10:34 2006 @@ -16,30 +16,32 @@ */ package org.apache.activemq.broker.region; -import edu.emory.mathcs.backport.java.util.concurrent.ConcurrentHashMap; +import java.util.Iterator; +import java.util.Set; + +import javax.jms.InvalidDestinationException; +import javax.jms.JMSException; -import org.apache.activemq.broker.Broker; import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.broker.region.policy.PolicyEntry; -import org.apache.activemq.broker.region.policy.PolicyMap; import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ActiveMQTopic; +import org.apache.activemq.command.ConnectionId; +import org.apache.activemq.command.ConsumerId; import org.apache.activemq.command.ConsumerInfo; import org.apache.activemq.command.RemoveSubscriptionInfo; +import org.apache.activemq.command.SessionId; import org.apache.activemq.command.SubscriptionInfo; import org.apache.activemq.memory.UsageManager; import org.apache.activemq.store.PersistenceAdapter; import org.apache.activemq.store.TopicMessageStore; import org.apache.activemq.thread.TaskRunnerFactory; +import org.apache.activemq.util.LongSequenceGenerator; import org.apache.activemq.util.SubscriptionKey; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import javax.jms.InvalidDestinationException; -import javax.jms.JMSException; - -import java.util.Iterator; -import java.util.Set; +import edu.emory.mathcs.backport.java.util.concurrent.ConcurrentHashMap; /** * @@ -47,9 +49,10 @@ */ public class TopicRegion extends AbstractRegion { private static final Log log = LogFactory.getLog(TopicRegion.class); - protected final ConcurrentHashMap durableSubscriptions = new ConcurrentHashMap(); - + private final LongSequenceGenerator recoveredDurableSubIdGenerator = new LongSequenceGenerator(); + private final SessionId recoveredDurableSubSessionId = new SessionId(new ConnectionId("OFFLINE"), recoveredDurableSubIdGenerator.getNextSequenceId()); + private boolean keepDurableSubsActive=false; public TopicRegion(RegionBroker broker,DestinationStatistics destinationStatistics, UsageManager memoryManager, TaskRunnerFactory taskRunnerFactory, PersistenceAdapter persistenceAdapter) { @@ -116,7 +119,7 @@ SubscriptionKey key = new SubscriptionKey(context.getClientId(), info.getSubcriptionName()); DurableTopicSubscription sub = (DurableTopicSubscription) durableSubscriptions.get(key); if (sub != null) { - sub.deactivate(); + sub.deactivate(keepDurableSubsActive); } } @@ -166,24 +169,32 @@ // A single durable sub may be subscribing to multiple topics. so it might exist already. DurableTopicSubscription sub = (DurableTopicSubscription) durableSubscriptions.get(key); - if( sub == null ) { - sub = (DurableTopicSubscription) createSubscription(context, createInactiveConsumerInfo(info)); + ConsumerInfo consumerInfo = createInactiveConsumerInfo(info); + if( sub == null ) { + sub = (DurableTopicSubscription) createSubscription(context, consumerInfo ); } - topic.addInactiveSubscription(context, sub); + + subscriptions.put(consumerInfo.getConsumerId(), sub); + topic.addSubscription(context, sub); } } return topic; } - private static ConsumerInfo createInactiveConsumerInfo(SubscriptionInfo info) { + private ConsumerInfo createInactiveConsumerInfo(SubscriptionInfo info) { ConsumerInfo rc = new ConsumerInfo(); rc.setSelector(info.getSelector()); rc.setSubcriptionName(info.getSubcriptionName()); rc.setDestination(info.getDestination()); + rc.setConsumerId(createConsumerId()); return rc; } + private ConsumerId createConsumerId() { + return new ConsumerId(recoveredDurableSubSessionId,recoveredDurableSubIdGenerator.getNextSequenceId()); + } + protected void configureTopic(Topic topic, ActiveMQDestination destination) { if (broker.getDestinationPolicy() != null) { PolicyEntry entry = broker.getDestinationPolicy().getEntryFor(destination); @@ -198,7 +209,7 @@ SubscriptionKey key = new SubscriptionKey(context.getClientId(), info.getSubcriptionName()); DurableTopicSubscription sub = (DurableTopicSubscription) durableSubscriptions.get(key); if (sub == null) { - sub = new DurableTopicSubscription(broker,context, info); + sub = new DurableTopicSubscription(broker,context, info, keepDurableSubsActive); durableSubscriptions.put(key, sub); } else { @@ -239,6 +250,14 @@ iter.remove(); } return inactiveDestinations; + } + + public boolean isKeepDurableSubsActive() { + return keepDurableSubsActive; + } + + public void setKeepDurableSubsActive(boolean keepDurableSubsActive) { + this.keepDurableSubsActive = keepDurableSubsActive; } }