activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rajdav...@apache.org
Subject svn commit: r381927 - in /incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq: broker/ broker/jmx/ broker/region/ network/jms/
Date Wed, 01 Mar 2006 06:34:47 GMT
Author: rajdavies
Date: Tue Feb 28 22:34:44 2006
New Revision: 381927

URL: http://svn.apache.org/viewcvs?rev=381927&view=rev
Log:
Added mbeans for Subscribers

Added:
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/DurableSubscriptionView.java
  (with props)
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/DurableSubscriptionViewMBean.java
  (with props)
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/SubscriptionView.java
  (with props)
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/SubscriptionViewMBean.java
  (with props)
Modified:
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/BrokerView.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/BrokerViewMBean.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedRegionBroker.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Subscription.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/jms/TopicBridge.java

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java?rev=381927&r1=381926&r2=381927&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java
(original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java
Tue Feb 28 22:34:44 2006
@@ -866,7 +866,7 @@
 
         if (isUseJmx()) {
             ManagedRegionBroker managedBroker = (ManagedRegionBroker) regionBroker;
-            BrokerViewMBean view = new BrokerView(broker, managedBroker.getDestinationStatistics(),
getMemoryManager());
+            BrokerViewMBean view = new BrokerView(managedBroker, getMemoryManager());
             MBeanServer mbeanServer = getManagementContext().getMBeanServer();
             ObjectName objectName = getBrokerObjectName();
             mbeanServer.registerMBean(view, objectName);

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/BrokerView.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/BrokerView.java?rev=381927&r1=381926&r2=381927&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/BrokerView.java
(original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/BrokerView.java
Tue Feb 28 22:34:44 2006
@@ -16,19 +16,18 @@
  */
 package org.apache.activemq.broker.jmx;
 
+import javax.management.ObjectName;
 import org.apache.activemq.broker.Broker;
 import org.apache.activemq.broker.region.DestinationStatistics;
 import org.apache.activemq.memory.UsageManager;
 
 public class BrokerView implements BrokerViewMBean {
     
-    private final Broker broker;
-    private final DestinationStatistics destinationStatistics;
+    private final ManagedRegionBroker broker;
     private final UsageManager usageManager;
 
-    public BrokerView(Broker broker, DestinationStatistics destinationStatistics, UsageManager
usageManager) {
+    public BrokerView(ManagedRegionBroker broker, UsageManager usageManager) {
         this.broker = broker;
-        this.destinationStatistics = destinationStatistics;
         this.usageManager = usageManager;        
     }
     
@@ -49,19 +48,19 @@
     }
     
     public long getTotalEnqueueCount() {
-        return destinationStatistics.getEnqueues().getCount();    
+        return broker.getDestinationStatistics().getEnqueues().getCount();    
     }
     public long getTotalDequeueCount() {
-        return destinationStatistics.getDequeues().getCount();
+        return broker.getDestinationStatistics().getDequeues().getCount();
     }
     public long getTotalConsumerCount() {
-        return destinationStatistics.getConsumers().getCount();
+        return broker.getDestinationStatistics().getConsumers().getCount();
     }
     public long getTotalMessages() {
-        return destinationStatistics.getMessages().getCount();
+        return broker.getDestinationStatistics().getMessages().getCount();
     }    
     public long getTotalMessagesCached() {
-        return destinationStatistics.getMessagesCached().getCount();
+        return broker.getDestinationStatistics().getMessagesCached().getCount();
     }
 
     public int getMemoryPercentageUsed() {
@@ -75,11 +74,47 @@
     }
     
     public void resetStatistics() {
-        destinationStatistics.reset();
+        broker.getDestinationStatistics().reset();
     }
 
     public void terminateJVM(int exitCode) {
         System.exit(exitCode);
+    }
+
+    public ObjectName[] getTopics(){
+        return broker.getTopics();
+    }
+
+    public ObjectName[] getQueues(){
+        return broker.getQueues();
+    }
+
+    public ObjectName[] getTemporaryTopics(){
+        return broker.getTemporaryTopics();
+    }
+
+    public ObjectName[] getTemporaryQueues(){
+        return broker.getTemporaryQueues();
+    }
+
+    public ObjectName[] getTopicSubscribers(){
+      return broker.getTemporaryTopicSubscribers();
+    }
+
+    public ObjectName[] getDurableTopicSubscribers(){
+        return broker.getDurableTopicSubscribers();
+    }
+
+    public ObjectName[] getQueueSubscribers(){
+       return broker.getQueueSubscribers();
+    }
+
+    public ObjectName[] getTemporaryTopicSubscribers(){
+        return broker.getTemporaryTopicSubscribers();
+    }
+
+    public ObjectName[] getTemporaryQueueSubscribers(){
+        return broker.getTemporaryQueueSubscribers();
     }
     
 }

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/BrokerViewMBean.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/BrokerViewMBean.java?rev=381927&r1=381926&r2=381927&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/BrokerViewMBean.java
(original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/BrokerViewMBean.java
Tue Feb 28 22:34:44 2006
@@ -16,6 +16,7 @@
  */
 package org.apache.activemq.broker.jmx;
 
+import javax.management.ObjectName;
 import org.apache.activemq.Service;
 
 public interface BrokerViewMBean extends Service {
@@ -36,5 +37,17 @@
     public void setMemoryLimit(long limit);
 
     public void terminateJVM(int exitCode);
+    
+    public ObjectName[] getTopics();
+    public ObjectName[] getQueues();
+    public ObjectName[] getTemporaryTopics();
+    public ObjectName[] getTemporaryQueues();
+    
+    public ObjectName[] getTopicSubscribers();
+    public ObjectName[] getDurableTopicSubscribers();
+    public ObjectName[] getQueueSubscribers();
+    public ObjectName[] getTemporaryTopicSubscribers();
+    public ObjectName[] getTemporaryQueueSubscribers();
+    
     
 }

Added: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/DurableSubscriptionView.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/DurableSubscriptionView.java?rev=381927&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/DurableSubscriptionView.java
(added)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/DurableSubscriptionView.java
Tue Feb 28 22:34:44 2006
@@ -0,0 +1,56 @@
+/**
+ * 
+ * Copyright 2005-2006 The Apache Software Foundation
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
except in compliance with
+ * the License. You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under
the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.activemq.broker.jmx;
+
+import javax.management.openmbean.CompositeData;
+import javax.management.openmbean.OpenDataException;
+import javax.management.openmbean.TabularData;
+import org.apache.activemq.broker.region.Subscription;
+/**
+ * @version $Revision: 1.5 $
+ */
+public class DurableSubscriptionView extends SubscriptionView implements  DurableSubscriptionViewMBean
{
+    
+    protected String subscriptionName;
+    public DurableSubscriptionView(Subscription sub){
+        super(sub);
+        this.subscriptionName = sub.getConsumerInfo().getSubcriptionName();
+    }
+    /**
+     * @return name of the durable consumer
+     */
+    public String getSubscriptionName(){
+        return subscriptionName;
+    }
+
+    /**
+     * Browse messages for this durable subscriber
+     * 
+     * @return messages
+     * @throws OpenDataException
+     */
+    public CompositeData[] browse() throws OpenDataException{
+        return null;
+    }
+
+    /**
+     * Browse messages for this durable subscriber
+     * 
+     * @return messages
+     * @throws OpenDataException
+     */
+    public TabularData browseAsTable() throws OpenDataException{
+        return null;
+    }
+}
\ No newline at end of file

Propchange: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/DurableSubscriptionView.java
------------------------------------------------------------------------------
    svn:executable = *

Added: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/DurableSubscriptionViewMBean.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/DurableSubscriptionViewMBean.java?rev=381927&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/DurableSubscriptionViewMBean.java
(added)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/DurableSubscriptionViewMBean.java
Tue Feb 28 22:34:44 2006
@@ -0,0 +1,43 @@
+/**
+ * 
+ * Copyright 2005-2006 The Apache Software Foundation
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
except in compliance with
+ * the License. You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under
the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.activemq.broker.jmx;
+
+import javax.management.openmbean.CompositeData;
+import javax.management.openmbean.OpenDataException;
+import javax.management.openmbean.TabularData;
+/**
+ * @version $Revision: 1.5 $
+ */
+public interface DurableSubscriptionViewMBean extends SubscriptionViewMBean{
+    /**
+     * @return name of the durable consumer
+     */
+    public String getSubscriptionName();
+
+    /**
+     * Browse messages for this durable subscriber
+     * 
+     * @return messages
+     * @throws OpenDataException
+     */
+    public CompositeData[] browse() throws OpenDataException;
+
+    /**
+     * Browse messages for this durable subscriber
+     * 
+     * @return messages
+     * @throws OpenDataException
+     */
+    public TabularData browseAsTable() throws OpenDataException;
+}
\ No newline at end of file

Propchange: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/DurableSubscriptionViewMBean.java
------------------------------------------------------------------------------
    svn:executable = *

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedRegionBroker.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedRegionBroker.java?rev=381927&r1=381926&r2=381927&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedRegionBroker.java
(original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedRegionBroker.java
Tue Feb 28 22:34:44 2006
@@ -1,21 +1,24 @@
 /**
- *
+ * 
  * Copyright 2005-2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
except in compliance with
+ * the License. You may obtain a copy of the License at
+ * 
  * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under
the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the
+ * specific language governing permissions and limitations under the License.
  */
 package org.apache.activemq.broker.jmx;
 
+import java.io.IOException;
+import java.util.Hashtable;
+import java.util.Map;
+import java.util.Set;
+import javax.management.MBeanServer;
+import javax.management.ObjectName;
 import org.apache.activemq.broker.BrokerService;
 import org.apache.activemq.broker.region.Destination;
 import org.apache.activemq.broker.region.Queue;
@@ -25,78 +28,207 @@
 import org.apache.activemq.broker.region.Topic;
 import org.apache.activemq.broker.region.policy.PolicyMap;
 import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ConsumerInfo;
 import org.apache.activemq.memory.UsageManager;
 import org.apache.activemq.store.PersistenceAdapter;
 import org.apache.activemq.thread.TaskRunnerFactory;
 import org.apache.activemq.util.JMXSupport;
-
-import javax.management.MBeanServer;
-import javax.management.ObjectName;
-
-import java.io.IOException;
-import java.util.Hashtable;
-
-public class ManagedRegionBroker extends RegionBroker {
-
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import edu.emory.mathcs.backport.java.util.concurrent.ConcurrentHashMap;
+public class ManagedRegionBroker extends RegionBroker{
+    private static final Log log=LogFactory.getLog(ManagedRegionBroker.class);
     private final MBeanServer mbeanServer;
     private final ObjectName brokerObjectName;
+    private final Map topics=new ConcurrentHashMap();
+    private final Map queues=new ConcurrentHashMap();
+    private final Map temporaryQueues=new ConcurrentHashMap();
+    private final Map temporaryTopics=new ConcurrentHashMap();
+    private final Map queueSubscribers=new ConcurrentHashMap();
+    private final Map topicSubscribers=new ConcurrentHashMap();
+    private final Map durableTopicSubscribers=new ConcurrentHashMap();
+    private final Map temporaryQueueSubscribers=new ConcurrentHashMap();
+    private final Map temporaryTopicSubscribers=new ConcurrentHashMap();
 
-    public ManagedRegionBroker(BrokerService brokerService,MBeanServer mbeanServer, ObjectName
brokerObjectName, TaskRunnerFactory taskRunnerFactory, UsageManager memoryManager, PersistenceAdapter
adapter, PolicyMap policyMap) throws IOException {
-        super(brokerService,taskRunnerFactory, memoryManager, adapter, policyMap);
-        this.mbeanServer = mbeanServer;
-        this.brokerObjectName = brokerObjectName;
+    public ManagedRegionBroker(BrokerService brokerService,MBeanServer mbeanServer,ObjectName
brokerObjectName,
+                    TaskRunnerFactory taskRunnerFactory,UsageManager memoryManager,PersistenceAdapter
adapter,
+                    PolicyMap policyMap) throws IOException{
+        super(brokerService,taskRunnerFactory,memoryManager,adapter,policyMap);
+        this.mbeanServer=mbeanServer;
+        this.brokerObjectName=brokerObjectName;
     }
 
-    protected Region createQueueRegion(UsageManager memoryManager, TaskRunnerFactory taskRunnerFactory,
PersistenceAdapter adapter, PolicyMap policyMap) {
-        return new ManagedQueueRegion(this, destinationStatistics, memoryManager, taskRunnerFactory,
adapter, policyMap);
+    protected Region createQueueRegion(UsageManager memoryManager,TaskRunnerFactory taskRunnerFactory,
+                    PersistenceAdapter adapter,PolicyMap policyMap){
+        return new ManagedQueueRegion(this,destinationStatistics,memoryManager,taskRunnerFactory,adapter,policyMap);
     }
-    
-    protected Region createTempQueueRegion(UsageManager memoryManager, TaskRunnerFactory
taskRunnerFactory) {
-        return new ManagedTempQueueRegion(this, destinationStatistics, memoryManager, taskRunnerFactory);
+
+    protected Region createTempQueueRegion(UsageManager memoryManager,TaskRunnerFactory taskRunnerFactory){
+        return new ManagedTempQueueRegion(this,destinationStatistics,memoryManager,taskRunnerFactory);
     }
-    
-    protected Region createTempTopicRegion(UsageManager memoryManager, TaskRunnerFactory
taskRunnerFactory) {
-        return new ManagedTempTopicRegion(this, destinationStatistics, memoryManager, taskRunnerFactory);
+
+    protected Region createTempTopicRegion(UsageManager memoryManager,TaskRunnerFactory taskRunnerFactory){
+        return new ManagedTempTopicRegion(this,destinationStatistics,memoryManager,taskRunnerFactory);
     }
-    
-    protected Region createTopicRegion(UsageManager memoryManager, TaskRunnerFactory taskRunnerFactory,
PersistenceAdapter adapter, PolicyMap policyMap) {
-        return new ManagedTopicRegion(this, destinationStatistics, memoryManager, taskRunnerFactory,
adapter, policyMap);
+
+    protected Region createTopicRegion(UsageManager memoryManager,TaskRunnerFactory taskRunnerFactory,
+                    PersistenceAdapter adapter,PolicyMap policyMap){
+        return new ManagedTopicRegion(this,destinationStatistics,memoryManager,taskRunnerFactory,adapter,policyMap);
     }
 
-    public void register(ActiveMQDestination destName, Destination destination) throws Throwable
{
-        
+    public void register(ActiveMQDestination destName,Destination destination){
         // Build the object name for the destination
-        Hashtable map = new Hashtable(brokerObjectName.getKeyPropertyList());
+        Hashtable map=new Hashtable(brokerObjectName.getKeyPropertyList());
         map.put("Type",JMXSupport.encodeObjectNamePart(destName.getDestinationTypeAsString()));
-        map.put("Destination", JMXSupport.encodeObjectNamePart(destName.getPhysicalName()));
-        ObjectName destObjectName= new ObjectName(brokerObjectName.getDomain(), map);
-        
-        Object view;
-        if( destination instanceof Queue ) {
-            view = new QueueView((Queue) destination);
-        } else {
-            view = new TopicView((Topic) destination);
-        }
-        
-        mbeanServer.registerMBean(view, destObjectName);        
-        
+        map.put("Destination",JMXSupport.encodeObjectNamePart(destName.getPhysicalName()));
+        try{
+            ObjectName destObjectName=new ObjectName(brokerObjectName.getDomain(),map);
+            DestinationView view;
+            if(destination instanceof Queue){
+                view=new QueueView((Queue) destination);
+            }else{
+                view=new TopicView((Topic) destination);
+            }
+            registerDestination(destObjectName,destName,view);
+        }catch(Exception e){
+            log.error("Failed to register destination "+destName,e);
+        }
     }
 
-    public void unregister(ActiveMQDestination destName) throws Throwable {
+    public void unregister(ActiveMQDestination destName){
         // Build the object name for the destination
-        Hashtable map = new Hashtable(brokerObjectName.getKeyPropertyList());
+        Hashtable map=new Hashtable(brokerObjectName.getKeyPropertyList());
         map.put("Type",JMXSupport.encodeObjectNamePart(destName.getDestinationTypeAsString()));
-        map.put("Destination", JMXSupport.encodeObjectNamePart(destName.getPhysicalName()));
-        ObjectName destObjectName= new ObjectName(brokerObjectName.getDomain(), map);
-        
-        mbeanServer.unregisterMBean(destObjectName);        
+        map.put("Destination",JMXSupport.encodeObjectNamePart(destName.getPhysicalName()));
+        try{
+            ObjectName destObjectName=new ObjectName(brokerObjectName.getDomain(),map);
+            unregisterDestination(destObjectName);
+        }catch(Exception e){
+            log.error("Failed to unregister "+destName,e);
+        }
     }
 
-    public void registerSubscription(Subscription sub) {
-        // TODO: Use this to expose subscriptions to the JMX bus for management
+    public void registerSubscription(Subscription sub){
+        Hashtable map=new Hashtable(brokerObjectName.getKeyPropertyList());
+        map.put("Type",JMXSupport.encodeObjectNamePart("Subscription"));
+        map.put("name",JMXSupport.encodeObjectNamePart(sub.getConsumerInfo().toString()));
+        try{
+            ObjectName objectName=new ObjectName(brokerObjectName.getDomain(),map);
+            SubscriptionView view;
+            if(sub.getConsumerInfo().isDurable()){
+                view=new DurableSubscriptionView(sub);
+            }else{
+                view=new SubscriptionView(sub);
+            }
+            registerSubscription(objectName,sub.getConsumerInfo(),view);
+        }catch(Exception e){
+            log.error("Failed to register subscription "+sub,e);
+        }
     }
 
-    public void unregisterSubscription(Subscription sub) {
-        // TODO: Use this to expose subscriptions to the JMX bus for management
+    public void unregisterSubscription(Subscription sub){
+        Hashtable map=new Hashtable(brokerObjectName.getKeyPropertyList());
+        map.put("Type",JMXSupport.encodeObjectNamePart("Subscription"));
+        map.put("name",JMXSupport.encodeObjectNamePart(sub.getConsumerInfo().toString()));
+        try{
+            ObjectName objectName=new ObjectName(brokerObjectName.getDomain(),map);
+            unregisterSubscription(objectName);
+        }catch(Exception e){
+            log.error("Failed to unregister subscription "+sub,e);
+        }
+    }
+
+    protected void registerDestination(ObjectName key,ActiveMQDestination dest,DestinationView
view) throws Exception{
+        if(dest.isQueue()){
+            if(dest.isTemporary()){
+                temporaryQueues.put(key,view);
+            }else{
+                queues.put(key,view);
+            }
+        }else{
+            if(dest.isTemporary()){
+                temporaryTopics.put(key,view);
+            }else{
+                topics.put(key,view);
+            }
+        }
+        mbeanServer.registerMBean(view,key);
+    }
+
+    protected void unregisterDestination(ObjectName key) throws Exception{
+        topics.remove(key);
+        queues.remove(key);
+        temporaryQueues.remove(key);
+        temporaryTopics.remove(key);
+        mbeanServer.unregisterMBean(key);
+    }
+
+    protected void registerSubscription(ObjectName key,ConsumerInfo info,SubscriptionView
view) throws Exception{
+        ActiveMQDestination dest=info.getDestination();
+        if(dest.isQueue()){
+            if(dest.isTemporary()){
+                temporaryQueueSubscribers.put(key,view);
+            }else{
+                queueSubscribers.put(key,view);
+            }
+        }else{
+            if(dest.isTemporary()){
+                temporaryTopicSubscribers.put(key,view);
+            }else{
+                if(info.isDurable()){
+                    durableTopicSubscribers.put(key,view);
+                }else{
+                    topicSubscribers.put(key,view);
+                }
+            }
+        }
+        mbeanServer.registerMBean(view,key);
+    }
+
+    protected void unregisterSubscription(ObjectName key) throws Exception{
+        queueSubscribers.remove(key);
+        topicSubscribers.remove(key);
+        durableTopicSubscribers.remove(key);
+        temporaryQueueSubscribers.remove(key);
+        temporaryTopicSubscribers.remove(key);
+        mbeanServer.unregisterMBean(key);
+    }
+    
+    protected  ObjectName[] getTopics(){
+        Set set = topics.keySet();
+        return (ObjectName[])set.toArray(new ObjectName[set.size()]);
+    }
+    protected ObjectName[] getQueues(){
+        Set set = queues.keySet();
+        return (ObjectName[])set.toArray(new ObjectName[set.size()]);
+    }
+    protected ObjectName[] getTemporaryTopics(){
+        Set set = temporaryTopics.keySet();
+        return (ObjectName[])set.toArray(new ObjectName[set.size()]);
+    }
+    protected ObjectName[] getTemporaryQueues(){
+        Set set = temporaryQueues.keySet();
+        return (ObjectName[])set.toArray(new ObjectName[set.size()]);
+    }
+    
+    protected ObjectName[] getTopicSubscribers(){
+        Set set = topicSubscribers.keySet();
+        return (ObjectName[])set.toArray(new ObjectName[set.size()]);
+    }
+    protected ObjectName[] getDurableTopicSubscribers(){
+        Set set = durableTopicSubscribers.keySet();
+        return (ObjectName[])set.toArray(new ObjectName[set.size()]);
+    }
+    protected ObjectName[] getQueueSubscribers(){
+        Set set = queueSubscribers.keySet();
+        return (ObjectName[])set.toArray(new ObjectName[set.size()]);
+    }
+    protected ObjectName[] getTemporaryTopicSubscribers(){
+        Set set = temporaryTopicSubscribers.keySet();
+        return (ObjectName[])set.toArray(new ObjectName[set.size()]);
+    }
+    protected ObjectName[] getTemporaryQueueSubscribers(){
+        Set set = temporaryQueueSubscribers.keySet();
+        return (ObjectName[])set.toArray(new ObjectName[set.size()]);
     }
 }

Added: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/SubscriptionView.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/SubscriptionView.java?rev=381927&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/SubscriptionView.java
(added)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/SubscriptionView.java
Tue Feb 28 22:34:44 2006
@@ -0,0 +1,154 @@
+/**
+ *
+ * Copyright 2005-2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.jmx;
+
+import org.apache.activemq.broker.region.Subscription;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ConsumerInfo;
+
+
+
+/**
+ * @version $Revision: 1.5 $
+ */
+public class SubscriptionView implements SubscriptionViewMBean {
+    
+    
+    protected final Subscription subscription;
+    
+    
+    
+    /**
+     * Constructior
+     * @param subs
+     */
+    public SubscriptionView(Subscription subs){
+        this.subscription = subs;
+    }
+    
+    /**
+     * @return the id of the Connection the Subscription is on
+     */
+    public String getConnectionId(){
+        ConsumerInfo info = subscription.getConsumerInfo();
+        if (info != null){
+            return info.getConsumerId().getConnectionId();
+        }
+        return "NOTSET";
+    }
+
+    /**
+     * @return the id of the Session the subscription is on
+     */
+    public long getSessionId(){
+        ConsumerInfo info = subscription.getConsumerInfo();
+        if (info != null){
+            return info.getConsumerId().getSessionId();
+        }
+        return 0;
+    }
+
+    /**
+     * @return the id of the Subscription
+     */
+    public long getSubcriptionId(){
+        ConsumerInfo info = subscription.getConsumerInfo();
+        if (info != null){
+            return info.getConsumerId().getValue();
+        }
+        return 0;
+    }
+
+    /**
+     * @return the destination name
+     */
+    public String getDestinationName(){
+        ConsumerInfo info = subscription.getConsumerInfo();
+        if (info != null){
+            ActiveMQDestination dest = info.getDestination();
+            return dest.getPhysicalName();
+        }
+        return "NOTSET";
+       
+    }
+
+    /**
+     * @return true if the destination is a Queue
+     */
+    public boolean isDestinationQueue(){
+        ConsumerInfo info = subscription.getConsumerInfo();
+        if (info != null){
+            ActiveMQDestination dest = info.getDestination();
+            return dest.isQueue();
+        }
+        return false;
+    }
+
+    /**
+     * @return true of the destination is a Topic
+     */
+    public boolean isDestinationTopic(){
+        ConsumerInfo info = subscription.getConsumerInfo();
+        if (info != null){
+            ActiveMQDestination dest = info.getDestination();
+            return dest.isTopic();
+        }
+        return false;
+    }
+
+    /**
+     * @return true if the destination is temporary
+     */
+    public boolean isDestinationTemporary(){
+        ConsumerInfo info = subscription.getConsumerInfo();
+        if (info != null){
+            ActiveMQDestination dest = info.getDestination();
+            return dest.isTemporary();
+        }
+        return false;
+    }
+
+    /**
+     * The subscription should release as may references as it can to help the garbage collector
+     * reclaim memory.
+     */
+    public void gc(){
+        subscription.gc();
+    }
+    
+    /**
+     * @return number of messages pending delivery
+     */
+    public int getPending(){
+        return subscription.pending();
+    }
+    
+    /**
+     * @return number of messages dispatched
+     */
+    public int getDispatched(){
+        return subscription.dispatched();
+    }
+    
+    /**
+     * @return number of messages delivered
+     */
+    public int getDelivered(){
+        return subscription.delivered();
+    }
+
+}
\ No newline at end of file

Propchange: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/SubscriptionView.java
------------------------------------------------------------------------------
    svn:executable = *

Added: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/SubscriptionViewMBean.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/SubscriptionViewMBean.java?rev=381927&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/SubscriptionViewMBean.java
(added)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/SubscriptionViewMBean.java
Tue Feb 28 22:34:44 2006
@@ -0,0 +1,74 @@
+/**
+ * 
+ * Copyright 2005-2006 The Apache Software Foundation
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
except in compliance with
+ * the License. You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under
the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.activemq.broker.jmx;
+
+/**
+ * @version $Revision: 1.5 $
+ */
+public interface SubscriptionViewMBean{
+    /**
+     * @return the id of the Connection the Subscription is on
+     */
+    public String getConnectionId();
+
+    /**
+     * @return the id of the Session the subscription is on
+     */
+    public long getSessionId();
+
+    /**
+     * @return the id of the Subscription
+     */
+    public long getSubcriptionId();
+
+    /**
+     * @return the destination name
+     */
+    public String getDestinationName();
+
+    /**
+     * @return true if the destination is a Queue
+     */
+    public boolean isDestinationQueue();
+
+    /**
+     * @return true of the destination is a Topic
+     */
+    public boolean isDestinationTopic();
+
+    /**
+     * @return true if the destination is temporary
+     */
+    public boolean isDestinationTemporary();
+
+    /**
+     * The subscription should release as may references as it can to help the garbage collector
reclaim memory.
+     */
+    public void gc();
+
+    /**
+     * @return number of messages pending delivery
+     */
+    public int getPending();
+
+    /**
+     * @return number of messages dispatched
+     */
+    public int getDispatched();
+
+    /**
+     * @return number of messages delivered
+     */
+    public int getDelivered();
+}
\ No newline at end of file

Propchange: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/SubscriptionViewMBean.java
------------------------------------------------------------------------------
    svn:executable = *

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java?rev=381927&r1=381926&r2=381927&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java
(original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java
Tue Feb 28 22:34:44 2006
@@ -122,6 +122,16 @@
         super.add(node);
         node.decrementReferenceCount();
     }
+    
+    public int pending(){
+        if (active){
+            return super.pending();
+        }
+        //TODO: need to get from store
+        
+        return 0;
+    }
+    
 
     protected boolean canDispatch(MessageReference node) {
         return active;

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java?rev=381927&r1=381926&r2=381927&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
(original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
Tue Feb 28 22:34:44 2006
@@ -225,6 +225,18 @@
     protected boolean isFull(){
         return dispatched.size()-delivered>=info.getPrefetchSize()||preLoadSize>preLoadLimit;
     }
+    
+    public int pending(){
+        return matched.size() - dispatched.size();
+    }
+    
+    public int dispatched(){
+        return dispatched.size();
+    }
+    
+    public int delivered(){
+        return delivered;
+    }
 
     protected void dispatchMatched() throws IOException{
         if(!dispatching){

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Subscription.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Subscription.java?rev=381927&r1=381926&r2=381927&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Subscription.java
(original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Subscription.java
Tue Feb 28 22:34:44 2006
@@ -100,4 +100,19 @@
      */
     boolean isSlaveBroker();
     
+    /**
+     * @return number of messages pending delivery
+     */
+    int pending();
+    
+    /**
+     * @return number of messages dispatched
+     */
+    int dispatched();
+    
+    /**
+     * @return number of messages delivered
+     */
+    int delivered();
+    
 }

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java?rev=381927&r1=381926&r2=381927&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
(original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
Tue Feb 28 22:34:44 2006
@@ -110,6 +110,18 @@
         throw new JMSException("Invalid acknowledgment: "+ack);
     }
     
+    public int pending(){
+        return matched.size() - dispatched;
+    }
+    
+    public int dispatched(){
+        return dispatched;
+    }
+    
+    public int delivered(){
+        return delivered;
+    }
+    
     private boolean isFull() {
         return dispatched-delivered >= info.getPrefetchSize();
     }

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/jms/TopicBridge.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/jms/TopicBridge.java?rev=381927&r1=381926&r2=381927&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/jms/TopicBridge.java
(original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/jms/TopicBridge.java
Tue Feb 28 22:34:44 2006
@@ -104,15 +104,15 @@
     }
 
     /**
-     * @return Returns the consumerName.
+     * @return Returns the subscriptionName.
      */
     public String getConsumerName(){
         return consumerName;
     }
 
     /**
-     * @param consumerName
-     *            The consumerName to set.
+     * @param subscriptionName
+     *            The subscriptionName to set.
      */
     public void setConsumerName(String consumerName){
         this.consumerName=consumerName;



Mime
View raw message