activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chir...@apache.org
Subject svn commit: r383815 - in /incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq: broker/ broker/jmx/ broker/region/ command/ util/
Date Tue, 07 Mar 2006 07:39:54 GMT
Author: chirino
Date: Mon Mar  6 23:39:51 2006
New Revision: 383815

URL: http://svn.apache.org/viewcvs?rev=383815&view=rev
Log:
- A bunch of JMX-related tidying up.
  - Added a sendTextMessage() method to the DestinationView so that a JMX console can send a test message to a destination.
  - Renamed a few bean properties so that they would be more consistent:
      - use get*Counter for properties that keep incrementing
      - use get*QueueSize for properties that represent the size of a list in the broker.
  - the gc() method on the broker is now the only one exposed
  - the CachedMessageCount is not exposed anymore.


Added:
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/DestinationViewMBean.java
Modified:
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/BrokerView.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/BrokerViewMBean.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/DestinationView.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/DurableSubscriptionViewMBean.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedRegionBroker.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedTransportConnection.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/QueueView.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/QueueViewMBean.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/SubscriptionView.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/SubscriptionViewMBean.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/TopicSubscriptionView.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/TopicSubscriptionViewMBean.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/TopicViewMBean.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Subscription.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ActiveMQMessage.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/TypeConversionSupport.java

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java?rev=383815&r1=383814&r2=383815&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java Mon Mar  6 23:39:51 2006
@@ -22,7 +22,6 @@
 import java.net.URISyntaxException;
 import java.util.ArrayList;
 import java.util.HashMap;
