Return-Path: Delivered-To: apmail-activemq-commits-archive@www.apache.org Received: (qmail 58592 invoked from network); 16 Jan 2008 13:57:00 -0000 Received: from hermes.apache.org (HELO mail.apache.org) (140.211.11.2) by minotaur.apache.org with SMTP; 16 Jan 2008 13:57:00 -0000 Received: (qmail 67945 invoked by uid 500); 16 Jan 2008 13:56:46 -0000 Delivered-To: apmail-activemq-commits-archive@activemq.apache.org Received: (qmail 67888 invoked by uid 500); 16 Jan 2008 13:56:46 -0000 Mailing-List: contact commits-help@activemq.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@activemq.apache.org Delivered-To: mailing list commits@activemq.apache.org Received: (qmail 67846 invoked by uid 99); 16 Jan 2008 13:56:46 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 16 Jan 2008 05:56:46 -0800 X-ASF-Spam-Status: No, hits=-100.0 required=10.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.3] (HELO eris.apache.org) (140.211.11.3) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 16 Jan 2008 13:56:29 +0000 Received: by eris.apache.org (Postfix, from userid 65534) id 5AC1F1A9832; Wed, 16 Jan 2008 05:56:35 -0800 (PST) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r612459 - in /activemq/trunk/activemq-core/src/main/java/org/apache/activemq: advisory/AdvisoryBroker.java broker/BrokerFilter.java broker/BrokerService.java broker/region/Region.java broker/region/RegionBroker.java Date: Wed, 16 Jan 2008 13:56:29 -0000 To: commits@activemq.apache.org From: rajdavies@apache.org X-Mailer: svnmailer-1.0.8 Message-Id: <20080116135635.5AC1F1A9832@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: rajdavies Date: Wed Jan 16 05:56:24 2008 New Revision: 612459 URL: http://svn.apache.org/viewvc?rev=612459&view=rev Log: set correct consumer count on 
consumer advisories Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerFilter.java activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Region.java activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java?rev=612459&r1=612458&r2=612459&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java Wed Jan 16 05:56:24 2008 @@ -17,6 +17,7 @@ package org.apache.activemq.advisory; import java.util.Iterator; +import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import org.apache.activemq.broker.Broker; @@ -83,7 +84,7 @@ if (!AdvisorySupport.isAdvisoryTopic(info.getDestination())) { ActiveMQTopic topic = AdvisorySupport.getConsumerAdvisoryTopic(info.getDestination()); consumers.put(info.getConsumerId(), info); - fireConsumerAdvisory(context, topic, info); + fireConsumerAdvisory(context,info.getDestination(), topic, info); } else { // We need to replay all the previously collected state objects @@ -114,7 +115,7 @@ for (Iterator<ProducerInfo> iter = producers.values().iterator(); iter.hasNext();) { ProducerInfo value = iter.next(); ActiveMQTopic topic = AdvisorySupport.getProducerAdvisoryTopic(value.getDestination()); - fireProducerAdvisory(context, topic, value, info.getConsumerId()); + fireProducerAdvisory(context, value.getDestination(),topic, 
value, info.getConsumerId()); } } @@ -123,7 +124,7 @@ for (Iterator<ConsumerInfo> iter = consumers.values().iterator(); iter.hasNext();) { ConsumerInfo value = iter.next(); ActiveMQTopic topic = AdvisorySupport.getConsumerAdvisoryTopic(value.getDestination()); - fireConsumerAdvisory(context, topic, value, info.getConsumerId()); + fireConsumerAdvisory(context,value.getDestination(), topic, value, info.getConsumerId()); } } } @@ -219,7 +220,7 @@ if (!AdvisorySupport.isAdvisoryTopic(info.getDestination())) { ActiveMQTopic topic = AdvisorySupport.getConsumerAdvisoryTopic(info.getDestination()); consumers.remove(info.getConsumerId()); - fireConsumerAdvisory(context, topic, info.createRemoveCommand()); + fireConsumerAdvisory(context,info.getDestination(), topic, info.createRemoveCommand()); } } @@ -230,7 +231,7 @@ if (info.getDestination() != null && !AdvisorySupport.isAdvisoryTopic(info.getDestination())) { ActiveMQTopic topic = AdvisorySupport.getProducerAdvisoryTopic(info.getDestination()); producers.remove(info.getProducerId()); - fireProducerAdvisory(context, topic, info.createRemoveCommand()); + fireProducerAdvisory(context, info.getDestination(),topic, info.createRemoveCommand()); } } @@ -253,21 +254,28 @@ fireAdvisory(context, topic, command, targetConsumerId, advisoryMessage); } - protected void fireConsumerAdvisory(ConnectionContext context, ActiveMQTopic topic, Command command) throws Exception { - fireConsumerAdvisory(context, topic, command, null); + protected void fireConsumerAdvisory(ConnectionContext context, ActiveMQDestination consumerDestination,ActiveMQTopic topic, Command command) throws Exception { + fireConsumerAdvisory(context, consumerDestination,topic, command, null); } - protected void fireConsumerAdvisory(ConnectionContext context, ActiveMQTopic topic, Command command, ConsumerId targetConsumerId) throws Exception { + protected void fireConsumerAdvisory(ConnectionContext context, ActiveMQDestination consumerDestination,ActiveMQTopic topic, Command command, 
ConsumerId targetConsumerId) throws Exception { ActiveMQMessage advisoryMessage = new ActiveMQMessage(); - advisoryMessage.setIntProperty("consumerCount", consumers.size()); + int count = 0; + Set<Destination> set = getDestinations(consumerDestination); + if (set != null) { + for (Destination dest:set) { + count += dest.getDestinationStatistics().getConsumers().getCount(); + } + } + advisoryMessage.setIntProperty("consumerCount", count); fireAdvisory(context, topic, command, targetConsumerId, advisoryMessage); } - protected void fireProducerAdvisory(ConnectionContext context, ActiveMQTopic topic, Command command) throws Exception { - fireProducerAdvisory(context, topic, command, null); + protected void fireProducerAdvisory(ConnectionContext context,ActiveMQDestination producerDestination, ActiveMQTopic topic, Command command) throws Exception { + fireProducerAdvisory(context,producerDestination, topic, command, null); } - protected void fireProducerAdvisory(ConnectionContext context, ActiveMQTopic topic, Command command, ConsumerId targetConsumerId) throws Exception { + protected void fireProducerAdvisory(ConnectionContext context, ActiveMQDestination producerDestination,ActiveMQTopic topic, Command command, ConsumerId targetConsumerId) throws Exception { ActiveMQMessage advisoryMessage = new ActiveMQMessage(); advisoryMessage.setIntProperty("producerCount", producers.size()); fireAdvisory(context, topic, command, targetConsumerId, advisoryMessage); Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerFilter.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerFilter.java?rev=612459&r1=612458&r2=612459&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerFilter.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerFilter.java Wed 
Jan 16 05:56:24 2008 @@ -65,7 +65,7 @@ return next.getDestinationMap(); } - public Set getDestinations(ActiveMQDestination destination) { + public Set<Destination> getDestinations(ActiveMQDestination destination) { return next.getDestinations(destination); } Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java?rev=612459&r1=612458&r2=612459&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java Wed Jan 16 05:56:24 2008 @@ -725,16 +725,20 @@ if (persistenceAdapter == null) { persistenceAdapter = createPersistenceAdapter(); configureService(persistenceAdapter); + this.persistenceAdapter = registerPersistenceAdapterMBean(persistenceAdapter); } return persistenceAdapter; } /** * Sets the persistence adaptor implementation to use for this broker + * @throws IOException */ - public void setPersistenceAdapter(PersistenceAdapter persistenceAdapter) { + public void setPersistenceAdapter(PersistenceAdapter persistenceAdapter) throws IOException { this.persistenceAdapter = persistenceAdapter; configureService(this.persistenceAdapter); + this.persistenceAdapter = registerPersistenceAdapterMBean(persistenceAdapter); + } public TaskRunnerFactory getTaskRunnerFactory() { @@ -1311,6 +1315,24 @@ throw IOExceptionSupport.create("Transport Connector could not be registered in JMX: " + e.getMessage(), e); } } + } + } + + protected PersistenceAdapter registerPersistenceAdapterMBean(PersistenceAdapter adaptor) throws IOException { + MBeanServer mbeanServer = getManagementContext().getMBeanServer(); + if (mbeanServer != null) { + + + } + return adaptor; + } + + protected void 
unregisterPersistenceAdapterMBean(PersistenceAdapter adaptor) throws IOException { + if (isUseJmx()) { + MBeanServer mbeanServer = getManagementContext().getMBeanServer(); + if (mbeanServer != null) { + + } } } Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Region.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Region.java?rev=612459&r1=612458&r2=612459&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Region.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Region.java Wed Jan 16 05:56:24 2008 @@ -131,6 +131,6 @@ * * @return a set of matching destination objects. */ - Set getDestinations(ActiveMQDestination destination); + Set<Destination> getDestinations(ActiveMQDestination destination); } Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java?rev=612459&r1=612458&r2=612459&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java Wed Jan 16 05:56:24 2008 @@ -120,7 +120,7 @@ return answer; } - public Set getDestinations(ActiveMQDestination destination) { + public Set<Destination> getDestinations(ActiveMQDestination destination) { switch (destination.getDestinationType()) { case ActiveMQDestination.QUEUE_TYPE: return queueRegion.getDestinations(destination);