activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chir...@apache.org
Subject svn commit: r375654 - in /incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq: broker/jmx/ broker/region/ command/
Date Tue, 07 Feb 2006 18:34:28 GMT
Author: chirino
Date: Tue Feb  7 10:34:21 2006
New Revision: 375654

URL: http://svn.apache.org/viewcvs?rev=375654&view=rev
Log:
- Implemented http://jira.activemq.org/jira/browse/AMQ-511
  - Queues can now be browsed, messages deleted, or queue can be purged.
- Added initial hooks to expose subscriptions for JMX management.

Added:
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/OpenTypeSupport.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/QueueView.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/QueueViewMBean.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/TopicView.java
      - copied, changed from r375288, incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/DestinationView.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/TopicViewMBean.java
      - copied, changed from r375288, incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/DestinationViewMBean.java
Removed:
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/DestinationView.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/DestinationViewMBean.java
Modified:
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedQueueRegion.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedRegionBroker.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedTempQueueRegion.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedTempTopicRegion.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedTopicRegion.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ActiveMQMapMessage.java

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedQueueRegion.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedQueueRegion.java?rev=375654&r1=375653&r2=375654&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedQueueRegion.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedQueueRegion.java Tue Feb  7 10:34:21 2006
@@ -16,12 +16,16 @@
  */
 package org.apache.activemq.broker.jmx;
 
+import javax.jms.InvalidSelectorException;
+
 import org.apache.activemq.broker.ConnectionContext;
 import org.apache.activemq.broker.region.Destination;
 import org.apache.activemq.broker.region.DestinationStatistics;
 import org.apache.activemq.broker.region.QueueRegion;
+import org.apache.activemq.broker.region.Subscription;
 import org.apache.activemq.broker.region.policy.PolicyMap;
 import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ConsumerInfo;
 import org.apache.activemq.memory.UsageManager;
 import org.apache.activemq.store.PersistenceAdapter;
 import org.apache.activemq.thread.TaskRunnerFactory;
@@ -33,6 +37,17 @@
     public ManagedQueueRegion(ManagedRegionBroker broker, DestinationStatistics destinationStatistics, UsageManager memoryManager, TaskRunnerFactory taskRunnerFactory, PersistenceAdapter persistenceAdapter, PolicyMap policyMap) {
         super(broker,destinationStatistics, memoryManager, taskRunnerFactory, persistenceAdapter, policyMap);
         regionBroker = broker;
+    }
+
+    protected Subscription createSubscription(ConnectionContext context, ConsumerInfo info) throws InvalidSelectorException {
+        Subscription sub = super.createSubscription(context, info);
+        regionBroker.registerSubscription(sub);
+        return sub;
+    }
+    
+    protected void destroySubscription(Subscription sub) {
+        regionBroker.unregisterSubscription(sub);
+        super.destroySubscription(sub);
     }
 
     protected Destination createDestination(ActiveMQDestination destination) throws Throwable {

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedRegionBroker.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedRegionBroker.java?rev=375654&r1=375653&r2=375654&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedRegionBroker.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedRegionBroker.java Tue Feb  7 10:34:21 2006
@@ -18,8 +18,11 @@
 
 import org.apache.activemq.broker.BrokerService;
 import org.apache.activemq.broker.region.Destination;
+import org.apache.activemq.broker.region.Queue;
 import org.apache.activemq.broker.region.Region;
 import org.apache.activemq.broker.region.RegionBroker;
+import org.apache.activemq.broker.region.Subscription;
+import org.apache.activemq.broker.region.Topic;
 import org.apache.activemq.broker.region.policy.PolicyMap;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.memory.UsageManager;
@@ -68,9 +71,15 @@
         map.put("Destination", JMXSupport.encodeObjectNamePart(destName.getPhysicalName()));
         ObjectName destObjectName= new ObjectName(brokerObjectName.getDomain(), map);
         
-        DestinationViewMBean view = new DestinationView(destination);
+        Object view;
+        if( destination instanceof Queue ) {
+            view = new QueueView((Queue) destination);
+        } else {
+            view = new TopicView((Topic) destination);
+        }
         
         mbeanServer.registerMBean(view, destObjectName);        
+        
     }
 
     public void unregister(ActiveMQDestination destName) throws Throwable {
@@ -81,5 +90,13 @@
         ObjectName destObjectName= new ObjectName(brokerObjectName.getDomain(), map);
         
         mbeanServer.unregisterMBean(destObjectName);        
+    }
+
+    public void registerSubscription(Subscription sub) {
+        // TODO: Use this to expose subscriptions to the JMX bus for management
+    }
+
+    public void unregisterSubscription(Subscription sub) {
+        // TODO: Use this to expose subscriptions to the JMX bus for management
     }
 }

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedTempQueueRegion.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedTempQueueRegion.java?rev=375654&r1=375653&r2=375654&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedTempQueueRegion.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedTempQueueRegion.java Tue Feb  7 10:34:21 2006
@@ -16,23 +16,36 @@
  */
 package org.apache.activemq.broker.jmx;
 