-import java.util.Hashtable;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -754,12 +753,13 @@
     protected void registerConnectorMBean(TransportConnector connector) throws IOException, URISyntaxException {
         MBeanServer mbeanServer = getManagementContext().getMBeanServer();
         ConnectorViewMBean view = new ConnectorView(connector);
-        Hashtable map = new Hashtable();
-        map.put("Type", "Connector");
-        map.put("BrokerName", JMXSupport.encodeObjectNamePart(getBrokerName()));
-        map.put("ConnectorName", JMXSupport.encodeObjectNamePart(connector.getName()));
         try {
-            ObjectName objectName = new ObjectName("org.apache.activemq", map);
+        	ObjectName objectName = new ObjectName(
+            		managementContext.getJmxDomainName()+":"+
+            		"BrokerName="+JMXSupport.encodeObjectNamePart(getBrokerName())+","+
+            		"Type=Connector,"+
+            		"ConnectorName="+JMXSupport.encodeObjectNamePart(connector.getName())
+            		);
             mbeanServer.registerMBean(view, objectName);
             registeredMBeanNames.add(objectName);
         }
@@ -771,13 +771,12 @@
     protected void registerNetworkConnectorMBean(NetworkConnector connector) throws IOException {
         MBeanServer mbeanServer = getManagementContext().getMBeanServer();
         NetworkConnectorViewMBean view = new NetworkConnectorView(connector);
-        Hashtable map = new Hashtable();
-        map.put("Type", "NetworkConnector");
-        map.put("BrokerName", JMXSupport.encodeObjectNamePart(getBrokerName()));
-        // map.put("ConnectorName",
-        // JMXSupport.encodeObjectNamePart(connector.()));
         try {
-            ObjectName objectName = new ObjectName("org.apache.activemq", map);
+        	ObjectName objectName = new ObjectName(
+            		managementContext.getJmxDomainName()+":"+
+            		"BrokerName="+JMXSupport.encodeObjectNamePart(getBrokerName())+","+
+            		"Type=NetworkConnector"
+            		);
             mbeanServer.registerMBean(view, objectName);
             registeredMBeanNames.add(objectName);
         }
@@ -789,13 +788,12 @@
     protected void registerProxyConnectorMBean(ProxyConnector connector) throws IOException {
         MBeanServer mbeanServer = getManagementContext().getMBeanServer();
         ProxyConnectorView view = new ProxyConnectorView(connector);
-        Hashtable map = new Hashtable();
-        map.put("Type", "ProxyConnector");
-        map.put("BrokerName", JMXSupport.encodeObjectNamePart(getBrokerName()));
-        // map.put("ConnectorName",
-        // JMXSupport.encodeObjectNamePart(connector.()));
         try {
-            ObjectName objectName = new ObjectName("org.apache.activemq", map);
+        	ObjectName objectName = new ObjectName(
+            		managementContext.getJmxDomainName()+":"+
+            		"BrokerName="+JMXSupport.encodeObjectNamePart(getBrokerName())+","+
+            		"Type=ProxyConnector"
+            		);
             mbeanServer.registerMBean(view, objectName);
             registeredMBeanNames.add(objectName);
         }
@@ -807,13 +805,12 @@
     protected void registerFTConnectorMBean(MasterConnector connector) throws IOException {
         MBeanServer mbeanServer = getManagementContext().getMBeanServer();
         FTConnectorView view = new FTConnectorView(connector);
-        Hashtable map = new Hashtable();
-        map.put("Type", "MasterConnector");
-        map.put("BrokerName", JMXSupport.encodeObjectNamePart(getBrokerName()));
-        // map.put("ConnectorName",
-        // JMXSupport.encodeObjectNamePart(connector.()));
         try {
-            ObjectName objectName = new ObjectName("org.apache.activemq", map);
+        	ObjectName objectName = new ObjectName(
+            		managementContext.getJmxDomainName()+":"+
+            		"BrokerName="+JMXSupport.encodeObjectNamePart(getBrokerName())+","+
+            		"Type=MasterConnector"
+            		);
             mbeanServer.registerMBean(view, objectName);
             registeredMBeanNames.add(objectName);
         }
@@ -825,13 +822,12 @@
     protected void registerJmsConnectorMBean(JmsConnector connector) throws IOException {
         MBeanServer mbeanServer = getManagementContext().getMBeanServer();
         JmsConnectorView view = new JmsConnectorView(connector);
-        Hashtable map = new Hashtable();
-        map.put("Type", "JmsConnector");
-        map.put("BrokerName", JMXSupport.encodeObjectNamePart(getBrokerName()));
-        // map.put("ConnectorName",
-        // JMXSupport.encodeObjectNamePart(connector.()));
         try {
-            ObjectName objectName = new ObjectName("org.apache.activemq", map);
+        	ObjectName objectName = new ObjectName(
+            		managementContext.getJmxDomainName()+":"+
+            		"BrokerName="+JMXSupport.encodeObjectNamePart(getBrokerName())+","+
+            		"Type=JmsConnector"
+            		);
             mbeanServer.registerMBean(view, objectName);
             registeredMBeanNames.add(objectName);
         }
@@ -867,7 +863,7 @@
         if (isUseJmx()) {
             ManagedRegionBroker managedBroker = (ManagedRegionBroker) regionBroker;
             managedBroker.setContextBroker(broker);
-            BrokerViewMBean view = new BrokerView(managedBroker, getMemoryManager());
+            BrokerViewMBean view = new BrokerView(this, managedBroker);
             MBeanServer mbeanServer = getManagementContext().getMBeanServer();
             ObjectName objectName = getBrokerObjectName();
             mbeanServer.registerMBean(view, objectName);
@@ -942,10 +938,11 @@
 
     protected ObjectName createBrokerObjectName() throws IOException {
         try {
-            Hashtable map = new Hashtable();
-            map.put("Type", "Broker");
-            map.put("BrokerName", JMXSupport.encodeObjectNamePart(getBrokerName()));
-            return new ObjectName(getManagementContext().getJmxDomainName(), map);
+            return new ObjectName(
+            		getManagementContext().getJmxDomainName()+":"+
+            		"BrokerName="+JMXSupport.encodeObjectNamePart(getBrokerName())+","+
+            		"Type=Broker"
+            		);
         }
         catch (Throwable e) {
             throw IOExceptionSupport.create("Invalid JMX broker name: " + brokerName, e);

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/BrokerView.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/BrokerView.java?rev=383815&r1=383814&r2=383815&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/BrokerView.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/BrokerView.java Mon Mar  6 23:39:51 2006
@@ -19,35 +19,35 @@
 import javax.management.ObjectName;
 
 import org.apache.activemq.broker.Broker;
+import org.apache.activemq.broker.BrokerService;
 import org.apache.activemq.broker.ConnectionContext;
 import org.apache.activemq.command.ActiveMQQueue;
 import org.apache.activemq.command.ActiveMQTopic;
-import org.apache.activemq.memory.UsageManager;
 
 public class BrokerView implements BrokerViewMBean {
     
     private final ManagedRegionBroker broker;
-    private final UsageManager usageManager;
+	private final BrokerService brokerService;
 
-    public BrokerView(ManagedRegionBroker broker, UsageManager usageManager) {
-        this.broker = broker;
-        this.usageManager = usageManager;        
+    public BrokerView(BrokerService brokerService, ManagedRegionBroker managedBroker) throws Exception {
+        this.brokerService = brokerService;
+		this.broker = managedBroker;
     }
     
     public String getBrokerId() {
         return broker.getBrokerId().toString();
     }
     
-    public void gc() {
-        broker.gc();
+    public void gc() throws Exception {
+    	brokerService.getBroker().gc();
     }
 
     public void start() throws Exception {
-        broker.start();
+    	brokerService.start();
     }
     
     public void stop() throws Exception {
-        broker.stop();
+    	brokerService.stop();
     }
     
     public long getTotalEnqueueCount() {
@@ -59,7 +59,7 @@
     public long getTotalConsumerCount() {
         return broker.getDestinationStatistics().getConsumers().getCount();
     }
-    public long getTotalMessages() {
+    public long getTotalMessageCount() {
         return broker.getDestinationStatistics().getMessages().getCount();
     }    
     public long getTotalMessagesCached() {
@@ -67,13 +67,13 @@
     }
 
     public int getMemoryPercentageUsed() {
-        return usageManager.getPercentUsage();
+        return brokerService.getMemoryManager().getPercentUsage();
     }
     public long getMemoryLimit() {
-        return usageManager.getLimit();
+        return brokerService.getMemoryManager().getLimit();
     }
     public void setMemoryLimit(long limit) {
-        usageManager.setLimit(limit);
+    	brokerService.getMemoryManager().setLimit(limit);
     }
     
     public void resetStatistics() {

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/BrokerViewMBean.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/BrokerViewMBean.java?rev=383815&r1=383814&r2=383815&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/BrokerViewMBean.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/BrokerViewMBean.java Mon Mar  6 23:39:51 2006
@@ -21,23 +21,42 @@
 
 public interface BrokerViewMBean extends Service {
 
+	/**
+	 * @return The unique id of the broker.
+	 */
     public abstract String getBrokerId();
 
-    public abstract void gc();
+    /**
+     * The Broker will fush it's caches so that the garbage
+     * collector can recalaim more memory.
+     * 
+     * @throws Exception
+     */
+    public void gc() throws Exception;
+    
+    
     public void resetStatistics();
     
     public long getTotalEnqueueCount();
     public long getTotalDequeueCount();
     public long getTotalConsumerCount();
-    public long getTotalMessages();
-    public long getTotalMessagesCached();
-
+    public long getTotalMessageCount();
+    
     public int getMemoryPercentageUsed();
     public long getMemoryLimit();
     public void setMemoryLimit(long limit);
 
+    /**
+     * Shuts down the JVM.
+     * @param exitCode the exit code that will be reported by the JVM process when it exits.
+     */
     public void terminateJVM(int exitCode);
     
+    /**
+     * Stop the broker and all it's comonents.
+     */
+    public void stop() throws Exception;
+    
     public ObjectName[] getTopics();
     public ObjectName[] getQueues();
     public ObjectName[] getTemporaryTopics();
@@ -50,9 +69,32 @@
     public ObjectName[] getTemporaryTopicSubscribers();
     public ObjectName[] getTemporaryQueueSubscribers();
     
+    /** 
+     * Adds a Topic destination to the broker.
+     * @param name The name of the Topic
+     * @throws Exception
+     */
     public void addTopic(String name) throws Exception;
+
+    /**
+     * Adds a Queue destination to the broker.
+     * @param name The name of the Queue
+     * @throws Exception
+     */
     public void addQueue(String name) throws Exception;
+
+    /** 
+     * Removes a Topic destination from the broker.
+     * @param name The name of the Topic
+     * @throws Exception
+     */
     public void removeTopic(String name) throws Exception;
+
+    /**
+     * Removes a Queue destination from the broker.
+     * @param name The name of the Queue
+     * @throws Exception
+     */
     public void removeQueue(String name) throws Exception;
     
 }

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/DestinationView.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/DestinationView.java?rev=383815&r1=383814&r2=383815&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/DestinationView.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/DestinationView.java Mon Mar  6 23:39:51 2006
@@ -13,6 +13,13 @@
  */
 package org.apache.activemq.broker.jmx;
 
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.Map;
+
+import javax.jms.Connection;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
 import javax.management.openmbean.CompositeData;
 import javax.management.openmbean.CompositeDataSupport;
 import javax.management.openmbean.CompositeType;
@@ -21,9 +28,12 @@
 import javax.management.openmbean.TabularDataSupport;
 import javax.management.openmbean.TabularType;
 
+import org.apache.activemq.ActiveMQConnectionFactory;
 import org.apache.activemq.broker.jmx.OpenTypeSupport.OpenTypeFactory;
 import org.apache.activemq.broker.region.Destination;
+import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.ActiveMQMessage;
+import org.apache.activemq.command.ActiveMQTextMessage;
 import org.apache.activemq.command.Message;
 
 public class DestinationView {
@@ -55,7 +65,7 @@
         return destination.getDestinationStatistics().getConsumers().getCount();
     }
 
-    public long getMessages(){
+    public long getQueueSize(){
         return destination.getDestinationStatistics().getMessages().getCount();
     }
 
@@ -87,4 +97,42 @@
         }
         return rc;
     }
+    
+    public String sendTextMessage(String body) throws Exception {
+    	return sendTextMessage(Collections.EMPTY_MAP, body);
+    }
+    
+    public String sendTextMessage(Map headers, String body) throws Exception {
+    	
+    	String brokerUrl = "vm://"+broker.getBrokerName();
+    	ActiveMQDestination dest = destination.getActiveMQDestination();
+    	
+    	ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory(brokerUrl);
+    	Connection connection = null;
+    	try {
+    		
+    		connection = cf.createConnection();
+			Session session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
+			MessageProducer producer = session.createProducer(dest);
+			ActiveMQTextMessage msg = (ActiveMQTextMessage) session.createTextMessage(body);
+			
+			for (Iterator iter = headers.entrySet().iterator(); iter.hasNext();) {
+				Map.Entry entry = (Map.Entry) iter.next();
+				msg.setObjectProperty((String) entry.getKey(), entry.getValue());
+			}
+			
+			producer.setDeliveryMode(msg.getJMSDeliveryMode());
+			producer.setPriority(msg.getPriority());
+			long ttl = msg.getExpiration() - System.currentTimeMillis();
+			producer.setTimeToLive(ttl > 0 ? ttl : 0);
+	    	producer.send(msg);
+	    	
+	    	return msg.getJMSMessageID();
+	    	
+    	} finally {
+    		connection.close();
+    	}
+    	
+    }
+
 }

Added: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/DestinationViewMBean.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/DestinationViewMBean.java?rev=383815&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/DestinationViewMBean.java (added)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/DestinationViewMBean.java Mon Mar  6 23:39:51 2006
@@ -0,0 +1,79 @@
+/**
+ *
+ * Copyright 2005-2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.jmx;
+
+import java.util.Map;
+
+import javax.management.openmbean.CompositeData;
+import javax.management.openmbean.OpenDataException;
+import javax.management.openmbean.TabularData;
+
+
+public interface DestinationViewMBean {
+    
+	/**
+	 * Resets the managment counters.
+	 */
+    public void resetStatistics();
+
+    /**
+     * @return The number of messages that have been sent to the destination.
+     */
+    public long getEnqueueCount();
+
+    /**
+     * @return The number of messages that have been received from the destination.
+     */
+    public long getDequeueCount();
+
+    /**
+     * @return The number of consmers subscribed to messages from this destination.
+     */
+    public long getConsumerCount();
+
+    /**
+     * @return The number of messages being buffered by this destination
+     */
+    public long getQueueSize();
+    
+    /**
+     * @return An array of all the messages in the destination's queue.
+     */
+    public CompositeData[] browse() throws OpenDataException;
+    
+    /**
+     * @return A list of all the messages in the destination's queue.
+     */
+    public TabularData browseAsTable() throws OpenDataException;
+
+    /**
+     * Sends a TextMesage to the destination.
+     * @param body the text to send
+     * @return the message id of the message sent.
+     * @throws Exception
+     */
+    public String sendTextMessage(String body) throws Exception;
+
+    /**
+     * Sends a TextMesage to the destination.
+     * @param headers the message headers and properties to set.  Can only container Strings maped to primitive types.
+     * @param body the text to send
+     * @return the message id of the message sent.
+     * @throws Exception
+     */
+    public String sendTextMessage(Map headers, String body) throws Exception;
+}
\ No newline at end of file

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/DurableSubscriptionViewMBean.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/DurableSubscriptionViewMBean.java?rev=383815&r1=383814&r2=383815&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/DurableSubscriptionViewMBean.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/DurableSubscriptionViewMBean.java Mon Mar  6 23:39:51 2006
@@ -21,7 +21,7 @@
  */
 public interface DurableSubscriptionViewMBean extends SubscriptionViewMBean{
     /**
-     * @return name of the durable consumer
+     * @return name of the durable subscription name
      */
     public String getSubscriptionName();
 

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedRegionBroker.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedRegionBroker.java?rev=383815&r1=383814&r2=383815&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedRegionBroker.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedRegionBroker.java Mon Mar  6 23:39:51 2006
@@ -44,7 +44,6 @@
 import org.apache.activemq.broker.region.Subscription;
 import org.apache.activemq.broker.region.Topic;
 import org.apache.activemq.broker.region.TopicSubscription;
-import org.apache.activemq.broker.region.policy.PolicyMap;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.ActiveMQMessage;
 import org.apache.activemq.command.ActiveMQTopic;
@@ -116,18 +115,21 @@
 
     public void register(ActiveMQDestination destName,Destination destination){
         // Build the object name for the destination
-        Hashtable map=new Hashtable(brokerObjectName.getKeyPropertyList());
-        map.put("Type",JMXSupport.encodeObjectNamePart(destName.getDestinationTypeAsString()));
-        map.put("Destination",JMXSupport.encodeObjectNamePart(destName.getPhysicalName()));
+        Hashtable map=brokerObjectName.getKeyPropertyList();
         try{
-            ObjectName destObjectName=new ObjectName(brokerObjectName.getDomain(),map);
+        	ObjectName objectName = new ObjectName(
+        			brokerObjectName.getDomain()+":"+
+            		"BrokerName="+map.get("BrokerName")+","+
+            		"Type="+JMXSupport.encodeObjectNamePart(destName.getDestinationTypeAsString())+","+
+                    "Destination="+JMXSupport.encodeObjectNamePart(destName.getPhysicalName())
+            		);
             DestinationView view;
             if(destination instanceof Queue){
                 view=new QueueView(this, (Queue) destination);
             }else{
                 view=new TopicView(this, (Topic) destination);
             }
-            registerDestination(destObjectName,destName,view);
+            registerDestination(objectName,destName,view);
         }catch(Exception e){
             log.error("Failed to register destination "+destName,e);
         }
@@ -136,11 +138,14 @@
     public void unregister(ActiveMQDestination destName){
         // Build the object name for the destination
         Hashtable map=new Hashtable(brokerObjectName.getKeyPropertyList());
-        map.put("Type",JMXSupport.encodeObjectNamePart(destName.getDestinationTypeAsString()));
-        map.put("Destination",JMXSupport.encodeObjectNamePart(destName.getPhysicalName()));
         try{
-            ObjectName destObjectName=new ObjectName(brokerObjectName.getDomain(),map);
-            unregisterDestination(destObjectName);
+        	ObjectName objectName = new ObjectName(
+        			brokerObjectName.getDomain()+":"+
+            		"BrokerName="+map.get("BrokerName")+","+
+            		"Type="+JMXSupport.encodeObjectNamePart(destName.getDestinationTypeAsString())+","+
+                    "Destination="+JMXSupport.encodeObjectNamePart(destName.getPhysicalName())
+            		);
+            unregisterDestination(objectName);
         }catch(Exception e){
             log.error("Failed to unregister "+destName,e);
         }
@@ -148,13 +153,18 @@
 
     public void registerSubscription(ConnectionContext context,Subscription sub){
         SubscriptionKey key = new SubscriptionKey(context.getClientId(),sub.getConsumerInfo().getSubcriptionName());
-        Hashtable map=new Hashtable(brokerObjectName.getKeyPropertyList());
-        map.put("Type",JMXSupport.encodeObjectNamePart("Subscription"));
+        Hashtable map=brokerObjectName.getKeyPropertyList();
         String name = key.toString() + ":" + sub.getConsumerInfo().toString();
-        map.put("name",JMXSupport.encodeObjectNamePart(name));
-        map.put("active", "true");
         try{
-            ObjectName objectName=new ObjectName(brokerObjectName.getDomain(),map);
+        	
+        	ObjectName objectName = new ObjectName(
+        			brokerObjectName.getDomain()+":"+
+            		"BrokerName="+map.get("BrokerName")+","+
+            		"Type=Subscription,"+
+            		"active=true,"+
+                    "name="+JMXSupport.encodeObjectNamePart(name)+""
+            		);
+
             SubscriptionView view;
             if(sub.getConsumerInfo().isDurable()){
                 view=new DurableSubscriptionView(this,context.getClientId(),sub);
@@ -292,12 +302,16 @@
     }
     
     protected void addInactiveSubscription(SubscriptionKey key,SubscriptionInfo info){
-        Hashtable map=new Hashtable(brokerObjectName.getKeyPropertyList());
-        map.put("Type",JMXSupport.encodeObjectNamePart("Subscription"));
-        map.put("name",JMXSupport.encodeObjectNamePart(key.toString()));
-        map.put("active", "false");
+        Hashtable map=brokerObjectName.getKeyPropertyList();
         try{
-            ObjectName objectName=new ObjectName(brokerObjectName.getDomain(),map);
+        	ObjectName objectName = new ObjectName(
+        			brokerObjectName.getDomain()+":"+
+            		"BrokerName="+map.get("BrokerName")+","+
+            		"Type=Subscription,"+
+            		"active=false,"+
+                    "name="+JMXSupport.encodeObjectNamePart(key.toString())+""
+            		);
+
             SubscriptionView view = new InactiveDurableSubscriptionView(this,key.getClientId(),info);
             mbeanServer.registerMBean(view,objectName);
             inactiveDurableTopicSubscribers.put(objectName,view);

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedTransportConnection.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedTransportConnection.java?rev=383815&r1=383814&r2=383815&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedTransportConnection.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedTransportConnection.java Mon Mar  6 23:39:51 2006
@@ -113,11 +113,14 @@
 
     protected ObjectName createObjectName() throws IOException {
         // Build the object name for the destination
-        Hashtable map = new Hashtable(connectorName.getKeyPropertyList());
-        map.put("Type", "Connection");
-        map.put("Connection", JMXSupport.encodeObjectNamePart(connectionId));
+        Hashtable map = connectorName.getKeyPropertyList();
         try {
-            return new ObjectName(connectorName.getDomain(), map);
+            return new ObjectName(
+            		connectorName.getDomain()+":"+
+            		"BrokerName="+map.get("BrokerName")+","+
+            		"Type=Connection,"+
+            		"Connection="+JMXSupport.encodeObjectNamePart(connectionId)
+            		);
         }
         catch (Throwable e) {
             throw IOExceptionSupport.create(e);

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/QueueView.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/QueueView.java?rev=383815&r1=383814&r2=383815&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/QueueView.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/QueueView.java Mon Mar  6 23:39:51 2006
@@ -15,9 +15,14 @@
 
 import javax.management.openmbean.CompositeData;
 import javax.management.openmbean.OpenDataException;
+
 import org.apache.activemq.broker.region.Queue;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.Message;
+
+/**
+ * Provides a JMX Management view of a Queue.
+ */
 public class QueueView extends DestinationView implements QueueViewMBean{
     public QueueView(ManagedRegionBroker broker, Queue destination){
         super(broker, destination);
@@ -30,8 +35,8 @@
         return OpenTypeSupport.convert(rc);
     }
 
-    public void removeMessage(String messageId){
-        ((Queue) destination).removeMessage(messageId);
+    public boolean removeMessage(String messageId){
+        return ((Queue) destination).removeMessage(messageId);
     }
 
     public void purge(){
@@ -41,4 +46,5 @@
     public boolean copyMessageTo(String messageId, String destinationName) throws Exception {
         return ((Queue) destination).copyMessageTo(BrokerView.getConnectionContext(broker.getContextBroker()), messageId, ActiveMQDestination.createDestination(destinationName, ActiveMQDestination.QUEUE_TYPE));
     }
+    
 }

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/QueueViewMBean.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/QueueViewMBean.java?rev=383815&r1=383814&r2=383815&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/QueueViewMBean.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/QueueViewMBean.java Mon Mar  6 23:39:51 2006
@@ -18,26 +18,42 @@
 
 import javax.management.openmbean.CompositeData;
 import javax.management.openmbean.OpenDataException;
-import javax.management.openmbean.TabularData;
 
 
-public interface QueueViewMBean {
+public interface QueueViewMBean extends DestinationViewMBean {
     
-    public void gc();
-    public void resetStatistics();
-
-    public long getEnqueueCount();
-    public long getDequeueCount();
-    public long getConsumerCount();
-    public long getMessages();
-    public long getMessagesCached();
-
-    
-    public CompositeData[] browse() throws OpenDataException;
-    public TabularData browseAsTable() throws OpenDataException;
+	/**
+	 * Retrieve a message from the destination's queue.
+	 * 
+	 * @param messageId the message id of the message to retrieve
+	 * @return A CompositeData object which is a JMX version of the message
+	 * @throws OpenDataException
+	 */
     public CompositeData getMessage(String messageId) throws OpenDataException;
-    public void removeMessage(String messageId);
+    
+    /**
+     * Removes a message from the queue.  If the message has already been dispatched 
+     * to another consumer, the message cannot be deleted and this method will return 
+     * false.
+     * 
+     * @param messageId the message id of the message to remove
+     * @return true if the message was found and could be successfully deleted.
+     */
+    public boolean removeMessage(String messageId);
+    
+    /**
+     * Empties out all the messages in the queue.
+     */
     public void purge();
-
+    
+    /**
+     * Copies a given message to another destination.
+     * 
+     * @param messageId
+     * @param destinationName
+     * @return true if the message was found and was successfully copied to the other destination.
+     * @throws Exception
+     */
     public boolean copyMessageTo(String messageId, String destinationName) throws Exception;
+
 }

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/SubscriptionView.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/SubscriptionView.java?rev=383815&r1=383814&r2=383815&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/SubscriptionView.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/SubscriptionView.java Mon Mar  6 23:39:51 2006
@@ -150,24 +150,38 @@
     /**
      * @return number of messages pending delivery
      */
-    public int getPending(){
-        return subscription != null ? subscription.pending() : 0;
+    public int getPendingQueueSize(){
+        return subscription != null ? subscription.getPendingQueueSize() : 0;
     }
     
     /**
      * @return number of messages dispatched
      */
-    public int getDispatched(){
-        return subscription != null ? subscription.dispatched() : 0;
+    public int getDispatchedQueueSize(){
+        return subscription != null ? subscription.getDispatchedQueueSize() : 0;
     }
-    
+        
     /**
-     * @return number of messages delivered
+     * @return number of messages dispatched to the client
      */
-    public int getDelivered(){
-        return subscription != null ? subscription.delivered() : 0;
+    public long getDispachedCounter() {
+        return subscription != null ? subscription.getDispatchedCounter() : 0;
     }
-    
+
+    /**
+     * @return number of messages that matched the subscription
+     */
+    public long getEnqueueCounter() {
+        return subscription != null ? subscription.getEnqueueCounter() : 0;
+    }
+
+    /**
+     * @return number of messages dequeued by the client
+     */
+    public long getDequeueCounter() {
+        return subscription != null ? subscription.getDequeueCounter() : 0;
+    }
+
     protected ConsumerInfo getConsumerInfo(){
         return subscription != null ? subscription.getConsumerInfo() : null;
     }

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/SubscriptionViewMBean.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/SubscriptionViewMBean.java?rev=383815&r1=383814&r2=383815&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/SubscriptionViewMBean.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/SubscriptionViewMBean.java Mon Mar  6 23:39:51 2006
@@ -19,7 +19,7 @@
 public interface SubscriptionViewMBean{
     
     /**
-     * @return the clientId
+     * @return the clientId of the Connection the Subscription is on
      */
     public String getClientId();
     /**
@@ -63,22 +63,28 @@
     public boolean isActive();
 
     /**
-     * The subscription should release as may references as it can to help the garbage collector reclaim memory.
+     * @return number of messages pending delivery
      */
-    public void gc();
+    public int getPendingQueueSize();
 
     /**
-     * @return number of messages pending delivery
+     * @return number of messages dispatched
+     */
+    public int getDispatchedQueueSize();
+    
+    /**
+     * @return number of messages dispatched to the client
      */
-    public int getPending();
+    long getDispachedCounter();
 
     /**
-     * @return number of messages dispatched
+     * @return number of messages that matched the subscription
      */
-    public int getDispatched();
+    long getEnqueueCounter();
 
     /**
-     * @return number of messages delivered
+     * @return number of messages dequeued by the client
      */
-    public int getDelivered();
+    long getDequeueCounter();
+
 }

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/TopicSubscriptionView.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/TopicSubscriptionView.java?rev=383815&r1=383814&r2=383815&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/TopicSubscriptionView.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/TopicSubscriptionView.java Mon Mar  6 23:39:51 2006
@@ -35,18 +35,26 @@
     /**
      * @return the number of messages discarded due to being a slow consumer
      */
-    public int getDiscarded() {
+    public int getDiscardedCount() {
         TopicSubscription topicSubscription = getTopicSubscription();
         return topicSubscription != null ? topicSubscription.discarded() : 0;
     }
 
     /**
-     * @return the number of matched messages (messages targeted for the
-     *         subscription but not yet able to be dispatched due to the
-     *         prefetch buffer being full).
+     * @return the maximum number of messages that can be pending.
      */
-    public int getMatched() {
+    public int getMaximumPendingQueueSize() {
         TopicSubscription topicSubscription = getTopicSubscription();
-        return topicSubscription != null ? topicSubscription.matched() : 0;
+        return topicSubscription != null ? topicSubscription.getMaximumPendingMessages() : 0;
+    }
+    
+    /**
+     * @param max the maximum number of messages that can be pending.
+     */
+    public void setMaximumPendingQueueSize(int max) {
+        TopicSubscription topicSubscription = getTopicSubscription();
+        if ( topicSubscription != null ) {
+        	topicSubscription.setMaximumPendingMessages(max);
+        }
     }
 }

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/TopicSubscriptionViewMBean.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/TopicSubscriptionViewMBean.java?rev=383815&r1=383814&r2=383815&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/TopicSubscriptionViewMBean.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/TopicSubscriptionViewMBean.java Mon Mar  6 23:39:51 2006
@@ -25,12 +25,12 @@
     /**
      * @return the number of messages discarded due to being a slow consumer
      */
-    public int getDiscarded();
-    
+    public int getDiscardedCount();
+
     /**
-     * @return the number of matched messages (messages targeted for the subscription but not
-     * yet able to be dispatched due to the prefetch buffer being full).
+     * @return the maximum number of messages that can be pending.
      */
-    public int getMatched();
-
+    public int getMaximumPendingQueueSize();
+    public void setMaximumPendingQueueSize(int max);
+    
 }

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/TopicViewMBean.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/TopicViewMBean.java?rev=383815&r1=383814&r2=383815&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/TopicViewMBean.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/TopicViewMBean.java Mon Mar  6 23:39:51 2006
@@ -16,27 +16,24 @@
  */
 package org.apache.activemq.broker.jmx;
 
-import javax.management.openmbean.CompositeData;
-import javax.management.openmbean.OpenDataException;
-import javax.management.openmbean.TabularData;
-
-
-public interface TopicViewMBean {
+public interface TopicViewMBean extends DestinationViewMBean {
     
-    public void gc();
-    public void resetStatistics();
+	/**
+	 * Creates a durable subscription that is subscribed to this topic.
+	 * 
+	 * @param clientId
+	 * @param subscriberName
+	 * @throws Exception
+	 */
+    public void createDurableSubscriber(String clientId, String subscriberName) throws Exception;
 
-    public long getEnqueueCount();
-    public long getDequeueCount();
-    public long getConsumerCount();
-    public long getMessages();
-    public long getMessagesCached();
-    
-    public CompositeData[] browse() throws OpenDataException;
-    public TabularData browseAsTable() throws OpenDataException;
-    
-    public void createDurableSubscriber(String clientId,String subscriberName) throws Exception;
-    
-    public void destroyDurableSubscriber(String clientId,String subscriberName) throws Exception;
+    /**
+	 * Destroys a durable subscription that had previously subscribed to this topic.
+	 * 
+	 * @param clientId
+	 * @param subscriberName
+	 * @throws Exception
+	 */
+    public void destroyDurableSubscriber(String clientId, String subscriberName) throws Exception;
 
 }

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java?rev=383815&r1=383814&r2=383815&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java Mon Mar  6 23:39:51 2006
@@ -123,12 +123,11 @@
         node.decrementReferenceCount();
     }
     
-    public int pending(){
+    public int getPendingQueueSize(){
         if (active){
-            return super.pending();
+            return super.getPendingQueueSize();
         }
         //TODO: need to get from store
-        
         return 0;
     }
     

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java?rev=383815&r1=383814&r2=383815&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java Mon Mar  6 23:39:51 2006
@@ -13,6 +13,13 @@
  */
 package org.apache.activemq.broker.region;
 
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.LinkedList;
+
+import javax.jms.InvalidSelectorException;
+import javax.jms.JMSException;
+
 import org.apache.activemq.broker.Broker;
 import org.apache.activemq.broker.ConnectionContext;
 import org.apache.activemq.broker.region.policy.DeadLetterStrategy;
@@ -27,11 +34,6 @@
 import org.apache.activemq.util.BrokerSupport;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import javax.jms.InvalidSelectorException;
-import javax.jms.JMSException;
-import java.io.IOException;
-import java.util.Iterator;
-import java.util.LinkedList;
 /**
  * A subscription that honors the pre-fetch option of the ConsumerInfo.
  * 
@@ -42,6 +44,7 @@
     static private final Log log=LogFactory.getLog(PrefetchSubscription.class);
     final protected LinkedList matched=new LinkedList();
     final protected LinkedList dispatched=new LinkedList();
+    
     protected int delivered=0;
     int preLoadLimit=1024*100;
     int preLoadSize=0;
@@ -49,6 +52,7 @@
     
     long enqueueCounter;
     long dispatchCounter;
+    long aknowledgedCounter;
     
     public PrefetchSubscription(Broker broker,ConnectionContext context,ConsumerInfo info)
                     throws InvalidSelectorException{
@@ -102,12 +106,14 @@
                 if(inAckRange){
                     // Don't remove the nodes until we are committed.
                     if(!context.isInTransaction()){
+                    	aknowledgedCounter++;
                         iter.remove();
                     }else{
                         // setup a Synchronization to remove nodes from the dispatched list.
                         context.getTransaction().addSynchronization(new Synchronization(){
                             public void afterCommit() throws Exception{
                                 synchronized(PrefetchSubscription.this){
+                                	aknowledgedCounter++;
                                     dispatched.remove(node);
                                     delivered--;
                                 }
@@ -178,6 +184,7 @@
                         node.decrementReferenceCount();
                     }
                     iter.remove();
+                    aknowledgedCounter++;
                     index++;
                     acknowledge(context,ack,node);
                     if(ack.getLastMessageId().equals(messageId)){
@@ -198,18 +205,27 @@
         return dispatched.size()-delivered>=info.getPrefetchSize()||preLoadSize>preLoadLimit;
     }
     
-    public int pending(){
-        return matched.size() - dispatched.size();
+    synchronized public int getPendingQueueSize(){
+        return matched.size();
     }
     
-    public int dispatched(){
+    synchronized public int getDispatchedQueueSize(){
         return dispatched.size();
     }
     
-    public int delivered(){
-        return delivered;
+    synchronized public long getDequeueCounter(){
+        return aknowledgedCounter;
+    }
+    
+    synchronized public long getDispatchedCounter() {
+        return dispatchCounter;
+    }
+    
+    synchronized public long getEnqueueCounter() {
+        return enqueueCounter;
     }
 
+
     protected void dispatchMatched() throws IOException{
         if(!dispatching){
             dispatching=true;
@@ -318,11 +334,4 @@
                     throws IOException{}
 
 
-    public long getDispatchCounter() {
-        return dispatchCounter;
-    }
-
-    public long getEnqueueCounter() {
-        return enqueueCounter;
-    }
 }

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java?rev=383815&r1=383814&r2=383815&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java Mon Mar  6 23:39:51 2006
@@ -437,7 +437,7 @@
         return (Message[]) l.toArray(new Message[l.size()]);
     }
 
-    public void removeMessage(String messageId) {
+    public boolean removeMessage(String messageId) {
         synchronized (messages) {
             ConnectionContext c = new ConnectionContext();
             for (Iterator iter = messages.iterator(); iter.hasNext();) {
@@ -455,12 +455,14 @@
                             r.drop();
                             dropEvent();
                             iter.remove();
+                            return true;
                         }
                     }
                 } catch (IOException e) {
                 }
             }
         }
+        return false;
     }
 
     public Message getMessage(String messageId) {

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Subscription.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Subscription.java?rev=383815&r1=383814&r2=383815&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Subscription.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Subscription.java Mon Mar  6 23:39:51 2006
@@ -103,16 +103,26 @@
     /**
      * @return number of messages pending delivery
      */
-    int pending();
+    int getPendingQueueSize();
     
     /**
-     * @return number of messages dispatched
+     * @return number of messages dispatched to the client
      */
-    int dispatched();
-    
+    int getDispatchedQueueSize();
+        
     /**
-     * @return number of messages delivered
+     * @return number of messages dispatched to the client
      */
-    int delivered();
+    long getDispatchedCounter();
     
+    /**
+     * @return number of messages that matched the subscription
+     */
+    long getEnqueueCounter();
+
+    /**
+     * @return number of messages dequeued by the client
+     */
+    long getDequeueCounter();
+
 }

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java?rev=383815&r1=383814&r2=383815&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java Mon Mar  6 23:39:51 2006
@@ -16,8 +16,10 @@
 import java.io.IOException;
 import java.util.Iterator;
 import java.util.LinkedList;
+
 import javax.jms.InvalidSelectorException;
 import javax.jms.JMSException;
+
 import org.apache.activemq.broker.Broker;
 import org.apache.activemq.broker.ConnectionContext;
 import org.apache.activemq.broker.region.policy.MessageEvictionStrategy;
@@ -33,21 +35,25 @@
 import org.apache.activemq.transaction.Synchronization;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicInteger;
+
+import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicLong;
 
 
 public class TopicSubscription extends AbstractSubscription{
+	
     private static final Log log=LogFactory.getLog(TopicSubscription.class);
+    
     final protected LinkedList matched=new LinkedList();
     final protected ActiveMQDestination dlqDestination=new ActiveMQQueue("ActiveMQ.DLQ");
     final protected UsageManager usageManager;
-    protected AtomicInteger dispatched=new AtomicInteger();
-    protected AtomicInteger delivered=new AtomicInteger();
+    protected AtomicLong dispatched=new AtomicLong();
+    protected AtomicLong delivered=new AtomicLong();
     private int maximumPendingMessages=-1;
     private MessageEvictionStrategy messageEvictionStrategy = new OldestMessageEvictionStrategy();
     private int discarded = 0;
     private final Object matchedListMutex=new Object();
-
+    long enqueueCounter;
+    
     public TopicSubscription(Broker broker,ConnectionContext context,ConsumerInfo info,UsageManager usageManager)
                     throws InvalidSelectorException{
         super(broker,context,info);
@@ -55,6 +61,7 @@
     }
 
     public void add(MessageReference node) throws InterruptedException,IOException{
+        enqueueCounter++;
         node.incrementReferenceCount();
         if(!isFull()&&!isSlaveBroker()){
             // if maximumPendingMessages is set we will only discard messages which
@@ -126,21 +133,30 @@
         throw new JMSException("Invalid acknowledgment: "+ack);
     }
 
-    public int pending(){
-        return matched()-dispatched();
+    public int getPendingQueueSize(){
+        return matched();
     }
 
-    public int dispatched(){
-        return dispatched.get();
-    }
-
-    public int delivered(){
-        return delivered.get();
+    public int getDispatchedQueueSize(){
+        return (int)(dispatched.get()-delivered.get());
     }
 
     public int getMaximumPendingMessages(){
         return maximumPendingMessages;
     }
+    
+	public long getDispatchedCounter() {
+		return dispatched.get();
+	}
+
+	public long getEnqueueCounter() {
+		return enqueueCounter;
+	}
+    public long getDequeueCounter(){
+        return delivered.get();
+    }
+
+
 
     /**
      * @return the number of messages discarded due to being a slow consumer
@@ -223,6 +239,7 @@
 
     public String toString(){
         return "TopicSubscription:"+" consumer="+info.getConsumerId()+", destinations="+destinations.size()
-                        +", dispatched="+dispatched()+", delivered="+delivered()+", matched="+matched()+", discarded="+discarded();
+                        +", dispatched="+getDispatchedQueueSize()+", delivered="+getDequeueCounter()+", matched="+matched()+", discarded="+discarded();
     }
+
 }

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ActiveMQMessage.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ActiveMQMessage.java?rev=383815&r1=383814&r2=383815&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ActiveMQMessage.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ActiveMQMessage.java Mon Mar  6 23:39:51 2006
@@ -310,6 +310,69 @@
                 message.setGroupSequence(rc.intValue());
             }
         });
+        JMS_PROPERTY_SETERS.put("JMSCorrelationID", new PropertySetter() {
+            public void set(Message message, Object value) throws MessageFormatException {
+            	String rc = (String)TypeConversionSupport.convert(value, String.class);
+                if( rc == null ) {
+                    throw new MessageFormatException("Property JMSCorrelationID cannot be set from a "+value.getClass().getName()+".");
+                }
+                ((ActiveMQMessage)message).setJMSCorrelationID(rc);
+            }
+        });
+        JMS_PROPERTY_SETERS.put("JMSExpiration", new PropertySetter() {
+            public void set(Message message, Object value) throws MessageFormatException {
+                Long rc = (Long)TypeConversionSupport.convert(value, Long.class);
+                if( rc == null ) {
+                    throw new MessageFormatException("Property JMSExpiration cannot be set from a "+value.getClass().getName()+".");
+                }
+                ((ActiveMQMessage)message).setJMSExpiration(rc.longValue());
+            }
+        });
+        JMS_PROPERTY_SETERS.put("JMSPriority", new PropertySetter() {
+            public void set(Message message, Object value) throws MessageFormatException {
+                Integer rc = (Integer)TypeConversionSupport.convert(value, Integer.class);
+                if( rc == null ) {
+                    throw new MessageFormatException("Property JMSPriority cannot be set from a "+value.getClass().getName()+".");
+                }
+                ((ActiveMQMessage)message).setJMSPriority(rc.intValue());
+            }
+        });
+        JMS_PROPERTY_SETERS.put("JMSRedelivered", new PropertySetter() {
+            public void set(Message message, Object value) throws MessageFormatException {
+                Boolean rc = (Boolean)TypeConversionSupport.convert(value, Boolean.class);
+                if( rc == null ) {
+                    throw new MessageFormatException("Property JMSRedelivered cannot be set from a "+value.getClass().getName()+".");
+                }
+                ((ActiveMQMessage)message).setJMSRedelivered(rc.booleanValue());
+            }
+        });
+        JMS_PROPERTY_SETERS.put("JMSReplyTo", new PropertySetter() {
+            public void set(Message message, Object value) throws MessageFormatException {
+                ActiveMQDestination rc = (ActiveMQDestination)TypeConversionSupport.convert(value, ActiveMQDestination.class);
+                if( rc == null ) {
+                    throw new MessageFormatException("Property JMSReplyTo cannot be set from a "+value.getClass().getName()+".");
+                }
+                ((ActiveMQMessage)message).setReplyTo(rc);
+            }
+        });
+        JMS_PROPERTY_SETERS.put("JMSTimestamp", new PropertySetter() {
+            public void set(Message message, Object value) throws MessageFormatException {
+                Long rc = (Long)TypeConversionSupport.convert(value, Long.class);
+                if( rc == null ) {
+                    throw new MessageFormatException("Property JMSTimestamp cannot be set from a "+value.getClass().getName()+".");
+                }
+                ((ActiveMQMessage)message).setJMSTimestamp(rc.longValue());
+            }
+        });
+        JMS_PROPERTY_SETERS.put("JMSType", new PropertySetter() {
+            public void set(Message message, Object value) throws MessageFormatException {
+                String rc = (String)TypeConversionSupport.convert(value, String.class);
+                if( rc == null ) {
+                    throw new MessageFormatException("Property JMSType cannot be set from a "+value.getClass().getName()+".");
+                }
+                ((ActiveMQMessage)message).setJMSType(rc);
+            }
+        });
     }
 
     public void setObjectProperty(String name, Object value) throws JMSException {

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/TypeConversionSupport.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/TypeConversionSupport.java?rev=383815&r1=383814&r2=383815&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/TypeConversionSupport.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/TypeConversionSupport.java Mon Mar  6 23:39:51 2006
@@ -16,8 +16,11 @@
  */
 package org.apache.activemq.util;
 
+import java.util.Date;
 import java.util.HashMap;
 
+import org.apache.activemq.command.ActiveMQDestination;
+
 public class TypeConversionSupport {
 
     static class ConversionKey {
@@ -104,6 +107,11 @@
         CONVERSION_MAP.put(new ConversionKey(Byte.class, Long.class), longConverter);
         CONVERSION_MAP.put(new ConversionKey(Short.class, Long.class), longConverter);
         CONVERSION_MAP.put(new ConversionKey(Integer.class, Long.class), longConverter);
+        CONVERSION_MAP.put(new ConversionKey(Date.class, Long.class), new Converter() {
+            public Object convert(Object value) {
+                return new Long(((Date) value).getTime());
+            }
+        });
 
         Converter intConverter = new Converter() {
             public Object convert(Object value) {
@@ -122,6 +130,11 @@
         CONVERSION_MAP.put(new ConversionKey(Float.class, Double.class), new Converter() {
             public Object convert(Object value) {
                 return new Double(((Number) value).doubleValue());
+            }
+        });
+        CONVERSION_MAP.put(new ConversionKey(String.class, ActiveMQDestination.class), new Converter() {
+            public Object convert(Object value) {
+                return ActiveMQDestination.createDestination((String) value, ActiveMQDestination.QUEUE_TYPE);
             }
         });
     }



Mime
View raw message