Return-Path: Delivered-To: apmail-geronimo-activemq-commits-archive@www.apache.org Received: (qmail 64767 invoked from network); 2 Mar 2006 10:32:04 -0000 Received: from hermes.apache.org (HELO mail.apache.org) (209.237.227.199) by minotaur.apache.org with SMTP; 2 Mar 2006 10:32:03 -0000 Received: (qmail 99178 invoked by uid 500); 2 Mar 2006 10:32:39 -0000 Delivered-To: apmail-geronimo-activemq-commits-archive@geronimo.apache.org Received: (qmail 99157 invoked by uid 500); 2 Mar 2006 10:32:39 -0000 Mailing-List: contact activemq-commits-help@geronimo.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: activemq-dev@geronimo.apache.org Delivered-To: mailing list activemq-commits@geronimo.apache.org Received: (qmail 99138 invoked by uid 99); 2 Mar 2006 10:32:39 -0000 Received: from asf.osuosl.org (HELO asf.osuosl.org) (140.211.166.49) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 02 Mar 2006 02:32:39 -0800 X-ASF-Spam-Status: No, hits=-8.6 required=10.0 tests=ALL_TRUSTED,INFO_TLD,NO_REAL_NAME X-Spam-Check-By: apache.org Received: from [209.237.227.194] (HELO minotaur.apache.org) (209.237.227.194) by apache.org (qpsmtpd/0.29) with SMTP; Thu, 02 Mar 2006 02:32:37 -0800 Received: (qmail 64314 invoked by uid 65534); 2 Mar 2006 10:31:30 -0000 Message-ID: <20060302103130.64313.qmail@minotaur.apache.org> Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r382344 - in /incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker: jmx/ region/ Date: Thu, 02 Mar 2006 10:31:26 -0000 To: activemq-commits@geronimo.apache.org From: rajdavies@apache.org X-Mailer: svnmailer-1.0.7 X-Virus-Checked: Checked by ClamAV on apache.org X-Spam-Rating: minotaur.apache.org 1.6.2 0/1000/N Author: rajdavies Date: Thu Mar 2 02:31:23 2006 New Revision: 382344 URL: http://svn.apache.org/viewcvs?rev=382344&view=rev Log: Added support for view inactive durable consumers Added: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/InactiveDurableSubscriptionView.java (with props) Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/BrokerView.java incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/BrokerViewMBean.java incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/DurableSubscriptionView.java incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedQueueRegion.java incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedRegionBroker.java incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedTempQueueRegion.java incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedTempTopicRegion.java incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedTopicRegion.java incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/SubscriptionView.java incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/SubscriptionViewMBean.java incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/BrokerView.java URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/BrokerView.java?rev=382344&r1=382343&r2=382344&view=diff ============================================================================== --- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/BrokerView.java (original) +++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/BrokerView.java Thu Mar 2 02:31:23 2006 @@ -117,4 +117,8 @@ return broker.getTemporaryQueueSubscribers(); } + public ObjectName[] getInactiveDurableTopicSubscribers(){ + return broker.getInactiveDurableTopicSubscribers(); + } + } Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/BrokerViewMBean.java URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/BrokerViewMBean.java?rev=382344&r1=382343&r2=382344&view=diff ============================================================================== --- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/BrokerViewMBean.java (original) +++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/BrokerViewMBean.java Thu Mar 2 02:31:23 2006 @@ -45,6 +45,7 @@ public ObjectName[] getTopicSubscribers(); public ObjectName[] getDurableTopicSubscribers(); + public ObjectName[] getInactiveDurableTopicSubscribers(); public ObjectName[] getQueueSubscribers(); public ObjectName[] getTemporaryTopicSubscribers(); public ObjectName[] getTemporaryQueueSubscribers(); Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/DurableSubscriptionView.java URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/DurableSubscriptionView.java?rev=382344&r1=382343&r2=382344&view=diff ============================================================================== --- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/DurableSubscriptionView.java (original) +++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/DurableSubscriptionView.java Thu Mar 2 02:31:23 2006 @@ -23,10 +23,16 @@ public class DurableSubscriptionView extends SubscriptionView implements DurableSubscriptionViewMBean { protected String subscriptionName; - public DurableSubscriptionView(Subscription sub){ - super(sub); + /** + * Constructor + * @param clientId + * @param sub + */ + public DurableSubscriptionView(String clientId,Subscription sub){ + super(clientId,sub); this.subscriptionName = sub.getConsumerInfo().getSubcriptionName(); } + /** * @return name of the durable consumer */ Added: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/InactiveDurableSubscriptionView.java URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/InactiveDurableSubscriptionView.java?rev=382344&view=auto ============================================================================== --- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/InactiveDurableSubscriptionView.java (added) +++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/InactiveDurableSubscriptionView.java Thu Mar 2 02:31:23 2006 @@ -0,0 +1,102 @@ +/** + * + * Copyright 2005-2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package org.apache.activemq.broker.jmx; + +import javax.management.openmbean.CompositeData; +import javax.management.openmbean.OpenDataException; +import javax.management.openmbean.TabularData; +import org.apache.activemq.command.SubscriptionInfo; +/** + * @version $Revision: 1.5 $ + */ +public class InactiveDurableSubscriptionView extends SubscriptionView implements DurableSubscriptionViewMBean { + + protected SubscriptionInfo info; + public InactiveDurableSubscriptionView(String clientId,SubscriptionInfo sub){ + super(clientId,null); + this.info = sub; + } + + + + + /** + * @return the id of the Subscription + */ + public long getSubcriptionId(){ + return -1; + } + + /** + * @return the destination name + */ + public String getDestinationName(){ + return info.getDestination().getPhysicalName(); + + } + + /** + * @return true if the destination is a Queue + */ + public boolean isDestinationQueue(){ + return false; + } + + /** + * @return true of the destination is a Topic + */ + public boolean isDestinationTopic(){ + return true; + } + + /** + * @return true if the destination is temporary + */ + public boolean isDestinationTemporary(){ + return false; + } + /** + * @return name of the durable consumer + */ + public String getSubscriptionName(){ + return info.getSubcriptionName(); + } + + /** + * @return true if the subscriber is active + */ + public boolean isActive(){ + return false; + } + + /** + * Browse messages for this durable subscriber + * + * @return messages + * @throws OpenDataException + */ + public CompositeData[] browse() throws OpenDataException{ + return null; + } + + /** + * Browse messages for this durable subscriber + * + * @return messages + * @throws OpenDataException + */ + public TabularData browseAsTable() throws OpenDataException{ + return null; + } +} \ No newline at end of file Propchange: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/InactiveDurableSubscriptionView.java ------------------------------------------------------------------------------ svn:executable = * Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedQueueRegion.java URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedQueueRegion.java?rev=382344&r1=382343&r2=382344&view=diff ============================================================================== --- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedQueueRegion.java (original) +++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedQueueRegion.java Thu Mar 2 02:31:23 2006 @@ -41,7 +41,7 @@ protected Subscription createSubscription(ConnectionContext context, ConsumerInfo info) throws InvalidSelectorException { Subscription sub = super.createSubscription(context, info); - regionBroker.registerSubscription(sub); + regionBroker.registerSubscription(context,sub); return sub; } Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedRegionBroker.java URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedRegionBroker.java?rev=382344&r1=382343&r2=382344&view=diff ============================================================================== --- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedRegionBroker.java (original) +++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedRegionBroker.java Thu Mar 2 02:31:23 2006 @@ -14,13 +14,19 @@ package org.apache.activemq.broker.jmx; import java.io.IOException; +import java.util.HashMap; +import java.util.HashSet; import java.util.Hashtable; +import java.util.Iterator; import java.util.Map; import java.util.Set; +import java.util.Map.Entry; import javax.management.MBeanServer; import javax.management.ObjectName; import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.broker.region.Destination; +import org.apache.activemq.broker.region.DurableTopicSubscription; import org.apache.activemq.broker.region.Queue; import org.apache.activemq.broker.region.Region; import org.apache.activemq.broker.region.RegionBroker; @@ -28,11 +34,15 @@ import org.apache.activemq.broker.region.Topic; import org.apache.activemq.broker.region.policy.PolicyMap; import org.apache.activemq.command.ActiveMQDestination; +import org.apache.activemq.command.ActiveMQTopic; import org.apache.activemq.command.ConsumerInfo; +import org.apache.activemq.command.SubscriptionInfo; import org.apache.activemq.memory.UsageManager; import org.apache.activemq.store.PersistenceAdapter; +import org.apache.activemq.store.TopicMessageStore; import org.apache.activemq.thread.TaskRunnerFactory; import org.apache.activemq.util.JMXSupport; +import org.apache.activemq.util.SubscriptionKey; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import edu.emory.mathcs.backport.java.util.concurrent.ConcurrentHashMap; @@ -47,8 +57,11 @@ private final Map queueSubscribers=new ConcurrentHashMap(); private final Map topicSubscribers=new ConcurrentHashMap(); private final Map durableTopicSubscribers=new ConcurrentHashMap(); + private final Map inactiveDurableTopicSubscribers=new ConcurrentHashMap(); private final Map temporaryQueueSubscribers=new ConcurrentHashMap(); private final Map temporaryTopicSubscribers=new ConcurrentHashMap(); + private final Map subscriptionKeys = new ConcurrentHashMap(); + private final Map subscriptionMap = new ConcurrentHashMap(); public ManagedRegionBroker(BrokerService brokerService,MBeanServer mbeanServer,ObjectName brokerObjectName, TaskRunnerFactory taskRunnerFactory,UsageManager memoryManager,PersistenceAdapter adapter, @@ -57,6 +70,13 @@ this.mbeanServer=mbeanServer; this.brokerObjectName=brokerObjectName; } + + public void start() throws Exception { + super.start(); + //build all existing durable subscriptions + buildExistingSubscriptions(); + + } protected Region createQueueRegion(UsageManager memoryManager,TaskRunnerFactory taskRunnerFactory, PersistenceAdapter adapter,PolicyMap policyMap){ @@ -108,33 +128,37 @@ } } - public void registerSubscription(Subscription sub){ + public void registerSubscription(ConnectionContext context,Subscription sub){ + // NEED CONTEXT TO GET CLIENT ID AND USE Subscription KEY!!! + SubscriptionKey key = new SubscriptionKey(context.getClientId(),sub.getConsumerInfo().getSubcriptionName()); Hashtable map=new Hashtable(brokerObjectName.getKeyPropertyList()); map.put("Type",JMXSupport.encodeObjectNamePart("Subscription")); - map.put("name",JMXSupport.encodeObjectNamePart(sub.getConsumerInfo().toString())); + String name = key.toString() + ":" + sub.getConsumerInfo().toString(); + map.put("name",JMXSupport.encodeObjectNamePart(name)); + map.put("active", "true"); try{ ObjectName objectName=new ObjectName(brokerObjectName.getDomain(),map); SubscriptionView view; if(sub.getConsumerInfo().isDurable()){ - view=new DurableSubscriptionView(sub); + view=new DurableSubscriptionView(context.getClientId(),sub); }else{ - view=new SubscriptionView(sub); + view=new SubscriptionView(context.getClientId(),sub); } - registerSubscription(objectName,sub.getConsumerInfo(),view); + subscriptionMap.put(sub,objectName); + registerSubscription(objectName,sub.getConsumerInfo(),key,view); }catch(Exception e){ log.error("Failed to register subscription "+sub,e); } } public void unregisterSubscription(Subscription sub){ - Hashtable map=new Hashtable(brokerObjectName.getKeyPropertyList()); - map.put("Type",JMXSupport.encodeObjectNamePart("Subscription")); - map.put("name",JMXSupport.encodeObjectNamePart(sub.getConsumerInfo().toString())); - try{ - ObjectName objectName=new ObjectName(brokerObjectName.getDomain(),map); - unregisterSubscription(objectName); - }catch(Exception e){ - log.error("Failed to unregister subscription "+sub,e); + ObjectName name=(ObjectName) subscriptionMap.get(sub); + if(name!=null){ + try{ + unregisterSubscription(name); + }catch(Exception e){ + log.error("Failed to unregister subscription "+sub,e); + } } } @@ -163,7 +187,7 @@ mbeanServer.unregisterMBean(key); } - protected void registerSubscription(ObjectName key,ConsumerInfo info,SubscriptionView view) throws Exception{ + protected void registerSubscription(ObjectName key,ConsumerInfo info,SubscriptionKey subscriptionKey,SubscriptionView view) throws Exception{ ActiveMQDestination dest=info.getDestination(); if(dest.isQueue()){ if(dest.isTemporary()){ @@ -177,6 +201,16 @@ }else{ if(info.isDurable()){ durableTopicSubscribers.put(key,view); + //unregister any inactive durable subs + try { + ObjectName inactiveName = (ObjectName) subscriptionKeys.get(subscriptionKey); + if (inactiveName != null){ + inactiveDurableTopicSubscribers.remove(inactiveName); + mbeanServer.unregisterMBean(inactiveName); + } + }catch(Exception e){ + log.error("Unable to unregister inactive durable subscriber: " + subscriptionKey,e); + } }else{ topicSubscribers.put(key,view); } @@ -188,10 +222,67 @@ protected void unregisterSubscription(ObjectName key) throws Exception{ queueSubscribers.remove(key); topicSubscribers.remove(key); - durableTopicSubscribers.remove(key); + inactiveDurableTopicSubscribers.remove(key); temporaryQueueSubscribers.remove(key); temporaryTopicSubscribers.remove(key); mbeanServer.unregisterMBean(key); + DurableSubscriptionView view = (DurableSubscriptionView) durableTopicSubscribers.remove(key); + if (view != null){ + //need to put this back in the inactive list + SubscriptionKey subscriptionKey = new SubscriptionKey(view.getClientId(),view.getSubscriptionName()); + SubscriptionInfo info = new SubscriptionInfo(); + info.setClientId(subscriptionKey.getClientId()); + info.setSubcriptionName(subscriptionKey.getSubscriptionName()); + info.setDestination(new ActiveMQTopic(view.getDestinationName())); + addInactiveSubscription(subscriptionKey, info); + } + + + } + + protected void buildExistingSubscriptions() throws Exception{ + Map subscriptions = new HashMap(); + Set destinations = adaptor.getDestinations(); + if (destinations != null){ + for (Iterator iter = destinations.iterator(); iter.hasNext();){ + ActiveMQDestination dest = (ActiveMQDestination) iter.next(); + if (dest.isTopic()){ + TopicMessageStore store = adaptor.createTopicMessageStore((ActiveMQTopic) dest); + SubscriptionInfo[] infos = store.getAllSubscriptions(); + if (infos != null){ + for (int i = 0; i < infos.length; i++) { + + SubscriptionInfo info = infos[i]; + log.debug("Restoring durable subscription: "+infos); + SubscriptionKey key = new SubscriptionKey(info); + subscriptions.put(key,info); + } + } + } + } + } + for (Iterator i = subscriptions.entrySet().iterator();i.hasNext();){ + Map.Entry entry = (Entry) i.next(); + SubscriptionKey key = (SubscriptionKey) entry.getKey(); + SubscriptionInfo info = (SubscriptionInfo) entry.getValue(); + addInactiveSubscription(key, info); + } + } + + protected void addInactiveSubscription(SubscriptionKey key,SubscriptionInfo info){ + Hashtable map=new Hashtable(brokerObjectName.getKeyPropertyList()); + map.put("Type",JMXSupport.encodeObjectNamePart("Subscription")); + map.put("name",JMXSupport.encodeObjectNamePart(key.toString())); + map.put("active", "false"); + try{ + ObjectName objectName=new ObjectName(brokerObjectName.getDomain(),map); + SubscriptionView view = new InactiveDurableSubscriptionView(key.getClientId(),info); + mbeanServer.registerMBean(view,objectName); + inactiveDurableTopicSubscribers.put(objectName,view); + subscriptionKeys.put(key, objectName); + }catch(Exception e){ + log.error("Failed to register subscription "+info,e); + } } protected ObjectName[] getTopics(){ @@ -229,6 +320,11 @@ } protected ObjectName[] getTemporaryQueueSubscribers(){ Set set = temporaryQueueSubscribers.keySet(); + return (ObjectName[])set.toArray(new ObjectName[set.size()]); + } + + protected ObjectName[] getInactiveDurableTopicSubscribers(){ + Set set = inactiveDurableTopicSubscribers.keySet(); return (ObjectName[])set.toArray(new ObjectName[set.size()]); } } Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedTempQueueRegion.java URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedTempQueueRegion.java?rev=382344&r1=382343&r2=382344&view=diff ============================================================================== --- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedTempQueueRegion.java (original) +++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedTempQueueRegion.java Thu Mar 2 02:31:23 2006 @@ -39,7 +39,7 @@ protected Subscription createSubscription(ConnectionContext context, ConsumerInfo info) throws InvalidSelectorException { Subscription sub = super.createSubscription(context, info); - regionBroker.registerSubscription(sub); + regionBroker.registerSubscription(context,sub); return sub; } Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedTempTopicRegion.java URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedTempTopicRegion.java?rev=382344&r1=382343&r2=382344&view=diff ============================================================================== --- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedTempTopicRegion.java (original) +++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedTempTopicRegion.java Thu Mar 2 02:31:23 2006 @@ -39,7 +39,7 @@ protected Subscription createSubscription(ConnectionContext context, ConsumerInfo info) throws JMSException { Subscription sub = super.createSubscription(context, info); - regionBroker.registerSubscription(sub); + regionBroker.registerSubscription(context,sub); return sub; } Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedTopicRegion.java URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedTopicRegion.java?rev=382344&r1=382343&r2=382344&view=diff ============================================================================== --- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedTopicRegion.java (original) +++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedTopicRegion.java Thu Mar 2 02:31:23 2006 @@ -41,7 +41,7 @@ protected Subscription createSubscription(ConnectionContext context, ConsumerInfo info) throws JMSException { Subscription sub = super.createSubscription(context, info); - regionBroker.registerSubscription(sub); + regionBroker.registerSubscription(context,sub); return sub; } Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/SubscriptionView.java URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/SubscriptionView.java?rev=382344&r1=382343&r2=382344&view=diff ============================================================================== --- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/SubscriptionView.java (original) +++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/SubscriptionView.java Thu Mar 2 02:31:23 2006 @@ -29,22 +29,30 @@ protected final Subscription subscription; - + protected final String clientId; /** * Constructior * @param subs */ - public SubscriptionView(Subscription subs){ + public SubscriptionView(String clientId,Subscription subs){ + this.clientId = clientId; this.subscription = subs; } /** + * @return the clientId + */ + public String getClientId(){ + return clientId; + } + + /** * @return the id of the Connection the Subscription is on */ public String getConnectionId(){ - ConsumerInfo info = subscription.getConsumerInfo(); + ConsumerInfo info = getConsumerInfo(); if (info != null){ return info.getConsumerId().getConnectionId(); } @@ -55,7 +63,7 @@ * @return the id of the Session the subscription is on */ public long getSessionId(){ - ConsumerInfo info = subscription.getConsumerInfo(); + ConsumerInfo info = getConsumerInfo(); if (info != null){ return info.getConsumerId().getSessionId(); } @@ -66,7 +74,7 @@ * @return the id of the Subscription */ public long getSubcriptionId(){ - ConsumerInfo info = subscription.getConsumerInfo(); + ConsumerInfo info = getConsumerInfo(); if (info != null){ return info.getConsumerId().getValue(); } @@ -77,7 +85,7 @@ * @return the destination name */ public String getDestinationName(){ - ConsumerInfo info = subscription.getConsumerInfo(); + ConsumerInfo info = getConsumerInfo(); if (info != null){ ActiveMQDestination dest = info.getDestination(); return dest.getPhysicalName(); @@ -90,7 +98,7 @@ * @return true if the destination is a Queue */ public boolean isDestinationQueue(){ - ConsumerInfo info = subscription.getConsumerInfo(); + ConsumerInfo info = getConsumerInfo(); if (info != null){ ActiveMQDestination dest = info.getDestination(); return dest.isQueue(); @@ -102,7 +110,7 @@ * @return true of the destination is a Topic */ public boolean isDestinationTopic(){ - ConsumerInfo info = subscription.getConsumerInfo(); + ConsumerInfo info = getConsumerInfo(); if (info != null){ ActiveMQDestination dest = info.getDestination(); return dest.isTopic(); @@ -114,41 +122,54 @@ * @return true if the destination is temporary */ public boolean isDestinationTemporary(){ - ConsumerInfo info = subscription.getConsumerInfo(); + ConsumerInfo info = getConsumerInfo(); if (info != null){ ActiveMQDestination dest = info.getDestination(); return dest.isTemporary(); } return false; } + + /** + * @return true if the subscriber is active + */ + public boolean isActive(){ + return true; + } /** * The subscription should release as may references as it can to help the garbage collector * reclaim memory. */ public void gc(){ + if (subscription != null){ subscription.gc(); + } } /** * @return number of messages pending delivery */ public int getPending(){ - return subscription.pending(); + return subscription != null ? subscription.pending() : 0; } /** * @return number of messages dispatched */ public int getDispatched(){ - return subscription.dispatched(); + return subscription != null ? subscription.dispatched() : 0; } /** * @return number of messages delivered */ public int getDelivered(){ - return subscription.delivered(); + return subscription != null ? subscription.delivered() : 0; + } + + protected ConsumerInfo getConsumerInfo(){ + return subscription != null ? subscription.getConsumerInfo() : null; } } Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/SubscriptionViewMBean.java URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/SubscriptionViewMBean.java?rev=382344&r1=382343&r2=382344&view=diff ============================================================================== --- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/SubscriptionViewMBean.java (original) +++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/SubscriptionViewMBean.java Thu Mar 2 02:31:23 2006 @@ -17,6 +17,11 @@ * @version $Revision: 1.5 $ */ public interface SubscriptionViewMBean{ + + /** + * @return the clientId + */ + public String getClientId(); /** * @return the id of the Connection the Subscription is on */ @@ -51,6 +56,11 @@ * @return true if the destination is temporary */ public boolean isDestinationTemporary(); + + /** + * @return true if the subscriber is active + */ + public boolean isActive(); /** * The subscription should release as may references as it can to help the garbage collector reclaim memory. Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java?rev=382344&r1=382343&r2=382344&view=diff ============================================================================== --- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java (original) +++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java Thu Mar 2 02:31:23 2006 @@ -81,7 +81,7 @@ private BrokerId brokerId; private String brokerName; private Map clientIdSet = new HashMap(); // we will synchronize access - private PersistenceAdapter adaptor; + protected PersistenceAdapter adaptor; public RegionBroker(BrokerService brokerService,TaskRunnerFactory taskRunnerFactory, UsageManager memoryManager, PersistenceAdapter adapter) throws IOException { this(brokerService,taskRunnerFactory, memoryManager, createDefaultPersistenceAdapter(memoryManager), null);