-import org.apache.activemq.broker.Broker;
+import javax.jms.InvalidSelectorException;
+
 import org.apache.activemq.broker.ConnectionContext;
 import org.apache.activemq.broker.region.Destination;
 import org.apache.activemq.broker.region.DestinationStatistics;
+import org.apache.activemq.broker.region.Subscription;
 import org.apache.activemq.broker.region.TempQueueRegion;
 import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ConsumerInfo;
 import org.apache.activemq.memory.UsageManager;
 import org.apache.activemq.thread.TaskRunnerFactory;
 
 public class ManagedTempQueueRegion extends TempQueueRegion {
 
     private final ManagedRegionBroker regionBroker;
-
     
     public ManagedTempQueueRegion(ManagedRegionBroker regionBroker, DestinationStatistics destinationStatistics, UsageManager memoryManager, TaskRunnerFactory taskRunnerFactory) {
         super(regionBroker,destinationStatistics, memoryManager, taskRunnerFactory);
         this.regionBroker = regionBroker;
+    }
+    
+    protected Subscription createSubscription(ConnectionContext context, ConsumerInfo info) throws InvalidSelectorException {
+        Subscription sub = super.createSubscription(context, info);
+        regionBroker.registerSubscription(sub);
+        return sub;
+    }
+    
+    protected void destroySubscription(Subscription sub) {
+        regionBroker.unregisterSubscription(sub);
+        super.destroySubscription(sub);
     }
 
     protected Destination createDestination(ActiveMQDestination destination) throws Throwable {

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedTempTopicRegion.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedTempTopicRegion.java?rev=375654&r1=375653&r2=375654&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedTempTopicRegion.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedTempTopicRegion.java Tue Feb  7 10:34:21 2006
@@ -16,12 +16,15 @@
  */
 package org.apache.activemq.broker.jmx;
 
-import org.apache.activemq.broker.Broker;
+import javax.jms.JMSException;
+
 import org.apache.activemq.broker.ConnectionContext;
 import org.apache.activemq.broker.region.Destination;
 import org.apache.activemq.broker.region.DestinationStatistics;
+import org.apache.activemq.broker.region.Subscription;
 import org.apache.activemq.broker.region.TempTopicRegion;
 import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ConsumerInfo;
 import org.apache.activemq.memory.UsageManager;
 import org.apache.activemq.thread.TaskRunnerFactory;
 
@@ -32,6 +35,17 @@
     public ManagedTempTopicRegion(ManagedRegionBroker regionBroker, DestinationStatistics destinationStatistics, UsageManager memoryManager, TaskRunnerFactory taskRunnerFactory) {
         super(regionBroker,destinationStatistics, memoryManager, taskRunnerFactory);
         this.regionBroker = regionBroker;
+    }
+    
+    protected Subscription createSubscription(ConnectionContext context, ConsumerInfo info) throws JMSException {
+        Subscription sub = super.createSubscription(context, info);
+        regionBroker.registerSubscription(sub);
+        return sub;
+    }
+    
+    protected void destroySubscription(Subscription sub) {
+        regionBroker.unregisterSubscription(sub);
+        super.destroySubscription(sub);
     }
 
     protected Destination createDestination(ActiveMQDestination destination) throws Throwable {

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedTopicRegion.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedTopicRegion.java?rev=375654&r1=375653&r2=375654&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedTopicRegion.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedTopicRegion.java Tue Feb  7 10:34:21 2006
@@ -16,12 +16,16 @@
  */
 package org.apache.activemq.broker.jmx;
 
+import javax.jms.JMSException;
+
 import org.apache.activemq.broker.ConnectionContext;
 import org.apache.activemq.broker.region.Destination;
 import org.apache.activemq.broker.region.DestinationStatistics;
+import org.apache.activemq.broker.region.Subscription;
 import org.apache.activemq.broker.region.TopicRegion;
 import org.apache.activemq.broker.region.policy.PolicyMap;
 import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ConsumerInfo;
 import org.apache.activemq.memory.UsageManager;
 import org.apache.activemq.store.PersistenceAdapter;
 import org.apache.activemq.thread.TaskRunnerFactory;
@@ -33,6 +37,17 @@
     public ManagedTopicRegion(ManagedRegionBroker broker, DestinationStatistics destinationStatistics, UsageManager memoryManager, TaskRunnerFactory taskRunnerFactory, PersistenceAdapter persistenceAdapter, PolicyMap policyMap) {
         super(broker,destinationStatistics, memoryManager, taskRunnerFactory, persistenceAdapter, policyMap);
         regionBroker = broker;
+    }
+
+    protected Subscription createSubscription(ConnectionContext context, ConsumerInfo info) throws JMSException {
+        Subscription sub = super.createSubscription(context, info);
+        regionBroker.registerSubscription(sub);
+        return sub;
+    }
+    
+    protected void destroySubscription(Subscription sub) {
+        regionBroker.unregisterSubscription(sub);
+        super.destroySubscription(sub);
     }
 
     protected Destination createDestination(ActiveMQDestination destination) throws Throwable {

Added: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/OpenTypeSupport.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/OpenTypeSupport.java?rev=375654&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/OpenTypeSupport.java (added)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/OpenTypeSupport.java Tue Feb  7 10:34:21 2006
@@ -0,0 +1,257 @@
+package org.apache.activemq.broker.jmx;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.Map;
+
+import javax.jms.DeliveryMode;
+import javax.jms.JMSException;
+import javax.management.openmbean.ArrayType;
+import javax.management.openmbean.CompositeData;
+import javax.management.openmbean.CompositeDataSupport;
+import javax.management.openmbean.CompositeType;
+import javax.management.openmbean.OpenDataException;
+import javax.management.openmbean.OpenType;
+import javax.management.openmbean.SimpleType;
+
+import org.apache.activemq.command.ActiveMQBytesMessage;
+import org.apache.activemq.command.ActiveMQMapMessage;
+import org.apache.activemq.command.ActiveMQMessage;
+import org.apache.activemq.command.ActiveMQObjectMessage;
+import org.apache.activemq.command.ActiveMQStreamMessage;
+import org.apache.activemq.command.ActiveMQTextMessage;
+import org.apache.activemq.command.Message;
+
+public class OpenTypeSupport {
+
+    interface OpenTypeFactory {
+        CompositeType getCompositeType() throws OpenDataException;
+        Map getFields( Object o ) throws OpenDataException;
+    }
+    
+    private static final HashMap openTypeFactories = new HashMap();
+  
+    abstract static class AbstractOpenTypeFactory implements OpenTypeFactory {
+        
+        private CompositeType compositeType;
+        ArrayList itemNamesList = new ArrayList();            
+        ArrayList itemDescriptionsList = new ArrayList();            
+        ArrayList itemTypesList = new ArrayList();            
+        
+        public CompositeType getCompositeType() throws OpenDataException {
+            if( compositeType == null ) {
+                init();
+                compositeType = createCompositeType();
+            }
+            return compositeType;
+        }
+        
+        protected void init() throws OpenDataException {
+        }
+
+        protected CompositeType createCompositeType() throws OpenDataException {
+            String[] itemNames = (String[]) itemNamesList.toArray(new String[itemNamesList.size()]);
+            String[] itemDescriptions = (String[]) itemDescriptionsList.toArray(new String[itemDescriptionsList.size()]);
+            OpenType[] itemTypes = (OpenType[]) itemTypesList.toArray(new OpenType[itemTypesList.size()]);
           
+            return new CompositeType(getTypeName(), getDescription(), itemNames, itemDescriptions, itemTypes);
+        }
+        
+        abstract protected String getTypeName();
+
+        protected void addItem(String name, String description, OpenType type) {
+            itemNamesList.add(name);
+            itemDescriptionsList.add(description);
+            itemTypesList.add(type);
+        }
+
+
+        protected String getDescription() {
+            return getTypeName();
+        }
+
+        public Map getFields(Object o) throws OpenDataException {
+            HashMap rc = new HashMap(); 
+            return rc;
+        }
+    }
+
+    static class MessageOpenTypeFactory extends AbstractOpenTypeFactory {
+        
+        protected String getTypeName() {
+            return ActiveMQMessage.class.getName();
+        }
+
+        protected void init() throws OpenDataException {
+            super.init();
+            addItem("JMSCorrelationID", "JMSCorrelationID", SimpleType.STRING);
+            addItem("JMSDestination", "JMSDestination", SimpleType.STRING);
+            addItem("JMSMessageID", "JMSMessageID", SimpleType.STRING);
+            addItem("JMSReplyTo", "JMSReplyTo", SimpleType.STRING);
+            addItem("JMSType", "JMSType", SimpleType.STRING);
+            addItem("JMSDeliveryMode", "JMSDeliveryMode", SimpleType.STRING);
+            addItem("JMSExpiration", "JMSExpiration", SimpleType.LONG);
+            addItem("JMSPriority", "JMSPriority", SimpleType.INTEGER);
+            addItem("JMSRedelivered", "JMSRedelivered", SimpleType.BOOLEAN);
+            addItem("JMSTimestamp", "JMSTimestamp", SimpleType.DATE);
+            addItem("Properties", "Properties", SimpleType.STRING);
+        }
+
+        public Map getFields(Object o) throws OpenDataException {
+            ActiveMQMessage m = (ActiveMQMessage) o;
+            Map rc = super.getFields(o);
+            rc.put("JMSCorrelationID", m.getJMSCorrelationID());
+            rc.put("JMSDestination", ""+m.getJMSDestination());
+            rc.put("JMSMessageID", m.getJMSMessageID());
+            rc.put("JMSReplyTo", ""+m.getJMSReplyTo());
+            rc.put("JMSType", m.getJMSType());
+            rc.put("JMSDeliveryMode", m.getJMSDeliveryMode()==DeliveryMode.PERSISTENT ? "PERSISTENT" : "NON-PERSISTENT");
+            rc.put("JMSExpiration", new Long(m.getJMSExpiration()));
+            rc.put("JMSPriority", new Integer(m.getJMSPriority()));
+            rc.put("JMSRedelivered", new Boolean(m.getJMSRedelivered()));
+            rc.put("JMSTimestamp", new Date(m.getJMSTimestamp()));
+            try {
+                rc.put("Properties", ""+m.getProperties());
+            } catch (IOException e) {
+                rc.put("Properties", "");
+            }
+            return rc;
+        }
+    }
+
+    static class ByteMessageOpenTypeFactory extends MessageOpenTypeFactory {
+        
+        protected String getTypeName() {
+            return ActiveMQBytesMessage.class.getName();
+        }
+
+        protected void init() throws OpenDataException {
+            super.init();
+            addItem("BodyLength", "Body length", SimpleType.LONG);
+            addItem("BodyPreview", "Body preview", new ArrayType(1,SimpleType.BYTE));
+        }
+
+        public Map getFields(Object o) throws OpenDataException {
+            ActiveMQBytesMessage m = (ActiveMQBytesMessage) o;
+            Map rc = super.getFields(o);
+            long length=0;
+            try {
+                length = m.getBodyLength();
+                rc.put("BodyLength", new Long(length));
+            } catch (JMSException e) {
+                rc.put("BodyLength", new Long(0));
+            }
+            try {
+                byte preview[] = new byte[ (int)Math.min(length, 255) ];
+                m.readBytes(preview);
+                rc.put("BodyPreview", preview);
+            } catch (JMSException e) {
+                rc.put("BodyPreview", new byte[]{});
+            }
+            return rc;
+        }
+        
+    }
+    
+    static class MapMessageOpenTypeFactory extends MessageOpenTypeFactory {
+        protected String getTypeName() {
+            return ActiveMQMapMessage.class.getName();
+        }
+
+        protected void init() throws OpenDataException {
+            super.init();
+            addItem("ContentMap", "Content map", SimpleType.STRING);
+        }
+
+        public Map getFields(Object o) throws OpenDataException {
+            ActiveMQMapMessage m = (ActiveMQMapMessage) o;
+            Map rc = super.getFields(o);
+            long length=0;
+            try {
+                rc.put("ContentMap", ""+m.getContentMap());
+            } catch (JMSException e) {
+                rc.put("ContentMap", "");
+            }
+            return rc;
+        }
+    }
+    
+    static class ObjectMessageOpenTypeFactory extends MessageOpenTypeFactory {
+        protected String getTypeName() {
+            return ActiveMQObjectMessage.class.getName();
+        }
+
+        protected void init() throws OpenDataException {
+            super.init();
+        }
+
+        public Map getFields(Object o) throws OpenDataException {
+            ActiveMQObjectMessage m = (ActiveMQObjectMessage) o;
+            Map rc = super.getFields(o);
+            return rc;
+        }
+    }
+    
+    static class StreamMessageOpenTypeFactory extends MessageOpenTypeFactory {
+        protected String getTypeName() {
+            return ActiveMQStreamMessage.class.getName();
+        }
+
+        protected void init() throws OpenDataException {
+            super.init();
+        }
+
+        public Map getFields(Object o) throws OpenDataException {
+            ActiveMQStreamMessage m = (ActiveMQStreamMessage) o;
+            Map rc = super.getFields(o);
+            return rc;
+        }
+    }
+    
+    static class TextMessageOpenTypeFactory extends MessageOpenTypeFactory {
+        protected String getTypeName() {
+            return ActiveMQTextMessage.class.getName();
+        }
+
+        protected void init() throws OpenDataException {
+            super.init();
+            addItem("Text", "Text", SimpleType.STRING);
+        }
+
+        public Map getFields(Object o) throws OpenDataException {
+            ActiveMQTextMessage m = (ActiveMQTextMessage) o;
+            Map rc = super.getFields(o);
+            try {
+                rc.put("Text", ""+m.getText());
+            } catch (JMSException e) {
+                rc.put("Text", "");
+            }
+            return rc;
+        }
+    }
+    
+    static {
+        openTypeFactories.put(ActiveMQMessage.class, new MessageOpenTypeFactory());
+        openTypeFactories.put(ActiveMQBytesMessage.class, new ByteMessageOpenTypeFactory());
+        openTypeFactories.put(ActiveMQMapMessage.class, new MapMessageOpenTypeFactory());
+        openTypeFactories.put(ActiveMQObjectMessage.class, new ObjectMessageOpenTypeFactory());
+        openTypeFactories.put(ActiveMQStreamMessage.class, new StreamMessageOpenTypeFactory());
+        openTypeFactories.put(ActiveMQTextMessage.class, new TextMessageOpenTypeFactory());
+    }
+    
+    public static OpenTypeFactory getFactory(Class clazz) throws OpenDataException {
+        return (OpenTypeFactory) openTypeFactories.get(clazz);
+    }
+    
+    public static CompositeData convert(Message message) throws OpenDataException {
+        OpenTypeFactory f = getFactory(message.getClass());
+        if( f == null )
+            throw new OpenDataException("Cannot create a CompositeData for type: "+message.getClass().getName());
+        CompositeType ct = f.getCompositeType();
+        Map fields = f.getFields(message);
+        return new CompositeDataSupport(ct, fields);
+    }
+
+
+}

Added: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/QueueView.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/QueueView.java?rev=375654&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/QueueView.java (added)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/QueueView.java Tue Feb  7 10:34:21 2006
@@ -0,0 +1,113 @@
+/**
+ *
+ * Copyright 2005-2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.jmx;
+
+import javax.management.openmbean.CompositeData;
+import javax.management.openmbean.CompositeDataSupport;
+import javax.management.openmbean.CompositeType;
+import javax.management.openmbean.OpenDataException;
+import javax.management.openmbean.TabularData;
+import javax.management.openmbean.TabularDataSupport;
+import javax.management.openmbean.TabularType;
+
+import org.apache.activemq.broker.jmx.OpenTypeSupport.OpenTypeFactory;
+import org.apache.activemq.broker.region.Queue;
+import org.apache.activemq.command.ActiveMQMessage;
+import org.apache.activemq.command.Message;
+
+public class QueueView implements QueueViewMBean {
+
+    private final Queue destination;
+
+    public QueueView(Queue destination) {
+        this.destination = destination;
+    }
+
+    public void gc() {
+        destination.gc();
+    }
+    public void resetStatistics() {
+        destination.getDestinationStatistics().reset();
+    }
+
+    public long getEnqueueCount() {
+        return destination.getDestinationStatistics().getEnqueues().getCount();
+    
+    }
+    public long getDequeueCount() {
+        return destination.getDestinationStatistics().getDequeues().getCount();
+    }
+
+    public long getConsumerCount() {
+        return destination.getDestinationStatistics().getConsumers().getCount();
+    }
+    
+    public long getMessages() {
+        return destination.getDestinationStatistics().getMessages().getCount();
+    }
+    
+    public long getMessagesCached() {
+        return destination.getDestinationStatistics().getMessagesCached().getCount();
+    }
+    
+    public CompositeData[] browse() throws OpenDataException {
+        Message[] messages = destination.browse();
+        CompositeData c[] = new CompositeData[messages.length];
+        for (int i = 0; i < c.length; i++) {
+            try {
+                System.out.println(messages[i].getMessageId());
+                c[i] = OpenTypeSupport.convert(messages[i]);
+            } catch (Throwable e) {
+                // TODO Auto-generated catch block
+                e.printStackTrace();
+            }
+        }
+        return c;
+    }
+    
+    public TabularData browseAsTable() throws OpenDataException {
+        OpenTypeFactory factory = OpenTypeSupport.getFactory(ActiveMQMessage.class);
+        
+        Message[] messages = destination.browse();
+        CompositeType ct = factory.getCompositeType();
+        TabularType tt = new TabularType("MessageList", "MessageList", ct, new String[]{"JMSMessageID"});
+        TabularDataSupport rc = new TabularDataSupport(tt);
+        for (int i = 0; i < messages.length; i++) {
+            System.out.println(messages[i].getMessageId());
+            rc.put(new CompositeDataSupport(ct, factory.getFields(messages[i])));
+        }
+        
+        return rc;
+    }
+
+    
+    public CompositeData getMessage(String messageId) throws OpenDataException {
+        Message rc = destination.getMessage(messageId);
+        if( rc ==null )
+            return null;
+        return OpenTypeSupport.convert(rc);
+    }
+    
+    public void removeMessage(String messageId) {
+        destination.removeMessage(messageId);
+    }
+
+    public void purge() {
+        destination.purge();
+    }
+    
+}

Added: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/QueueViewMBean.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/QueueViewMBean.java?rev=375654&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/QueueViewMBean.java (added)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/QueueViewMBean.java Tue Feb  7 10:34:21 2006
@@ -0,0 +1,42 @@
+/**
+ *
+ * Copyright 2005-2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.jmx;
+
+import javax.management.openmbean.CompositeData;
+import javax.management.openmbean.OpenDataException;
+import javax.management.openmbean.TabularData;
+
+
+public interface QueueViewMBean {
+    
+    public void gc();
+    public void resetStatistics();
+
+    public long getEnqueueCount();
+    public long getDequeueCount();
+    public long getConsumerCount();
+    public long getMessages();
+    public long getMessagesCached();
+
+    
+    public CompositeData[] browse() throws OpenDataException;
+    public TabularData browseAsTable() throws OpenDataException;
+    public CompositeData getMessage(String messageId) throws OpenDataException;
+    public void removeMessage(String messageId);
+    public void purge();
+
+}
\ No newline at end of file

Copied: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/TopicView.java (from r375288, incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/DestinationView.java)
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/TopicView.java?p2=incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/TopicView.java&p1=incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/DestinationView.java&r1=375288&r2=375654&rev=375654&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/DestinationView.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/TopicView.java Tue Feb  7 10:34:21 2006
@@ -16,13 +16,13 @@
  */
 package org.apache.activemq.broker.jmx;
 
-import org.apache.activemq.broker.region.Destination;
+import org.apache.activemq.broker.region.Topic;
 
-public class DestinationView implements DestinationViewMBean {
+public class TopicView implements TopicViewMBean {
 
-    private final Destination destination;
+    private final Topic destination;
 
-    public DestinationView(Destination destination) {
+    public TopicView(Topic destination) {
         this.destination = destination;
     }
 

Copied: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/TopicViewMBean.java
(from r375288, incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/DestinationViewMBean.java)
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/TopicViewMBean.java?p2=incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/TopicViewMBean.java&p1=incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/DestinationViewMBean.java&r1=375288&r2=375654&rev=375654&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/DestinationViewMBean.java
(original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/TopicViewMBean.java
Tue Feb  7 10:34:21 2006
@@ -17,7 +17,7 @@
 package org.apache.activemq.broker.jmx;
 
 
-public interface DestinationViewMBean {
+public interface TopicViewMBean {
     
     public void gc();
     public void resetStatistics();

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java?rev=375654&r1=375653&r2=375654&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java
(original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java
Tue Feb  7 10:34:21 2006
@@ -170,6 +170,11 @@
             Destination dest = (Destination) iter.next();
             dest.removeSubscription(context, sub);
         }
+        
+        destroySubscription(sub);
+    }
+
+    protected void destroySubscription(Subscription sub) { // Called once the subscription has been removed from each destination in the loop above.
     } // Intentionally empty here; presumably an extension point for subclasses to release per-subscription resources -- confirm.
 
     public void removeSubscription(ConnectionContext context, RemoveSubscriptionInfo info)
throws Throwable {

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java?rev=375654&r1=375653&r2=375654&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
(original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
Tue Feb  7 10:34:21 2006
@@ -16,7 +16,11 @@
  */
 package org.apache.activemq.broker.region;
 
-import edu.emory.mathcs.backport.java.util.concurrent.CopyOnWriteArrayList;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
 
 import org.apache.activemq.broker.ConnectionContext;
 import org.apache.activemq.broker.region.group.MessageGroupHashBucket;
@@ -41,10 +45,7 @@
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
-import java.io.IOException;
-import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.List;
+import edu.emory.mathcs.backport.java.util.concurrent.CopyOnWriteArrayList;
 
 /**
  * The Queue is a List of MessageEntry objects that are dispatched to matching
@@ -139,7 +140,7 @@
                 for (Iterator iter = messages.iterator(); iter.hasNext();) {
 
                     IndirectMessageReference node = (IndirectMessageReference) iter.next();
-                    if (node.isDropped() ) {
+                    if (node.isDropped()) {
                         continue;
                     }
 
@@ -148,15 +149,13 @@
                         if (sub.matches(node, msgContext)) {
                             sub.add(node);
                         }
-                    }
-                    catch (IOException e) {
+                    } catch (IOException e) {
                         log.warn("Could not load message: " + e, e);
                     }
                 }
             }
 
-        }
-        finally {
+        } finally {
             msgContext.clear();
             dispatchValve.turnOn();
         }
@@ -193,17 +192,18 @@
                     MessageEvaluationContext msgContext = context.getMessageEvaluationContext();
                     try {
                         msgContext.setDestination(destination);
-                        
+
                         for (Iterator iter = messages.iterator(); iter.hasNext();) {
-                            IndirectMessageReference node = (IndirectMessageReference) iter.next();
                           
-                            if (node.isDropped() ) {
+                            IndirectMessageReference node = (IndirectMessageReference) iter.next();
+                            if (node.isDropped()) {
                                 continue;
                             }
-    
+
                             String groupID = node.getGroupID();
-    
+
                             // Re-deliver all messages that the sub locked
-                            if (node.getLockOwner() == sub || wasExclusiveOwner || (groupID
!= null && ownedGroups.contains(groupID))) {
+                            if (node.getLockOwner() == sub || wasExclusiveOwner
+                                    || (groupID != null && ownedGroups.contains(groupID)))
{
                                 node.incrementRedeliveryCounter();
                                 node.unlock();
                                 msgContext.setMessageReference(node);
@@ -216,8 +216,7 @@
                 }
             }
 
-        }
-        finally {
+        } finally {
             dispatchValve.turnOn();
         }
 
@@ -225,9 +224,9 @@
 
     public void send(final ConnectionContext context, final Message message) throws Throwable
{
 
-        if( context.isProducerFlowControl() )
+        if (context.isProducerFlowControl())
             usageManager.waitForSpace();
-        
+
         message.setRegionDestination(this);
 
         if (store != null && message.isPersistent())
@@ -242,8 +241,7 @@
                         dispatch(context, node, message);
                     }
                 });
-            }
-            else {
+            } else {
                 dispatch(context, node, message);
             }
         } finally {
@@ -274,7 +272,7 @@
             for (Iterator iter = messages.iterator(); iter.hasNext();) {
                 // Remove dropped messages from the queue.
                 IndirectMessageReference node = (IndirectMessageReference) iter.next();
-                if (node.isDropped()) {                    
+                if (node.isDropped()) {
                     garbageSize--;
                     iter.remove();
                     continue;
@@ -283,7 +281,8 @@
         }
     }
 
-    public void acknowledge(ConnectionContext context, Subscription sub, final MessageAck
ack, final MessageReference node) throws IOException {
+    public void acknowledge(ConnectionContext context, Subscription sub, final MessageAck
ack,
+            final MessageReference node) throws IOException {
         if (store != null && node.isPersistent()) {
             store.removeMessage(context, ack);
         }
@@ -291,15 +290,16 @@
 
     public Message loadMessage(MessageId messageId) throws IOException {
         Message msg = store.getMessage(messageId);
-        if( msg!=null ) {
+        if (msg != null) {
             msg.setRegionDestination(this);
         }
         return msg;
     }
 
     public String toString() {
-        return "Queue: destination=" + destination.getPhysicalName() + ", subscriptions="
+ consumers.size() + ", memory=" + usageManager.getPercentUsage()
-                + "%, size=" + messages.size() + ", in flight groups=" + messageGroupOwners;
+        return "Queue: destination=" + destination.getPhysicalName() + ", subscriptions="
+ consumers.size()
+                + ", memory=" + usageManager.getPercentUsage() + "%, size=" + messages.size()
+ ", in flight groups="
+                + messageGroupOwners;
     }
 
     public void start() throws Exception {
@@ -324,7 +324,7 @@
 
     public MessageGroupMap getMessageGroupOwners() {
         if (messageGroupOwners == null) {
-            messageGroupOwners = new MessageGroupHashBucket(messageGroupHashBucketCount );
+            messageGroupOwners = new MessageGroupHashBucket(messageGroupHashBucketCount);
         }
         return messageGroupOwners;
     }
@@ -352,7 +352,6 @@
     public void setMessageGroupHashBucketCount(int messageGroupHashBucketCount) {
         this.messageGroupHashBucketCount = messageGroupHashBucketCount;
     }
-    
 
     // Implementation methods
     // -------------------------------------------------------------------------
@@ -370,7 +369,7 @@
                 messages.add(node);
             }
 
-            synchronized(consumers) {
+            synchronized (consumers) {
                 if (consumers.isEmpty()) {
                     log.debug("No subscriptions registered, will not dispatch message at
this time.");
                     return;
@@ -381,8 +380,7 @@
             msgContext.setMessageReference(node);
 
             dispatchPolicy.dispatch(context, node, msgContext, consumers);
-        }
-        finally {
+        } finally {
             msgContext.clear();
             dispatchValve.decrement();
         }
@@ -404,5 +402,94 @@
     public MessageStore getMessageStore() {
         return store;
     }
+
+    /**
+     * Returns a snapshot of the messages currently referenced by this queue.
+     * <p>
+     * The walk happens while holding the lock on <code>messages</code>.
+     * References whose message resolves to <code>null</code> are skipped, and a
+     * message that fails to load is logged and skipped so that one bad entry
+     * does not hide the rest of the queue.
+     *
+     * @return the loadable messages in iteration order; never <code>null</code>
+     */
+    public Message[] browse() {
+
+        ArrayList l = new ArrayList();
+        synchronized (messages) {
+            for (Iterator iter = messages.iterator(); iter.hasNext();) {
+                try {
+                    MessageReference r = (MessageReference) iter.next();
+                    // Take a reference before reading so that the decrement in
+                    // the finally block is balanced (same pairing as getMessage).
+                    r.incrementReferenceCount();
+                    try {
+                        Message m = r.getMessage();
+                        if (m != null) {
+                            l.add(m);
+                        }
+                    } finally {
+                        r.decrementReferenceCount();
+                    }
+                } catch (IOException e) {
+                    // Best effort: report the failure and keep browsing.
+                    log.warn("Could not load message: " + e, e);
+                }
+            }
+        }
+
+        return (Message[]) l.toArray(new Message[l.size()]);
+    }
+
+    /**
+     * Removes the message(s) whose id string equals <code>messageId</code>.
+     * <p>
+     * Each match is acknowledged with a standard ack through
+     * {@link #acknowledge}, its reference is dropped and
+     * <code>dropEvent()</code> is signalled. References that are already
+     * dropped (still in the list until the next <code>gc()</code>) are skipped
+     * so they are not acknowledged or counted a second time.
+     *
+     * @param messageId string form of the id of the message to remove
+     */
+    public void removeMessage(String messageId) {
+        synchronized (messages) {
+            ConnectionContext c = new ConnectionContext();
+            for (Iterator iter = messages.iterator(); iter.hasNext();) {
+                try {
+                    IndirectMessageReference r = (IndirectMessageReference) iter.next();
+                    if (r.isDropped()) {
+                        // Already removed; only waiting for gc() to unlink it.
+                        continue;
+                    }
+                    if (messageId.equals(r.getMessageId().toString())) {
+                        MessageAck ack = new MessageAck();
+                        ack.setAckType(MessageAck.STANDARD_ACK_TYPE);
+                        ack.setDestination(destination);
+                        ack.setMessageID(r.getMessageId());
+                        acknowledge(c, null, ack, r);
+                        r.drop();
+                        dropEvent();
+                    }
+                } catch (IOException e) {
+                    // The reference is left in place when the acknowledge fails;
+                    // report it instead of silently ignoring the failure.
+                    log.warn("Could not remove message: " + messageId + ": " + e, e);
+                }
+            }
+        }
+    }
+
+    /**
+     * Looks up a queued message by the string form of its message id.
+     * <p>
+     * The scan runs while holding the lock on <code>messages</code> and stops
+     * at the first reference whose id matches. The reference count is raised
+     * around the read and released again in a finally block.
+     *
+     * @param messageId string form of the id to look for
+     * @return the matching message, or <code>null</code> when no reference
+     *         matches or the matching reference yields no message
+     */
+    public Message getMessage(String messageId) {
+        synchronized (messages) {
+            for (Iterator iter = messages.iterator(); iter.hasNext();) {
+                try {
+                    MessageReference r = (MessageReference) iter.next();
+                    if (messageId.equals(r.getMessageId().toString())) {
+                        r.incrementReferenceCount();
+                        try {
+                            Message m = r.getMessage();
+                            if (m != null) {
+                                return m;
+                            }
+                        } finally {
+                            r.decrementReferenceCount();
+                        }
+                        break;
+                    }
+                } catch (IOException e) {
+                    // Report the failure so that a load error is not mistaken
+                    // for "no such message"; the scan then continues as before.
+                    log.warn("Could not load message: " + e, e);
+                }
+            }
+        }
+        return null;
+    }
+
+    /**
+     * Removes every message currently held by this queue.
+     * <p>
+     * Each live reference is acknowledged with a standard ack through
+     * {@link #acknowledge}, dropped, and reported via <code>dropEvent()</code>.
+     * References that are already dropped (still listed until the next
+     * <code>gc()</code>) are skipped so they are not acknowledged or counted
+     * twice. A failure on one message is logged and the purge continues with
+     * the remaining messages.
+     */
+    public void purge() {
+        synchronized (messages) {
+            ConnectionContext c = new ConnectionContext();
+            for (Iterator iter = messages.iterator(); iter.hasNext();) {
+                try {
+                    IndirectMessageReference r = (IndirectMessageReference) iter.next();
+                    if (r.isDropped()) {
+                        // Already removed; only waiting for gc() to unlink it.
+                        continue;
+                    }
+                    MessageAck ack = new MessageAck();
+                    ack.setAckType(MessageAck.STANDARD_ACK_TYPE);
+                    ack.setDestination(destination);
+                    ack.setMessageID(r.getMessageId());
+                    acknowledge(c, null, ack, r);
+                    r.drop();
+                    dropEvent();
+                } catch (IOException e) {
+                    // Best effort: report the failure and keep purging.
+                    log.warn("Could not purge message: " + e, e);
+                }
+            }
+        }
+    }
+
+
 
 }

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ActiveMQMapMessage.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ActiveMQMapMessage.java?rev=375654&r1=375653&r2=375654&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ActiveMQMapMessage.java
(original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ActiveMQMapMessage.java
Tue Feb  7 10:34:21 2006
@@ -24,6 +24,7 @@
 import java.util.Collections;
 import java.util.Enumeration;
 import java.util.HashMap;
+import java.util.Map;
 import java.util.zip.DeflaterOutputStream;
 import java.util.zip.InflaterInputStream;
 
@@ -660,5 +661,10 @@
         return super.toString() + " ActiveMQMapMessage{ " +
                 "theTable = " + map +
                 " }";
+    }
+    
+    public Map getContentMap() throws JMSException { // Exposes the map field itself (no copy is made), so caller changes affect this message.
+        initializeReading(); // Defined elsewhere in this class; presumably prepares the map content for reading -- confirm.
+        return map;
     }
 }



Mime
View raw message