activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rajdav...@apache.org
Subject svn commit: r384492 - /incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedRegionBroker.java
Date Thu, 09 Mar 2006 11:22:38 GMT
Author: rajdavies
Date: Thu Mar  9 03:22:26 2006
New Revision: 384492

URL: http://svn.apache.org/viewcvs?rev=384492&view=rev
Log:
tidied up subscription objectNames

Modified:
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedRegionBroker.java

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedRegionBroker.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedRegionBroker.java?rev=384492&r1=384491&r2=384492&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedRegionBroker.java	(original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedRegionBroker.java	Thu Mar  9 03:22:26 2006
@@ -22,7 +22,6 @@
 import java.util.Map;
 import java.util.Set;
 import java.util.Map.Entry;
-
 import javax.management.InstanceNotFoundException;
 import javax.management.MBeanServer;
 import javax.management.MalformedObjectNameException;
@@ -34,7 +33,6 @@
 import javax.management.openmbean.TabularData;
 import javax.management.openmbean.TabularDataSupport;
 import javax.management.openmbean.TabularType;
-
 import org.apache.activemq.broker.Broker;
 import org.apache.activemq.broker.BrokerService;
 import org.apache.activemq.broker.ConnectionContext;
@@ -62,10 +60,8 @@
 import org.apache.activemq.util.SubscriptionKey;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-
 import edu.emory.mathcs.backport.java.util.concurrent.ConcurrentHashMap;
 import edu.emory.mathcs.backport.java.util.concurrent.CopyOnWriteArraySet;
-
 public class ManagedRegionBroker extends RegionBroker{
     private static final Log log=LogFactory.getLog(ManagedRegionBroker.class);
     private final MBeanServer mbeanServer;
@@ -80,42 +76,37 @@
     private final Map inactiveDurableTopicSubscribers=new ConcurrentHashMap();
     private final Map temporaryQueueSubscribers=new ConcurrentHashMap();
     private final Map temporaryTopicSubscribers=new ConcurrentHashMap();
-    private final Map subscriptionKeys = new ConcurrentHashMap();
-    private final Map subscriptionMap = new ConcurrentHashMap();
-    private final Set registeredMBeans = new CopyOnWriteArraySet();
-    
+    private final Map subscriptionKeys=new ConcurrentHashMap();
+    private final Map subscriptionMap=new ConcurrentHashMap();
+    private final Set registeredMBeans=new CopyOnWriteArraySet();
     /* This is the first broker in the broker interceptor chain. */
     private Broker contextBroker;
 
     public ManagedRegionBroker(BrokerService brokerService,MBeanServer mbeanServer,ObjectName brokerObjectName,
-                    TaskRunnerFactory taskRunnerFactory,UsageManager memoryManager,PersistenceAdapter adapter) throws IOException{
+                    TaskRunnerFactory taskRunnerFactory,UsageManager memoryManager,PersistenceAdapter adapter)
+                    throws IOException{
         super(brokerService,taskRunnerFactory,memoryManager,adapter);
         this.mbeanServer=mbeanServer;
         this.brokerObjectName=brokerObjectName;
     }
-    
-    public void start() throws Exception {
+
+    public void start() throws Exception{
         super.start();
-        //build all existing durable subscriptions
+        // build all existing durable subscriptions
         buildExistingSubscriptions();
-        
     }
 
-    
-    protected void doStop(ServiceStopper stopper) {
+    protected void doStop(ServiceStopper stopper){
         super.doStop(stopper);
-        
         // lets remove any mbeans not yet removed
-        for (Iterator iter = registeredMBeans.iterator(); iter.hasNext();) {
-            ObjectName name = (ObjectName) iter.next();
-            try {
+        for(Iterator iter=registeredMBeans.iterator();iter.hasNext();){
+            ObjectName name=(ObjectName) iter.next();
+            try{
                 mbeanServer.unregisterMBean(name);
-            }
-            catch (InstanceNotFoundException e) {
-                log.warn("The MBean: " + name + " is no longer registered with JMX");
-            }
-            catch (Exception e) {
-                stopper.onException(this, e);
+            }catch(InstanceNotFoundException e){
+                log.warn("The MBean: "+name+" is no longer registered with JMX");
+            }catch(Exception e){
+                stopper.onException(this,e);
             }
         }
         registeredMBeans.clear();
@@ -141,12 +132,12 @@
 
     public void register(ActiveMQDestination destName,Destination destination){
         try{
-            ObjectName objectName = createObjectName(destName);
+            ObjectName objectName=createObjectName(destName);
             DestinationView view;
             if(destination instanceof Queue){
-                view=new QueueView(this, (Queue) destination);
+                view=new QueueView(this,(Queue) destination);
             }else{
-                view=new TopicView(this, (Topic) destination);
+                view=new TopicView(this,(Topic) destination);
             }
             registerDestination(objectName,destName,view);
         }catch(Exception e){
@@ -156,7 +147,7 @@
 
     public void unregister(ActiveMQDestination destName){
         try{
-            ObjectName objectName = createObjectName(destName);
+            ObjectName objectName=createObjectName(destName);
             unregisterDestination(objectName);
         }catch(Exception e){
             log.error("Failed to unregister "+destName,e);
@@ -164,32 +155,30 @@
     }
 
     public void registerSubscription(ConnectionContext context,Subscription sub){
-        SubscriptionKey key = new SubscriptionKey(context.getClientId(),sub.getConsumerInfo().getSubcriptionName());
         Hashtable map=brokerObjectName.getKeyPropertyList();
-        String name = key.toString();
+        String name="";
+        SubscriptionKey key=new SubscriptionKey(context.getClientId(),sub.getConsumerInfo().getSubcriptionName());
+        if(sub.getConsumerInfo().isDurable()){
+            name=key.toString();
+        }
+        if(sub.getConsumerInfo()!=null&&sub.getConsumerInfo().getConsumerId()!=null){
+            name+="."+sub.getConsumerInfo().getConsumerId();
+        }
         try{
-        	
-        	ObjectName objectName = new ObjectName(
-        			brokerObjectName.getDomain()+":"+
-            		"BrokerName="+map.get("BrokerName")+","+
-            		"Type=Subscription,"+
-            		"active=true,"+
-                    "name="+JMXSupport.encodeObjectNamePart(name)+""
-            		);
-
+            ObjectName objectName=new ObjectName(brokerObjectName.getDomain()+":"+"BrokerName="+map.get("BrokerName")
+                            +","+"Type=Subscription,"+"active=true,"+"name="+JMXSupport.encodeObjectNamePart(name)+"");
             SubscriptionView view;
             if(sub.getConsumerInfo().isDurable()){
                 view=new DurableSubscriptionView(this,context.getClientId(),sub);
             }else{
-                if (sub instanceof TopicSubscription) {
-                    view = new TopicSubscriptionView(context.getClientId(),(TopicSubscription) sub);
-                }
-                else {
+                if(sub instanceof TopicSubscription){
+                    view=new TopicSubscriptionView(context.getClientId(),(TopicSubscription) sub);
+                }else{
                     view=new SubscriptionView(context.getClientId(),sub);
                 }
             }
-            subscriptionMap.put(sub,objectName);
             registerSubscription(objectName,sub.getConsumerInfo(),key,view);
+            subscriptionMap.put(sub,objectName);
         }catch(Exception e){
             log.error("Failed to register subscription "+sub,e);
         }
@@ -233,7 +222,8 @@
         mbeanServer.unregisterMBean(key);
     }
 
-    protected void registerSubscription(ObjectName key,ConsumerInfo info,SubscriptionKey subscriptionKey,SubscriptionView view) throws Exception{
+    protected void registerSubscription(ObjectName key,ConsumerInfo info,SubscriptionKey subscriptionKey,
+                    SubscriptionView view) throws Exception{
         ActiveMQDestination dest=info.getDestination();
         if(dest.isQueue()){
             if(dest.isTemporary()){
@@ -247,16 +237,16 @@
             }else{
                 if(info.isDurable()){
                     durableTopicSubscribers.put(key,view);
-                    //unregister any inactive durable subs
-                    try {
-                        ObjectName inactiveName = (ObjectName) subscriptionKeys.get(subscriptionKey);
-                        if (inactiveName != null){
+                    // unregister any inactive durable subs
+                    try{
+                        ObjectName inactiveName=(ObjectName) subscriptionKeys.get(subscriptionKey);
+                        if(inactiveName!=null){
                             inactiveDurableTopicSubscribers.remove(inactiveName);
                             registeredMBeans.remove(inactiveName);
                             mbeanServer.unregisterMBean(inactiveName);
                         }
                     }catch(Exception e){
-                        log.error("Unable to unregister inactive durable subscriber: " + subscriptionKey,e);
+                        log.error("Unable to unregister inactive durable subscriber: "+subscriptionKey,e);
                     }
                 }else{
                     topicSubscribers.put(key,view);
@@ -275,72 +265,64 @@
         temporaryTopicSubscribers.remove(key);
         registeredMBeans.remove(key);
         mbeanServer.unregisterMBean(key);
-        DurableSubscriptionView view = (DurableSubscriptionView) durableTopicSubscribers.remove(key);
-        if (view != null){
-            //need to put this back in the inactive list
-            SubscriptionKey subscriptionKey = new SubscriptionKey(view.getClientId(),view.getSubscriptionName());
-            SubscriptionInfo info = new SubscriptionInfo();
+        DurableSubscriptionView view=(DurableSubscriptionView) durableTopicSubscribers.remove(key);
+        if(view!=null){
+            // need to put this back in the inactive list
+            SubscriptionKey subscriptionKey=new SubscriptionKey(view.getClientId(),view.getSubscriptionName());
+            SubscriptionInfo info=new SubscriptionInfo();
             info.setClientId(subscriptionKey.getClientId());
             info.setSubcriptionName(subscriptionKey.getSubscriptionName());
             info.setDestination(new ActiveMQTopic(view.getDestinationName()));
-            addInactiveSubscription(subscriptionKey, info);
+            addInactiveSubscription(subscriptionKey,info);
         }
-        
-       
     }
-    
+
     protected void buildExistingSubscriptions() throws Exception{
-        Map subscriptions = new HashMap();
-        Set destinations = adaptor.getDestinations();
-        if (destinations != null){
-            for (Iterator iter = destinations.iterator(); iter.hasNext();){
-                ActiveMQDestination dest = (ActiveMQDestination) iter.next();
-                if (dest.isTopic()){
-                    TopicMessageStore store = adaptor.createTopicMessageStore((ActiveMQTopic) dest);
-                    SubscriptionInfo[] infos = store.getAllSubscriptions();
-                    if (infos != null){
-                        for (int i = 0; i < infos.length; i++) {
-                            
-                            SubscriptionInfo info = infos[i];
+        Map subscriptions=new HashMap();
+        Set destinations=adaptor.getDestinations();
+        if(destinations!=null){
+            for(Iterator iter=destinations.iterator();iter.hasNext();){
+                ActiveMQDestination dest=(ActiveMQDestination) iter.next();
+                if(dest.isTopic()){
+                    TopicMessageStore store=adaptor.createTopicMessageStore((ActiveMQTopic) dest);
+                    SubscriptionInfo[] infos=store.getAllSubscriptions();
+                    if(infos!=null){
+                        for(int i=0;i<infos.length;i++){
+                            SubscriptionInfo info=infos[i];
                             log.debug("Restoring durable subscription: "+infos);
-                            SubscriptionKey key = new SubscriptionKey(info);
+                            SubscriptionKey key=new SubscriptionKey(info);
                             subscriptions.put(key,info);
-                        }   
+                        }
                     }
                 }
             }
         }
-        for (Iterator i = subscriptions.entrySet().iterator();i.hasNext();){
-            Map.Entry entry = (Entry) i.next();
-            SubscriptionKey key = (SubscriptionKey) entry.getKey();
-            SubscriptionInfo info = (SubscriptionInfo) entry.getValue();
-            addInactiveSubscription(key, info);
+        for(Iterator i=subscriptions.entrySet().iterator();i.hasNext();){
+            Map.Entry entry=(Entry) i.next();
+            SubscriptionKey key=(SubscriptionKey) entry.getKey();
+            SubscriptionInfo info=(SubscriptionInfo) entry.getValue();
+            addInactiveSubscription(key,info);
         }
     }
-    
+
     protected void addInactiveSubscription(SubscriptionKey key,SubscriptionInfo info){
         Hashtable map=brokerObjectName.getKeyPropertyList();
         try{
-        	ObjectName objectName = new ObjectName(
-        			brokerObjectName.getDomain()+":"+
-            		"BrokerName="+map.get("BrokerName")+","+
-            		"Type=Subscription,"+
-            		"active=false,"+
-                    "name="+JMXSupport.encodeObjectNamePart(key.toString())+""
-            		);
-
-            SubscriptionView view = new InactiveDurableSubscriptionView(this,key.getClientId(),info);
+            ObjectName objectName=new ObjectName(brokerObjectName.getDomain()+":"+"BrokerName="+map.get("BrokerName")
+                            +","+"Type=Subscription,"+"active=false,"+"name="
+                            +JMXSupport.encodeObjectNamePart(key.toString())+"");
+            SubscriptionView view=new InactiveDurableSubscriptionView(this,key.getClientId(),info);
             registeredMBeans.add(objectName);
             mbeanServer.registerMBean(view,objectName);
             inactiveDurableTopicSubscribers.put(objectName,view);
-            subscriptionKeys.put(key, objectName);
+            subscriptionKeys.put(key,objectName);
         }catch(Exception e){
             log.error("Failed to register subscription "+info,e);
         }
     }
-    
+
     public CompositeData[] browse(SubscriptionView view) throws OpenDataException{
-        List messages = getSubscriberMessages(view);
+        List messages=getSubscriberMessages(view);
         CompositeData c[]=new CompositeData[messages.size()];
         for(int i=0;i<c.length;i++){
             try{
@@ -354,7 +336,7 @@
 
     public TabularData browseAsTable(SubscriptionView view) throws OpenDataException{
         OpenTypeFactory factory=OpenTypeSupport.getFactory(ActiveMQMessage.class);
-        List messages = getSubscriberMessages(view);
+        List messages=getSubscriberMessages(view);
         CompositeType ct=factory.getCompositeType();
         TabularType tt=new TabularType("MessageList","MessageList",ct,new String[] { "JMSMessageID" });
         TabularDataSupport rc=new TabularDataSupport(tt);
@@ -363,14 +345,12 @@
         }
         return rc;
     }
-    
+
     protected List getSubscriberMessages(SubscriptionView view){
-        final List result = new ArrayList();
-        try {
-        ActiveMQTopic  topic = new ActiveMQTopic(view.getDestinationName());
-        TopicMessageStore store = adaptor.createTopicMessageStore(topic);
-       
-       
+        final List result=new ArrayList();
+        try{
+            ActiveMQTopic topic=new ActiveMQTopic(view.getDestinationName());
+            TopicMessageStore store=adaptor.createTopicMessageStore(topic);
             store.recover(new MessageRecoveryListener(){
                 public void recoverMessage(Message message) throws Exception{
                     result.add(message);
@@ -381,71 +361,75 @@
                 public void finished(){}
             });
         }catch(Throwable e){
-            log.error("Failed to browse messages for Subscription " + view,e);
+            log.error("Failed to browse messages for Subscription "+view,e);
         }
         return result;
     }
-    
-    protected  ObjectName[] getTopics(){
-        Set set = topics.keySet();
-        return (ObjectName[])set.toArray(new ObjectName[set.size()]);
+
+    protected ObjectName[] getTopics(){
+        Set set=topics.keySet();
+        return (ObjectName[]) set.toArray(new ObjectName[set.size()]);
     }
+
     protected ObjectName[] getQueues(){
-        Set set = queues.keySet();
-        return (ObjectName[])set.toArray(new ObjectName[set.size()]);
+        Set set=queues.keySet();
+        return (ObjectName[]) set.toArray(new ObjectName[set.size()]);
     }
+
     protected ObjectName[] getTemporaryTopics(){
-        Set set = temporaryTopics.keySet();
-        return (ObjectName[])set.toArray(new ObjectName[set.size()]);
+        Set set=temporaryTopics.keySet();
+        return (ObjectName[]) set.toArray(new ObjectName[set.size()]);
     }
+
     protected ObjectName[] getTemporaryQueues(){
-        Set set = temporaryQueues.keySet();
-        return (ObjectName[])set.toArray(new ObjectName[set.size()]);
+        Set set=temporaryQueues.keySet();
+        return (ObjectName[]) set.toArray(new ObjectName[set.size()]);
     }
-    
+
     protected ObjectName[] getTopicSubscribers(){
-        Set set = topicSubscribers.keySet();
-        return (ObjectName[])set.toArray(new ObjectName[set.size()]);
+        Set set=topicSubscribers.keySet();
+        return (ObjectName[]) set.toArray(new ObjectName[set.size()]);
     }
+
     protected ObjectName[] getDurableTopicSubscribers(){
-        Set set = durableTopicSubscribers.keySet();
-        return (ObjectName[])set.toArray(new ObjectName[set.size()]);
+        Set set=durableTopicSubscribers.keySet();
+        return (ObjectName[]) set.toArray(new ObjectName[set.size()]);
     }
+
     protected ObjectName[] getQueueSubscribers(){
-        Set set = queueSubscribers.keySet();
-        return (ObjectName[])set.toArray(new ObjectName[set.size()]);
+        Set set=queueSubscribers.keySet();
+        return (ObjectName[]) set.toArray(new ObjectName[set.size()]);
     }
+
     protected ObjectName[] getTemporaryTopicSubscribers(){
-        Set set = temporaryTopicSubscribers.keySet();
-        return (ObjectName[])set.toArray(new ObjectName[set.size()]);
+        Set set=temporaryTopicSubscribers.keySet();
+        return (ObjectName[]) set.toArray(new ObjectName[set.size()]);
     }
+
     protected ObjectName[] getTemporaryQueueSubscribers(){
-        Set set = temporaryQueueSubscribers.keySet();
-        return (ObjectName[])set.toArray(new ObjectName[set.size()]);
+        Set set=temporaryQueueSubscribers.keySet();
+        return (ObjectName[]) set.toArray(new ObjectName[set.size()]);
     }
-    
+
     protected ObjectName[] getInactiveDurableTopicSubscribers(){
-        Set set = inactiveDurableTopicSubscribers.keySet();
-        return (ObjectName[])set.toArray(new ObjectName[set.size()]);
+        Set set=inactiveDurableTopicSubscribers.keySet();
+        return (ObjectName[]) set.toArray(new ObjectName[set.size()]);
     }
 
-    public Broker getContextBroker() {
+    public Broker getContextBroker(){
         return contextBroker;
     }
 
-    public void setContextBroker(Broker contextBroker) {
-        this.contextBroker = contextBroker;
+    public void setContextBroker(Broker contextBroker){
+        this.contextBroker=contextBroker;
     }
 
-    protected ObjectName createObjectName(ActiveMQDestination destName) throws MalformedObjectNameException {
+    protected ObjectName createObjectName(ActiveMQDestination destName) throws MalformedObjectNameException{
         // Build the object name for the destination
         Hashtable map=brokerObjectName.getKeyPropertyList();
-        ObjectName objectName = new ObjectName(
-                brokerObjectName.getDomain()+":"+
-                "BrokerName="+map.get("BrokerName")+","+
-                "Type="+JMXSupport.encodeObjectNamePart(destName.getDestinationTypeAsString())+","+
-                "Destination="+JMXSupport.encodeObjectNamePart(destName.getPhysicalName())
-                );
+        ObjectName objectName=new ObjectName(brokerObjectName.getDomain()+":"+"BrokerName="+map.get("BrokerName")+","
+                        +"Type="+JMXSupport.encodeObjectNamePart(destName.getDestinationTypeAsString())+","
+                        +"Destination="+JMXSupport.encodeObjectNamePart(destName.getPhysicalName()));
         return objectName;
     }
 }



Mime
View raw message