Return-Path: Delivered-To: apmail-geronimo-activemq-commits-archive@www.apache.org Received: (qmail 36623 invoked from network); 7 Mar 2006 07:40:22 -0000 Received: from hermes.apache.org (HELO mail.apache.org) (209.237.227.199) by minotaur.apache.org with SMTP; 7 Mar 2006 07:40:22 -0000 Received: (qmail 78675 invoked by uid 500); 7 Mar 2006 07:40:22 -0000 Delivered-To: apmail-geronimo-activemq-commits-archive@geronimo.apache.org Received: (qmail 78653 invoked by uid 500); 7 Mar 2006 07:40:22 -0000 Mailing-List: contact activemq-commits-help@geronimo.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: activemq-dev@geronimo.apache.org Delivered-To: mailing list activemq-commits@geronimo.apache.org Received: (qmail 78644 invoked by uid 99); 7 Mar 2006 07:40:22 -0000 Received: from asf.osuosl.org (HELO asf.osuosl.org) (140.211.166.49) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 06 Mar 2006 23:40:22 -0800 X-ASF-Spam-Status: No, hits=-9.4 required=10.0 tests=ALL_TRUSTED,NO_REAL_NAME X-Spam-Check-By: apache.org Received: from [209.237.227.194] (HELO minotaur.apache.org) (209.237.227.194) by apache.org (qpsmtpd/0.29) with SMTP; Mon, 06 Mar 2006 23:40:20 -0800 Received: (qmail 36391 invoked by uid 65534); 7 Mar 2006 07:39:59 -0000 Message-ID: <20060307073959.36388.qmail@minotaur.apache.org> Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r383815 - in /incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq: broker/ broker/jmx/ broker/region/ command/ util/ Date: Tue, 07 Mar 2006 07:39:54 -0000 To: activemq-commits@geronimo.apache.org From: chirino@apache.org X-Mailer: svnmailer-1.0.7 X-Virus-Checked: Checked by ClamAV on apache.org X-Spam-Rating: minotaur.apache.org 1.6.2 0/1000/N Author: chirino Date: Mon Mar 6 23:39:51 2006 New Revision: 383815 URL: http://svn.apache.org/viewcvs?rev=383815&view=rev Log: - A bunch of JMX related tiding up. - Added a sendTextMessage() method to the DestinationView so that a JMX console can send a test message to a destination. - Renamed a few bean properties so that they would be more consistent: - use get*Counter for properties that keep incrementing - use get*QueueSize for poperties that repesenet the size of list in broker. - the gc() method on the broker is now the only one expose - the CachedMessageCount is not exposed anymore. Added: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/DestinationViewMBean.java Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/BrokerView.java incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/BrokerViewMBean.java incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/DestinationView.java incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/DurableSubscriptionViewMBean.java incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedRegionBroker.java incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedTransportConnection.java incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/QueueView.java incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/QueueViewMBean.java incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/SubscriptionView.java incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/SubscriptionViewMBean.java incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/TopicSubscriptionView.java incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/TopicSubscriptionViewMBean.java incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/TopicViewMBean.java incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Subscription.java incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ActiveMQMessage.java incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/TypeConversionSupport.java Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java?rev=383815&r1=383814&r2=383815&view=diff ============================================================================== --- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java (original) +++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java Mon Mar 6 23:39:51 2006 @@ -22,7 +22,6 @@ import java.net.URISyntaxException; import java.util.ArrayList; import java.util.HashMap; -import java.util.Hashtable; import java.util.Iterator; import java.util.List; import java.util.Map; @@ -754,12 +753,13 @@ protected void registerConnectorMBean(TransportConnector connector) throws IOException, URISyntaxException { MBeanServer mbeanServer = getManagementContext().getMBeanServer(); ConnectorViewMBean view = new ConnectorView(connector); - Hashtable map = new Hashtable(); - map.put("Type", "Connector"); - map.put("BrokerName", JMXSupport.encodeObjectNamePart(getBrokerName())); - map.put("ConnectorName", JMXSupport.encodeObjectNamePart(connector.getName())); try { - ObjectName objectName = new ObjectName("org.apache.activemq", map); + ObjectName objectName = new ObjectName( + managementContext.getJmxDomainName()+":"+ + "BrokerName="+JMXSupport.encodeObjectNamePart(getBrokerName())+","+ + "Type=Connector,"+ + "ConnectorName="+JMXSupport.encodeObjectNamePart(connector.getName()) + ); mbeanServer.registerMBean(view, objectName); registeredMBeanNames.add(objectName); } @@ -771,13 +771,12 @@ protected void registerNetworkConnectorMBean(NetworkConnector connector) throws IOException { MBeanServer mbeanServer = getManagementContext().getMBeanServer(); NetworkConnectorViewMBean view = new NetworkConnectorView(connector); - Hashtable map = new Hashtable(); - map.put("Type", "NetworkConnector"); - map.put("BrokerName", JMXSupport.encodeObjectNamePart(getBrokerName())); - // map.put("ConnectorName", - // JMXSupport.encodeObjectNamePart(connector.())); try { - ObjectName objectName = new ObjectName("org.apache.activemq", map); + ObjectName objectName = new ObjectName( + managementContext.getJmxDomainName()+":"+ + "BrokerName="+JMXSupport.encodeObjectNamePart(getBrokerName())+","+ + "Type=NetworkConnector" + ); mbeanServer.registerMBean(view, objectName); registeredMBeanNames.add(objectName); } @@ -789,13 +788,12 @@ protected void registerProxyConnectorMBean(ProxyConnector connector) throws IOException { MBeanServer mbeanServer = getManagementContext().getMBeanServer(); ProxyConnectorView view = new ProxyConnectorView(connector); - Hashtable map = new Hashtable(); - map.put("Type", "ProxyConnector"); - map.put("BrokerName", JMXSupport.encodeObjectNamePart(getBrokerName())); - // map.put("ConnectorName", - // JMXSupport.encodeObjectNamePart(connector.())); try { - ObjectName objectName = new ObjectName("org.apache.activemq", map); + ObjectName objectName = new ObjectName( + managementContext.getJmxDomainName()+":"+ + "BrokerName="+JMXSupport.encodeObjectNamePart(getBrokerName())+","+ + "Type=ProxyConnector" + ); mbeanServer.registerMBean(view, objectName); registeredMBeanNames.add(objectName); } @@ -807,13 +805,12 @@ protected void registerFTConnectorMBean(MasterConnector connector) throws IOException { MBeanServer mbeanServer = getManagementContext().getMBeanServer(); FTConnectorView view = new FTConnectorView(connector); - Hashtable map = new Hashtable(); - map.put("Type", "MasterConnector"); - map.put("BrokerName", JMXSupport.encodeObjectNamePart(getBrokerName())); - // map.put("ConnectorName", - // JMXSupport.encodeObjectNamePart(connector.())); try { - ObjectName objectName = new ObjectName("org.apache.activemq", map); + ObjectName objectName = new ObjectName( + managementContext.getJmxDomainName()+":"+ + "BrokerName="+JMXSupport.encodeObjectNamePart(getBrokerName())+","+ + "Type=MasterConnector" + ); mbeanServer.registerMBean(view, objectName); registeredMBeanNames.add(objectName); } @@ -825,13 +822,12 @@ protected void registerJmsConnectorMBean(JmsConnector connector) throws IOException { MBeanServer mbeanServer = getManagementContext().getMBeanServer(); JmsConnectorView view = new JmsConnectorView(connector); - Hashtable map = new Hashtable(); - map.put("Type", "JmsConnector"); - map.put("BrokerName", JMXSupport.encodeObjectNamePart(getBrokerName())); - // map.put("ConnectorName", - // JMXSupport.encodeObjectNamePart(connector.())); try { - ObjectName objectName = new ObjectName("org.apache.activemq", map); + ObjectName objectName = new ObjectName( + managementContext.getJmxDomainName()+":"+ + "BrokerName="+JMXSupport.encodeObjectNamePart(getBrokerName())+","+ + "Type=JmsConnector" + ); mbeanServer.registerMBean(view, objectName); registeredMBeanNames.add(objectName); } @@ -867,7 +863,7 @@ if (isUseJmx()) { ManagedRegionBroker managedBroker = (ManagedRegionBroker) regionBroker; managedBroker.setContextBroker(broker); - BrokerViewMBean view = new BrokerView(managedBroker, getMemoryManager()); + BrokerViewMBean view = new BrokerView(this, managedBroker); MBeanServer mbeanServer = getManagementContext().getMBeanServer(); ObjectName objectName = getBrokerObjectName(); mbeanServer.registerMBean(view, objectName); @@ -942,10 +938,11 @@ protected ObjectName createBrokerObjectName() throws IOException { try { - Hashtable map = new Hashtable(); - map.put("Type", "Broker"); - map.put("BrokerName", JMXSupport.encodeObjectNamePart(getBrokerName())); - return new ObjectName(getManagementContext().getJmxDomainName(), map); + return new ObjectName( + getManagementContext().getJmxDomainName()+":"+ + "BrokerName="+JMXSupport.encodeObjectNamePart(getBrokerName())+","+ + "Type=Broker" + ); } catch (Throwable e) { throw IOExceptionSupport.create("Invalid JMX broker name: " + brokerName, e); Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/BrokerView.java URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/BrokerView.java?rev=383815&r1=383814&r2=383815&view=diff ============================================================================== --- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/BrokerView.java (original) +++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/BrokerView.java Mon Mar 6 23:39:51 2006 @@ -19,35 +19,35 @@ import javax.management.ObjectName; import org.apache.activemq.broker.Broker; +import org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.command.ActiveMQQueue; import org.apache.activemq.command.ActiveMQTopic; -import org.apache.activemq.memory.UsageManager; public class BrokerView implements BrokerViewMBean { private final ManagedRegionBroker broker; - private final UsageManager usageManager; + private final BrokerService brokerService; - public BrokerView(ManagedRegionBroker broker, UsageManager usageManager) { - this.broker = broker; - this.usageManager = usageManager; + public BrokerView(BrokerService brokerService, ManagedRegionBroker managedBroker) throws Exception { + this.brokerService = brokerService; + this.broker = managedBroker; } public String getBrokerId() { return broker.getBrokerId().toString(); } - public void gc() { - broker.gc(); + public void gc() throws Exception { + brokerService.getBroker().gc(); } public void start() throws Exception { - broker.start(); + brokerService.start(); } public void stop() throws Exception { - broker.stop(); + brokerService.stop(); } public long getTotalEnqueueCount() { @@ -59,7 +59,7 @@ public long getTotalConsumerCount() { return broker.getDestinationStatistics().getConsumers().getCount(); } - public long getTotalMessages() { + public long getTotalMessageCount() { return broker.getDestinationStatistics().getMessages().getCount(); } public long getTotalMessagesCached() { @@ -67,13 +67,13 @@ } public int getMemoryPercentageUsed() { - return usageManager.getPercentUsage(); + return brokerService.getMemoryManager().getPercentUsage(); } public long getMemoryLimit() { - return usageManager.getLimit(); + return brokerService.getMemoryManager().getLimit(); } public void setMemoryLimit(long limit) { - usageManager.setLimit(limit); + brokerService.getMemoryManager().setLimit(limit); } public void resetStatistics() { Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/BrokerViewMBean.java URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/BrokerViewMBean.java?rev=383815&r1=383814&r2=383815&view=diff ============================================================================== --- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/BrokerViewMBean.java (original) +++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/BrokerViewMBean.java Mon Mar 6 23:39:51 2006 @@ -21,23 +21,42 @@ public interface BrokerViewMBean extends Service { + /** + * @return The unique id of the broker. + */ public abstract String getBrokerId(); - public abstract void gc(); + /** + * The Broker will fush it's caches so that the garbage + * collector can recalaim more memory. + * + * @throws Exception + */ + public void gc() throws Exception; + + public void resetStatistics(); public long getTotalEnqueueCount(); public long getTotalDequeueCount(); public long getTotalConsumerCount(); - public long getTotalMessages(); - public long getTotalMessagesCached(); - + public long getTotalMessageCount(); + public int getMemoryPercentageUsed(); public long getMemoryLimit(); public void setMemoryLimit(long limit); + /** + * Shuts down the JVM. + * @param exitCode the exit code that will be reported by the JVM process when it exits. + */ public void terminateJVM(int exitCode); + /** + * Stop the broker and all it's comonents. + */ + public void stop() throws Exception; + public ObjectName[] getTopics(); public ObjectName[] getQueues(); public ObjectName[] getTemporaryTopics(); @@ -50,9 +69,32 @@ public ObjectName[] getTemporaryTopicSubscribers(); public ObjectName[] getTemporaryQueueSubscribers(); + /** + * Adds a Topic destination to the broker. + * @param name The name of the Topic + * @throws Exception + */ public void addTopic(String name) throws Exception; + + /** + * Adds a Queue destination to the broker. + * @param name The name of the Queue + * @throws Exception + */ public void addQueue(String name) throws Exception; + + /** + * Removes a Topic destination from the broker. + * @param name The name of the Topic + * @throws Exception + */ public void removeTopic(String name) throws Exception; + + /** + * Removes a Queue destination from the broker. + * @param name The name of the Queue + * @throws Exception + */ public void removeQueue(String name) throws Exception; } Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/DestinationView.java URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/DestinationView.java?rev=383815&r1=383814&r2=383815&view=diff ============================================================================== --- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/DestinationView.java (original) +++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/DestinationView.java Mon Mar 6 23:39:51 2006 @@ -13,6 +13,13 @@ */ package org.apache.activemq.broker.jmx; +import java.util.Collections; +import java.util.Iterator; +import java.util.Map; + +import javax.jms.Connection; +import javax.jms.MessageProducer; +import javax.jms.Session; import javax.management.openmbean.CompositeData; import javax.management.openmbean.CompositeDataSupport; import javax.management.openmbean.CompositeType; @@ -21,9 +28,12 @@ import javax.management.openmbean.TabularDataSupport; import javax.management.openmbean.TabularType; +import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.broker.jmx.OpenTypeSupport.OpenTypeFactory; import org.apache.activemq.broker.region.Destination; +import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ActiveMQMessage; +import org.apache.activemq.command.ActiveMQTextMessage; import org.apache.activemq.command.Message; public class DestinationView { @@ -55,7 +65,7 @@ return destination.getDestinationStatistics().getConsumers().getCount(); } - public long getMessages(){ + public long getQueueSize(){ return destination.getDestinationStatistics().getMessages().getCount(); } @@ -87,4 +97,42 @@ } return rc; } + + public String sendTextMessage(String body) throws Exception { + return sendTextMessage(Collections.EMPTY_MAP, body); + } + + public String sendTextMessage(Map headers, String body) throws Exception { + + String brokerUrl = "vm://"+broker.getBrokerName(); + ActiveMQDestination dest = destination.getActiveMQDestination(); + + ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory(brokerUrl); + Connection connection = null; + try { + + connection = cf.createConnection(); + Session session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE); + MessageProducer producer = session.createProducer(dest); + ActiveMQTextMessage msg = (ActiveMQTextMessage) session.createTextMessage(body); + + for (Iterator iter = headers.entrySet().iterator(); iter.hasNext();) { + Map.Entry entry = (Map.Entry) iter.next(); + msg.setObjectProperty((String) entry.getKey(), entry.getValue()); + } + + producer.setDeliveryMode(msg.getJMSDeliveryMode()); + producer.setPriority(msg.getPriority()); + long ttl = msg.getExpiration() - System.currentTimeMillis(); + producer.setTimeToLive(ttl > 0 ? ttl : 0); + producer.send(msg); + + return msg.getJMSMessageID(); + + } finally { + connection.close(); + } + + } + } Added: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/DestinationViewMBean.java URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/DestinationViewMBean.java?rev=383815&view=auto ============================================================================== --- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/DestinationViewMBean.java (added) +++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/DestinationViewMBean.java Mon Mar 6 23:39:51 2006 @@ -0,0 +1,79 @@ +/** + * + * Copyright 2005-2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.broker.jmx; + +import java.util.Map; + +import javax.management.openmbean.CompositeData; +import javax.management.openmbean.OpenDataException; +import javax.management.openmbean.TabularData; + + +public interface DestinationViewMBean { + + /** + * Resets the managment counters. + */ + public void resetStatistics(); + + /** + * @return The number of messages that have been sent to the destination. + */ + public long getEnqueueCount(); + + /** + * @return The number of messages that have been received from the destination. + */ + public long getDequeueCount(); + + /** + * @return The number of consmers subscribed to messages from this destination. + */ + public long getConsumerCount(); + + /** + * @return The number of messages being buffered by this destination + */ + public long getQueueSize(); + + /** + * @return An array of all the messages in the destination's queue. + */ + public CompositeData[] browse() throws OpenDataException; + + /** + * @return A list of all the messages in the destination's queue. + */ + public TabularData browseAsTable() throws OpenDataException; + + /** + * Sends a TextMesage to the destination. + * @param body the text to send + * @return the message id of the message sent. + * @throws Exception + */ + public String sendTextMessage(String body) throws Exception; + + /** + * Sends a TextMesage to the destination. + * @param headers the message headers and properties to set. Can only container Strings maped to primitive types. + * @param body the text to send + * @return the message id of the message sent. + * @throws Exception + */ + public String sendTextMessage(Map headers, String body) throws Exception; +} \ No newline at end of file Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/DurableSubscriptionViewMBean.java URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/DurableSubscriptionViewMBean.java?rev=383815&r1=383814&r2=383815&view=diff ============================================================================== --- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/DurableSubscriptionViewMBean.java (original) +++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/DurableSubscriptionViewMBean.java Mon Mar 6 23:39:51 2006 @@ -21,7 +21,7 @@ */ public interface DurableSubscriptionViewMBean extends SubscriptionViewMBean{ /** - * @return name of the durable consumer + * @return name of the durable subscription name */ public String getSubscriptionName(); Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedRegionBroker.java URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedRegionBroker.java?rev=383815&r1=383814&r2=383815&view=diff ============================================================================== --- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedRegionBroker.java (original) +++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedRegionBroker.java Mon Mar 6 23:39:51 2006 @@ -44,7 +44,6 @@ import org.apache.activemq.broker.region.Subscription; import org.apache.activemq.broker.region.Topic; import org.apache.activemq.broker.region.TopicSubscription; -import org.apache.activemq.broker.region.policy.PolicyMap; import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ActiveMQMessage; import org.apache.activemq.command.ActiveMQTopic; @@ -116,18 +115,21 @@ public void register(ActiveMQDestination destName,Destination destination){ // Build the object name for the destination - Hashtable map=new Hashtable(brokerObjectName.getKeyPropertyList()); - map.put("Type",JMXSupport.encodeObjectNamePart(destName.getDestinationTypeAsString())); - map.put("Destination",JMXSupport.encodeObjectNamePart(destName.getPhysicalName())); + Hashtable map=brokerObjectName.getKeyPropertyList(); try{ - ObjectName destObjectName=new ObjectName(brokerObjectName.getDomain(),map); + ObjectName objectName = new ObjectName( + brokerObjectName.getDomain()+":"+ + "BrokerName="+map.get("BrokerName")+","+ + "Type="+JMXSupport.encodeObjectNamePart(destName.getDestinationTypeAsString())+","+ + "Destination="+JMXSupport.encodeObjectNamePart(destName.getPhysicalName()) + ); DestinationView view; if(destination instanceof Queue){ view=new QueueView(this, (Queue) destination); }else{ view=new TopicView(this, (Topic) destination); } - registerDestination(destObjectName,destName,view); + registerDestination(objectName,destName,view); }catch(Exception e){ log.error("Failed to register destination "+destName,e); } @@ -136,11 +138,14 @@ public void unregister(ActiveMQDestination destName){ // Build the object name for the destination Hashtable map=new Hashtable(brokerObjectName.getKeyPropertyList()); - map.put("Type",JMXSupport.encodeObjectNamePart(destName.getDestinationTypeAsString())); - map.put("Destination",JMXSupport.encodeObjectNamePart(destName.getPhysicalName())); try{ - ObjectName destObjectName=new ObjectName(brokerObjectName.getDomain(),map); - unregisterDestination(destObjectName); + ObjectName objectName = new ObjectName( + brokerObjectName.getDomain()+":"+ + "BrokerName="+map.get("BrokerName")+","+ + "Type="+JMXSupport.encodeObjectNamePart(destName.getDestinationTypeAsString())+","+ + "Destination="+JMXSupport.encodeObjectNamePart(destName.getPhysicalName()) + ); + unregisterDestination(objectName); }catch(Exception e){ log.error("Failed to unregister "+destName,e); } @@ -148,13 +153,18 @@ public void registerSubscription(ConnectionContext context,Subscription sub){ SubscriptionKey key = new SubscriptionKey(context.getClientId(),sub.getConsumerInfo().getSubcriptionName()); - Hashtable map=new Hashtable(brokerObjectName.getKeyPropertyList()); - map.put("Type",JMXSupport.encodeObjectNamePart("Subscription")); + Hashtable map=brokerObjectName.getKeyPropertyList(); String name = key.toString() + ":" + sub.getConsumerInfo().toString(); - map.put("name",JMXSupport.encodeObjectNamePart(name)); - map.put("active", "true"); try{ - ObjectName objectName=new ObjectName(brokerObjectName.getDomain(),map); + + ObjectName objectName = new ObjectName( + brokerObjectName.getDomain()+":"+ + "BrokerName="+map.get("BrokerName")+","+ + "Type=Subscription,"+ + "active=true,"+ + "name="+JMXSupport.encodeObjectNamePart(name)+"" + ); + SubscriptionView view; if(sub.getConsumerInfo().isDurable()){ view=new DurableSubscriptionView(this,context.getClientId(),sub); @@ -292,12 +302,16 @@ } protected void addInactiveSubscription(SubscriptionKey key,SubscriptionInfo info){ - Hashtable map=new Hashtable(brokerObjectName.getKeyPropertyList()); - map.put("Type",JMXSupport.encodeObjectNamePart("Subscription")); - map.put("name",JMXSupport.encodeObjectNamePart(key.toString())); - map.put("active", "false"); + Hashtable map=brokerObjectName.getKeyPropertyList(); try{ - ObjectName objectName=new ObjectName(brokerObjectName.getDomain(),map); + ObjectName objectName = new ObjectName( + brokerObjectName.getDomain()+":"+ + "BrokerName="+map.get("BrokerName")+","+ + "Type=Subscription,"+ + "active=false,"+ + "name="+JMXSupport.encodeObjectNamePart(key.toString())+"" + ); + SubscriptionView view = new InactiveDurableSubscriptionView(this,key.getClientId(),info); mbeanServer.registerMBean(view,objectName); inactiveDurableTopicSubscribers.put(objectName,view); Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedTransportConnection.java URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedTransportConnection.java?rev=383815&r1=383814&r2=383815&view=diff ============================================================================== --- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedTransportConnection.java (original) +++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedTransportConnection.java Mon Mar 6 23:39:51 2006 @@ -113,11 +113,14 @@ protected ObjectName createObjectName() throws IOException { // Build the object name for the destination - Hashtable map = new Hashtable(connectorName.getKeyPropertyList()); - map.put("Type", "Connection"); - map.put("Connection", JMXSupport.encodeObjectNamePart(connectionId)); + Hashtable map = connectorName.getKeyPropertyList(); try { - return new ObjectName(connectorName.getDomain(), map); + return new ObjectName( + connectorName.getDomain()+":"+ + "BrokerName="+map.get("BrokerName")+","+ + "Type=Connection,"+ + "Connection="+JMXSupport.encodeObjectNamePart(connectionId) + ); } catch (Throwable e) { throw IOExceptionSupport.create(e); Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/QueueView.java URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/QueueView.java?rev=383815&r1=383814&r2=383815&view=diff ============================================================================== --- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/QueueView.java (original) +++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/QueueView.java Mon Mar 6 23:39:51 2006 @@ -15,9 +15,14 @@ import javax.management.openmbean.CompositeData; import javax.management.openmbean.OpenDataException; + import org.apache.activemq.broker.region.Queue; import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.Message; + +/** + * Provides a JMX Management view of a Queue. + */ public class QueueView extends DestinationView implements QueueViewMBean{ public QueueView(ManagedRegionBroker broker, Queue destination){ super(broker, destination); @@ -30,8 +35,8 @@ return OpenTypeSupport.convert(rc); } - public void removeMessage(String messageId){ - ((Queue) destination).removeMessage(messageId); + public boolean removeMessage(String messageId){ + return ((Queue) destination).removeMessage(messageId); } public void purge(){ @@ -41,4 +46,5 @@ public boolean copyMessageTo(String messageId, String destinationName) throws Exception { return ((Queue) destination).copyMessageTo(BrokerView.getConnectionContext(broker.getContextBroker()), messageId, ActiveMQDestination.createDestination(destinationName, ActiveMQDestination.QUEUE_TYPE)); } + } Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/QueueViewMBean.java URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/QueueViewMBean.java?rev=383815&r1=383814&r2=383815&view=diff ============================================================================== --- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/QueueViewMBean.java (original) +++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/QueueViewMBean.java Mon Mar 6 23:39:51 2006 @@ -18,26 +18,42 @@ import javax.management.openmbean.CompositeData; import javax.management.openmbean.OpenDataException; -import javax.management.openmbean.TabularData; -public interface QueueViewMBean { +public interface QueueViewMBean extends DestinationViewMBean { - public void gc(); - public void resetStatistics(); - - public long getEnqueueCount(); - public long getDequeueCount(); - public long getConsumerCount(); - public long getMessages(); - public long getMessagesCached(); - - - public CompositeData[] browse() throws OpenDataException; - public TabularData browseAsTable() throws OpenDataException; + /** + * Retrieve a message from the destination's queue. + * + * @param messageId the message id of the message to retreive + * @return A CompositeData object which is a JMX version of the messages + * @throws OpenDataException + */ public CompositeData getMessage(String messageId) throws OpenDataException; - public void removeMessage(String messageId); + + /** + * Removes a message from the queue. If the message has allready been dispatched + * to another consumer, the message cannot be delted and this method will return + * false. + * + * @param messageId + * @return true if the message was found and could be succesfully deleted. + */ + public boolean removeMessage(String messageId); + + /** + * Emptys out all the messages in the queue. + */ public void purge(); - + + /** + * Copys a given message to another destination. + * + * @param messageId + * @param destinationName + * @return true if the message was found and was successfuly copied to the other destination. + * @throws Exception + */ public boolean copyMessageTo(String messageId, String destinationName) throws Exception; + } Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/SubscriptionView.java URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/SubscriptionView.java?rev=383815&r1=383814&r2=383815&view=diff ============================================================================== --- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/SubscriptionView.java (original) +++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/SubscriptionView.java Mon Mar 6 23:39:51 2006 @@ -150,24 +150,38 @@ /** * @return number of messages pending delivery */ - public int getPending(){ - return subscription != null ? subscription.pending() : 0; + public int getPendingQueueSize(){ + return subscription != null ? subscription.getPendingQueueSize() : 0; } /** * @return number of messages dispatched */ - public int getDispatched(){ - return subscription != null ? subscription.dispatched() : 0; + public int getDispatchedQueueSize(){ + return subscription != null ? subscription.getDispatchedQueueSize() : 0; } - + /** - * @return number of messages delivered + * @return number of messages that matched the subscription */ - public int getDelivered(){ - return subscription != null ? subscription.delivered() : 0; + public long getDispachedCounter() { + return subscription != null ? subscription.getDispatchedCounter() : 0; } - + + /** + * @return number of messages that matched the subscription + */ + public long getEnqueueCounter() { + return subscription != null ? subscription.getEnqueueCounter() : 0; + } + + /** + * @return number of messages queued by the client + */ + public long getDequeueCounter() { + return subscription != null ? subscription.getDequeueCounter() : 0; + } + protected ConsumerInfo getConsumerInfo(){ return subscription != null ? subscription.getConsumerInfo() : null; } Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/SubscriptionViewMBean.java URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/SubscriptionViewMBean.java?rev=383815&r1=383814&r2=383815&view=diff ============================================================================== --- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/SubscriptionViewMBean.java (original) +++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/SubscriptionViewMBean.java Mon Mar 6 23:39:51 2006 @@ -19,7 +19,7 @@ public interface SubscriptionViewMBean{ /** - * @return the clientId + * @return the clientId of the Connection the Subscription is on */ public String getClientId(); /** @@ -63,22 +63,28 @@ public boolean isActive(); /** - * The subscription should release as may references as it can to help the garbage collector reclaim memory. + * @return number of messages pending delivery */ - public void gc(); + public int getPendingQueueSize(); /** - * @return number of messages pending delivery + * @return number of messages dispatched + */ + public int getDispatchedQueueSize(); + + /** + * @return number of messages that matched the subscription */ - public int getPending(); + long getDispachedCounter(); /** - * @return number of messages dispatched + * @return number of messages that matched the subscription */ - public int getDispatched(); + long getEnqueueCounter(); /** - * @return number of messages delivered + * @return number of messages queued by the client */ - public int getDelivered(); + long getDequeueCounter(); + } Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/TopicSubscriptionView.java URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/TopicSubscriptionView.java?rev=383815&r1=383814&r2=383815&view=diff ============================================================================== --- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/TopicSubscriptionView.java (original) +++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/TopicSubscriptionView.java Mon Mar 6 23:39:51 2006 @@ -35,18 +35,26 @@ /** * @return the number of messages discarded due to being a slow consumer */ - public int getDiscarded() { + public int getDiscardedCount() { TopicSubscription topicSubscription = getTopicSubscription(); return topicSubscription != null ? topicSubscription.discarded() : 0; } /** - * @return the number of matched messages (messages targeted for the - * subscription but not yet able to be dispatched due to the - * prefetch buffer being full). + * @return the maximun number of messages that can be pending. */ - public int getMatched() { + public int getMaximumPendingQueueSize() { TopicSubscription topicSubscription = getTopicSubscription(); - return topicSubscription != null ? topicSubscription.matched() : 0; + return topicSubscription != null ? topicSubscription.getMaximumPendingMessages() : 0; + } + + /** + * + */ + public void setMaximumPendingQueueSize(int max) { + TopicSubscription topicSubscription = getTopicSubscription(); + if ( topicSubscription != null ) { + topicSubscription.setMaximumPendingMessages(max); + } } } Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/TopicSubscriptionViewMBean.java URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/TopicSubscriptionViewMBean.java?rev=383815&r1=383814&r2=383815&view=diff ============================================================================== --- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/TopicSubscriptionViewMBean.java (original) +++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/TopicSubscriptionViewMBean.java Mon Mar 6 23:39:51 2006 @@ -25,12 +25,12 @@ /** * @return the number of messages discarded due to being a slow consumer */ - public int getDiscarded(); - + public int getDiscardedCount(); + /** - * @return the number of matched messages (messages targeted for the subscription but not - * yet able to be dispatched due to the prefetch buffer being full). + * @return the maximun number of messages that can be pending. */ - public int getMatched(); - + public int getMaximumPendingQueueSize(); + public void setMaximumPendingQueueSize(int max); + } Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/TopicViewMBean.java URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/TopicViewMBean.java?rev=383815&r1=383814&r2=383815&view=diff ============================================================================== --- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/TopicViewMBean.java (original) +++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/TopicViewMBean.java Mon Mar 6 23:39:51 2006 @@ -16,27 +16,24 @@ */ package org.apache.activemq.broker.jmx; -import javax.management.openmbean.CompositeData; -import javax.management.openmbean.OpenDataException; -import javax.management.openmbean.TabularData; - - -public interface TopicViewMBean { +public interface TopicViewMBean extends DestinationViewMBean { - public void gc(); - public void resetStatistics(); + /** + * Creates a durable subscription that is subscribed to this topic. + * + * @param clientId + * @param subscriberName + * @throws Exception + */ + public void createDurableSubscriber(String clientId, String subscriberName) throws Exception; - public long getEnqueueCount(); - public long getDequeueCount(); - public long getConsumerCount(); - public long getMessages(); - public long getMessagesCached(); - - public CompositeData[] browse() throws OpenDataException; - public TabularData browseAsTable() throws OpenDataException; - - public void createDurableSubscriber(String clientId,String subscriberName) throws Exception; - - public void destroyDurableSubscriber(String clientId,String subscriberName) throws Exception; + /** + * Destroys a durable subscription that had previously subscribed to this topic. + * + * @param clientId + * @param subscriberName + * @throws Exception + */ + public void destroyDurableSubscriber(String clientId, String subscriberName) throws Exception; } Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java?rev=383815&r1=383814&r2=383815&view=diff ============================================================================== --- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java (original) +++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java Mon Mar 6 23:39:51 2006 @@ -123,12 +123,11 @@ node.decrementReferenceCount(); } - public int pending(){ + public int getPendingQueueSize(){ if (active){ - return super.pending(); + return super.getPendingQueueSize(); } //TODO: need to get from store - return 0; } Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java?rev=383815&r1=383814&r2=383815&view=diff ============================================================================== --- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java (original) +++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java Mon Mar 6 23:39:51 2006 @@ -13,6 +13,13 @@ */ package org.apache.activemq.broker.region; +import java.io.IOException; +import java.util.Iterator; +import java.util.LinkedList; + +import javax.jms.InvalidSelectorException; +import javax.jms.JMSException; + import org.apache.activemq.broker.Broker; import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.broker.region.policy.DeadLetterStrategy; @@ -27,11 +34,6 @@ import org.apache.activemq.util.BrokerSupport; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import javax.jms.InvalidSelectorException; -import javax.jms.JMSException; -import java.io.IOException; -import java.util.Iterator; -import java.util.LinkedList; /** * A subscription that honors the pre-fetch option of the ConsumerInfo. * @@ -42,6 +44,7 @@ static private final Log log=LogFactory.getLog(PrefetchSubscription.class); final protected LinkedList matched=new LinkedList(); final protected LinkedList dispatched=new LinkedList(); + protected int delivered=0; int preLoadLimit=1024*100; int preLoadSize=0; @@ -49,6 +52,7 @@ long enqueueCounter; long dispatchCounter; + long aknowledgedCounter; public PrefetchSubscription(Broker broker,ConnectionContext context,ConsumerInfo info) throws InvalidSelectorException{ @@ -102,12 +106,14 @@ if(inAckRange){ // Don't remove the nodes until we are committed. if(!context.isInTransaction()){ + aknowledgedCounter++; iter.remove(); }else{ // setup a Synchronization to remove nodes from the dispatched list. context.getTransaction().addSynchronization(new Synchronization(){ public void afterCommit() throws Exception{ synchronized(PrefetchSubscription.this){ + aknowledgedCounter++; dispatched.remove(node); delivered--; } @@ -178,6 +184,7 @@ node.decrementReferenceCount(); } iter.remove(); + aknowledgedCounter++; index++; acknowledge(context,ack,node); if(ack.getLastMessageId().equals(messageId)){ @@ -198,18 +205,27 @@ return dispatched.size()-delivered>=info.getPrefetchSize()||preLoadSize>preLoadLimit; } - public int pending(){ - return matched.size() - dispatched.size(); + synchronized public int getPendingQueueSize(){ + return matched.size(); } - public int dispatched(){ + synchronized public int getDispatchedQueueSize(){ return dispatched.size(); } - public int delivered(){ - return delivered; + synchronized public long getDequeueCounter(){ + return aknowledgedCounter; + } + + synchronized public long getDispatchedCounter() { + return dispatchCounter; + } + + synchronized public long getEnqueueCounter() { + return enqueueCounter; } + protected void dispatchMatched() throws IOException{ if(!dispatching){ dispatching=true; @@ -318,11 +334,4 @@ throws IOException{} - public long getDispatchCounter() { - return dispatchCounter; - } - - public long getEnqueueCounter() { - return enqueueCounter; - } } Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java?rev=383815&r1=383814&r2=383815&view=diff ============================================================================== --- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java (original) +++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java Mon Mar 6 23:39:51 2006 @@ -437,7 +437,7 @@ return (Message[]) l.toArray(new Message[l.size()]); } - public void removeMessage(String messageId) { + public boolean removeMessage(String messageId) { synchronized (messages) { ConnectionContext c = new ConnectionContext(); for (Iterator iter = messages.iterator(); iter.hasNext();) { @@ -455,12 +455,14 @@ r.drop(); dropEvent(); iter.remove(); + return true; } } } catch (IOException e) { } } } + return false; } public Message getMessage(String messageId) { Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Subscription.java URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Subscription.java?rev=383815&r1=383814&r2=383815&view=diff ============================================================================== --- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Subscription.java (original) +++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Subscription.java Mon Mar 6 23:39:51 2006 @@ -103,16 +103,26 @@ /** * @return number of messages pending delivery */ - int pending(); + int getPendingQueueSize(); /** - * @return number of messages dispatched + * @return number of messages dispatched to the client */ - int dispatched(); - + int getDispatchedQueueSize(); + /** - * @return number of messages delivered + * @return number of messages dispatched to the client */ - int delivered(); + long getDispatchedCounter(); + /** + * @return number of messages that matched the subscription + */ + long getEnqueueCounter(); + + /** + * @return number of messages queued by the client + */ + long getDequeueCounter(); + } Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java?rev=383815&r1=383814&r2=383815&view=diff ============================================================================== --- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java (original) +++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java Mon Mar 6 23:39:51 2006 @@ -16,8 +16,10 @@ import java.io.IOException; import java.util.Iterator; import java.util.LinkedList; + import javax.jms.InvalidSelectorException; import javax.jms.JMSException; + import org.apache.activemq.broker.Broker; import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.broker.region.policy.MessageEvictionStrategy; @@ -33,21 +35,25 @@ import org.apache.activemq.transaction.Synchronization; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicInteger; + +import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicLong; public class TopicSubscription extends AbstractSubscription{ + private static final Log log=LogFactory.getLog(TopicSubscription.class); + final protected LinkedList matched=new LinkedList(); final protected ActiveMQDestination dlqDestination=new ActiveMQQueue("ActiveMQ.DLQ"); final protected UsageManager usageManager; - protected AtomicInteger dispatched=new AtomicInteger(); - protected AtomicInteger delivered=new AtomicInteger(); + protected AtomicLong dispatched=new AtomicLong(); + protected AtomicLong delivered=new AtomicLong(); private int maximumPendingMessages=-1; private MessageEvictionStrategy messageEvictionStrategy = new OldestMessageEvictionStrategy(); private int discarded = 0; private final Object matchedListMutex=new Object(); - + long enqueueCounter; + public TopicSubscription(Broker broker,ConnectionContext context,ConsumerInfo info,UsageManager usageManager) throws InvalidSelectorException{ super(broker,context,info); @@ -55,6 +61,7 @@ } public void add(MessageReference node) throws InterruptedException,IOException{ + enqueueCounter++; node.incrementReferenceCount(); if(!isFull()&&!isSlaveBroker()){ // if maximumPendingMessages is set we will only discard messages which @@ -126,21 +133,30 @@ throw new JMSException("Invalid acknowledgment: "+ack); } - public int pending(){ - return matched()-dispatched(); + public int getPendingQueueSize(){ + return matched(); } - public int dispatched(){ - return dispatched.get(); - } - - public int delivered(){ - return delivered.get(); + public int getDispatchedQueueSize(){ + return (int)(dispatched.get()-delivered.get()); } public int getMaximumPendingMessages(){ return maximumPendingMessages; } + + public long getDispatchedCounter() { + return dispatched.get(); + } + + public long getEnqueueCounter() { + return enqueueCounter; + } + public long getDequeueCounter(){ + return delivered.get(); + } + + /** * @return the number of messages discarded due to being a slow consumer @@ -223,6 +239,7 @@ public String toString(){ return "TopicSubscription:"+" consumer="+info.getConsumerId()+", destinations="+destinations.size() - +", dispatched="+dispatched()+", delivered="+delivered()+", matched="+matched()+", discarded="+discarded(); + +", dispatched="+getDispatchedQueueSize()+", delivered="+getDequeueCounter()+", matched="+matched()+", discarded="+discarded(); } + } Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ActiveMQMessage.java URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ActiveMQMessage.java?rev=383815&r1=383814&r2=383815&view=diff ============================================================================== --- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ActiveMQMessage.java (original) +++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ActiveMQMessage.java Mon Mar 6 23:39:51 2006 @@ -310,6 +310,69 @@ message.setGroupSequence(rc.intValue()); } }); + JMS_PROPERTY_SETERS.put("JMSCorrelationID", new PropertySetter() { + public void set(Message message, Object value) throws MessageFormatException { + String rc = (String)TypeConversionSupport.convert(value, String.class); + if( rc == null ) { + throw new MessageFormatException("Property JMSCorrelationID cannot be set from a "+value.getClass().getName()+"."); + } + ((ActiveMQMessage)message).setJMSCorrelationID(rc); + } + }); + JMS_PROPERTY_SETERS.put("JMSExpiration", new PropertySetter() { + public void set(Message message, Object value) throws MessageFormatException { + Long rc = (Long)TypeConversionSupport.convert(value, Long.class); + if( rc == null ) { + throw new MessageFormatException("Property JMSExpiration cannot be set from a "+value.getClass().getName()+"."); + } + ((ActiveMQMessage)message).setJMSExpiration(rc.longValue()); + } + }); + JMS_PROPERTY_SETERS.put("JMSPriority", new PropertySetter() { + public void set(Message message, Object value) throws MessageFormatException { + Integer rc = (Integer)TypeConversionSupport.convert(value, Integer.class); + if( rc == null ) { + throw new MessageFormatException("Property JMSPriority cannot be set from a "+value.getClass().getName()+"."); + } + ((ActiveMQMessage)message).setJMSPriority(rc.intValue()); + } + }); + JMS_PROPERTY_SETERS.put("JMSRedelivered", new PropertySetter() { + public void set(Message message, Object value) throws MessageFormatException { + Boolean rc = (Boolean)TypeConversionSupport.convert(value, Boolean.class); + if( rc == null ) { + throw new MessageFormatException("Property JMSRedelivered cannot be set from a "+value.getClass().getName()+"."); + } + ((ActiveMQMessage)message).setJMSRedelivered(rc.booleanValue()); + } + }); + JMS_PROPERTY_SETERS.put("JMSReplyTo", new PropertySetter() { + public void set(Message message, Object value) throws MessageFormatException { + ActiveMQDestination rc = (ActiveMQDestination)TypeConversionSupport.convert(value, ActiveMQDestination.class); + if( rc == null ) { + throw new MessageFormatException("Property JMSReplyTo cannot be set from a "+value.getClass().getName()+"."); + } + ((ActiveMQMessage)message).setReplyTo(rc); + } + }); + JMS_PROPERTY_SETERS.put("JMSTimestamp", new PropertySetter() { + public void set(Message message, Object value) throws MessageFormatException { + Long rc = (Long)TypeConversionSupport.convert(value, Long.class); + if( rc == null ) { + throw new MessageFormatException("Property JMSTimestamp cannot be set from a "+value.getClass().getName()+"."); + } + ((ActiveMQMessage)message).setJMSTimestamp(rc.longValue()); + } + }); + JMS_PROPERTY_SETERS.put("JMSType", new PropertySetter() { + public void set(Message message, Object value) throws MessageFormatException { + String rc = (String)TypeConversionSupport.convert(value, String.class); + if( rc == null ) { + throw new MessageFormatException("Property JMSType cannot be set from a "+value.getClass().getName()+"."); + } + ((ActiveMQMessage)message).setJMSType(rc); + } + }); } public void setObjectProperty(String name, Object value) throws JMSException { Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/TypeConversionSupport.java URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/TypeConversionSupport.java?rev=383815&r1=383814&r2=383815&view=diff ============================================================================== --- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/TypeConversionSupport.java (original) +++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/TypeConversionSupport.java Mon Mar 6 23:39:51 2006 @@ -16,8 +16,11 @@ */ package org.apache.activemq.util; +import java.util.Date; import java.util.HashMap; +import org.apache.activemq.command.ActiveMQDestination; + public class TypeConversionSupport { static class ConversionKey { @@ -104,6 +107,11 @@ CONVERSION_MAP.put(new ConversionKey(Byte.class, Long.class), longConverter); CONVERSION_MAP.put(new ConversionKey(Short.class, Long.class), longConverter); CONVERSION_MAP.put(new ConversionKey(Integer.class, Long.class), longConverter); + CONVERSION_MAP.put(new ConversionKey(Date.class, Long.class), new Converter() { + public Object convert(Object value) { + return new Long(((Date) value).getTime()); + } + }); Converter intConverter = new Converter() { public Object convert(Object value) { @@ -122,6 +130,11 @@ CONVERSION_MAP.put(new ConversionKey(Float.class, Double.class), new Converter() { public Object convert(Object value) { return new Double(((Number) value).doubleValue()); + } + }); + CONVERSION_MAP.put(new ConversionKey(String.class, ActiveMQDestination.class), new Converter() { + public Object convert(Object value) { + return ActiveMQDestination.createDestination((String) value, ActiveMQDestination.QUEUE_TYPE); } }); }