activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rajdav...@apache.org
Subject svn commit: r612542 - in /activemq/trunk/activemq-core/src/main/java/org/apache/activemq: advisory/ broker/region/
Date Wed, 16 Jan 2008 19:03:07 GMT
Author: rajdavies
Date: Wed Jan 16 11:03:02 2008
New Revision: 612542

URL: http://svn.apache.org/viewvc?rev=612542&view=rev
Log:
Add producers to DestinationStatistics

Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Destination.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationStatistics.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Region.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java?rev=612542&r1=612541&r2=612542&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java
Wed Jan 16 11:03:02 2008
@@ -277,7 +277,17 @@
 
     protected void fireProducerAdvisory(ConnectionContext context, ActiveMQDestination producerDestination,ActiveMQTopic
topic, Command command, ConsumerId targetConsumerId) throws Exception {
         ActiveMQMessage advisoryMessage = new ActiveMQMessage();
-        advisoryMessage.setIntProperty("producerCount", producers.size());
+        int count = 0;
+        if (producerDestination != null) {
+            Set<Destination> set = getDestinations(producerDestination);
+            if (set != null) {
+                for (Destination dest : set) {
+                    count += dest.getDestinationStatistics().getConsumers()
+                            .getCount();
+                }
+            }
+        }
+        advisoryMessage.setIntProperty("producerCount", count);
         fireAdvisory(context, topic, command, targetConsumerId, advisoryMessage);
     }
 

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java?rev=612542&r1=612541&r2=612542&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java
Wed Jan 16 11:03:02 2008
@@ -37,6 +37,7 @@
 import org.apache.activemq.command.MessageAck;
 import org.apache.activemq.command.MessageDispatchNotification;
 import org.apache.activemq.command.MessagePull;
+import org.apache.activemq.command.ProducerInfo;
 import org.apache.activemq.command.RemoveSubscriptionInfo;
 import org.apache.activemq.command.Response;
 import org.apache.activemq.filter.DestinationFilter;
@@ -408,5 +409,26 @@
     public void setAutoCreateDestinations(boolean autoCreateDestinations) {
         this.autoCreateDestinations = autoCreateDestinations;
     }
+    
+    public void addProducer(ConnectionContext context, ProducerInfo info) throws Exception{
+        for (Iterator iter = destinationMap.get(info.getDestination()).iterator(); iter.hasNext();)
{
+            Destination dest = (Destination)iter.next();
+            dest.addProducer(context, info);
+        }
+    }
+
+    /**
+     * Removes a Producer.
+     * @param context the environment the operation is being executed under.
+     * @throws Exception TODO
+     */
+    public void removeProducer(ConnectionContext context, ProducerInfo info) throws Exception{
+        for (Iterator iter = destinationMap.get(info.getDestination()).iterator(); iter.hasNext();)
{
+            Destination dest = (Destination)iter.next();
+            dest.removeProducer(context, info);
+        }
+    }
+
+
 
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java?rev=612542&r1=612541&r2=612542&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java
Wed Jan 16 11:03:02 2008
@@ -16,6 +16,9 @@
  */
 package org.apache.activemq.broker.region;
 
+import org.apache.activemq.broker.ConnectionContext;
+import org.apache.activemq.command.ProducerInfo;
+
 
 /**
  * @version $Revision: 1.12 $
@@ -26,6 +29,7 @@
     private int maxProducersToAudit=1024;
     private int maxAuditDepth=1;
     private boolean enableAudit=true;
+    protected final DestinationStatistics destinationStatistics = new DestinationStatistics();
     /**
      * @return the producerFlowControl
      */
@@ -73,6 +77,14 @@
      */
     public void setEnableAudit(boolean enableAudit) {
         this.enableAudit = enableAudit;
+    }
+    
+    public void addProducer(ConnectionContext context, ProducerInfo info) throws Exception{
+        destinationStatistics.getProducers().increment();
+    }
+
+    public void removeProducer(ConnectionContext context, ProducerInfo info) throws Exception{
+        destinationStatistics.getProducers().decrement();
     }
 
     

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Destination.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Destination.java?rev=612542&r1=612541&r2=612542&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Destination.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Destination.java
Wed Jan 16 11:03:02 2008
@@ -25,6 +25,7 @@
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.Message;
 import org.apache.activemq.command.MessageAck;
+import org.apache.activemq.command.ProducerInfo;
 import org.apache.activemq.store.MessageStore;
 import org.apache.activemq.usage.MemoryUsage;
 import org.apache.activemq.usage.SystemUsage;
@@ -37,6 +38,10 @@
     void addSubscription(ConnectionContext context, Subscription sub) throws Exception;
 
     void removeSubscription(ConnectionContext context, Subscription sub) throws Exception;
+    
+    void addProducer(ConnectionContext context, ProducerInfo info) throws Exception;
+
+    void removeProducer(ConnectionContext context, ProducerInfo info) throws Exception;
 
     void send(ProducerBrokerExchange producerExchange, Message messageSend) throws Exception;
 

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java?rev=612542&r1=612541&r2=612542&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java
Wed Jan 16 11:03:02 2008
@@ -27,6 +27,7 @@
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.Message;
 import org.apache.activemq.command.MessageAck;
+import org.apache.activemq.command.ProducerInfo;
 import org.apache.activemq.store.MessageStore;
 import org.apache.activemq.usage.MemoryUsage;
 import org.apache.activemq.usage.SystemUsage;
@@ -128,6 +129,17 @@
     
     public void setProducerFlowControl(boolean value){
         next.setProducerFlowControl(value);
+    }
+
+    public void addProducer(ConnectionContext context, ProducerInfo info)
+            throws Exception {
+        next.addProducer(context, info);
+        
+    }
+
+    public void removeProducer(ConnectionContext context, ProducerInfo info)
+            throws Exception {
+       next.removeProducer(context, info);
     }
     
     

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationStatistics.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationStatistics.java?rev=612542&r1=612541&r2=612542&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationStatistics.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationStatistics.java
Wed Jan 16 11:03:02 2008
@@ -32,6 +32,7 @@
     protected CountStatisticImpl enqueues;
     protected CountStatisticImpl dequeues;
     protected CountStatisticImpl consumers;
+    protected CountStatisticImpl producers;
     protected CountStatisticImpl messages;
     protected PollCountStatisticImpl messagesCached;
     protected CountStatisticImpl dispatched;
@@ -43,6 +44,7 @@
         dispatched = new CountStatisticImpl("dispatched", "The number of messages that have
been dispatched from the destination");
         dequeues = new CountStatisticImpl("dequeues", "The number of messages that have been
acknowledged from the destination");
         consumers = new CountStatisticImpl("consumers", "The number of consumers that that
are subscribing to messages from the destination");
+        producers = new CountStatisticImpl("producers", "The number of producers that that
are publishing messages to the destination");
         messages = new CountStatisticImpl("messages", "The number of messages that that are
being held by the destination");
         messagesCached = new PollCountStatisticImpl("messagesCached", "The number of messages
that are held in the destination's memory cache");
         processTime = new TimeStatisticImpl("processTime", "information around length of
time messages are held by a destination");
@@ -50,6 +52,7 @@
         addStatistic("dispatched", dispatched);
         addStatistic("dequeues", dequeues);
         addStatistic("consumers", consumers);
+        addStatistic("prodcuers", producers);
         addStatistic("messages", messages);
         addStatistic("messagesCached", messagesCached);
         addStatistic("processTime", processTime);
@@ -66,6 +69,10 @@
     public CountStatisticImpl getConsumers() {
         return consumers;
     }
+    
+    public CountStatisticImpl getProducers() {
+        return producers;
+    }
 
     public PollCountStatisticImpl getMessagesCached() {
         return messagesCached;
@@ -100,6 +107,7 @@
         dispatched.setEnabled(enabled);
         dequeues.setEnabled(enabled);
         consumers.setEnabled(enabled);
+        producers.setEnabled(enabled);
         messages.setEnabled(enabled);
         messagesCached.setEnabled(enabled);
         processTime.setEnabled(enabled);
@@ -112,6 +120,7 @@
             dispatched.setParent(parent.dispatched);
             dequeues.setParent(parent.dequeues);
             consumers.setParent(parent.consumers);
+            producers.setParent(parent.producers);
             messagesCached.setParent(parent.messagesCached);
             messages.setParent(parent.messages);
             processTime.setParent(parent.processTime);
@@ -120,6 +129,7 @@
             dispatched.setParent(null);
             dequeues.setParent(null);
             consumers.setParent(null);
+            producers.setParent(null);
             messagesCached.setParent(null);
             messages.setParent(null);
             processTime.setParent(null);

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java?rev=612542&r1=612541&r2=612542&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
Wed Jan 16 11:03:02 2008
@@ -79,7 +79,6 @@
     private final List<Subscription> consumers = new ArrayList<Subscription>(50);
     private final SystemUsage systemUsage;
     private final MemoryUsage memoryUsage;
-    private final DestinationStatistics destinationStatistics = new DestinationStatistics();
     private PendingMessageCursor messages;
     private final LinkedList<MessageReference> pagedInMessages = new LinkedList<MessageReference>();
     private LockOwner exclusiveOwner;

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Region.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Region.java?rev=612542&r1=612541&r2=612542&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Region.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Region.java
Wed Jan 16 11:03:02 2008
@@ -29,6 +29,7 @@
 import org.apache.activemq.command.MessageAck;
 import org.apache.activemq.command.MessageDispatchNotification;
 import org.apache.activemq.command.MessagePull;
+import org.apache.activemq.command.ProducerInfo;
 import org.apache.activemq.command.RemoveSubscriptionInfo;
 import org.apache.activemq.command.Response;
 
@@ -86,6 +87,21 @@
      * @throws Exception TODO
      */
     void removeConsumer(ConnectionContext context, ConsumerInfo info) throws Exception;
+    
+    /**
+     * Adds a Producer.
+     * @param context the environment the operation is being executed under.
+     * @throws Exception TODO
+     */
+    void addProducer(ConnectionContext context, ProducerInfo info) throws Exception;
+
+    /**
+     * Removes a Producer.
+     * @param context the environment the operation is being executed under.
+     * @throws Exception TODO
+     */
+    void removeProducer(ConnectionContext context, ProducerInfo info) throws Exception;
+
 
     /**
      * Deletes a durable subscription.

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java?rev=612542&r1=612541&r2=612542&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java
Wed Jan 16 11:03:02 2008
@@ -322,10 +322,45 @@
     public void removeSession(ConnectionContext context, SessionInfo info) throws Exception
{
     }
 
-    public void addProducer(ConnectionContext context, ProducerInfo info) throws Exception
{
+    public void addProducer(ConnectionContext context, ProducerInfo info)
+            throws Exception {
+        ActiveMQDestination destination = info.getDestination();
+        if (destination != null) {
+            switch (destination.getDestinationType()) {
+            case ActiveMQDestination.QUEUE_TYPE:
+                queueRegion.addProducer(context, info);
+                break;
+            case ActiveMQDestination.TOPIC_TYPE:
+                topicRegion.addProducer(context, info);
+                break;
+            case ActiveMQDestination.TEMP_QUEUE_TYPE:
+                tempQueueRegion.addProducer(context, info);
+                break;
+            case ActiveMQDestination.TEMP_TOPIC_TYPE:
+                tempTopicRegion.addProducer(context, info);
+                break;
+            }
+        }
     }
 
     public void removeProducer(ConnectionContext context, ProducerInfo info) throws Exception
{
+        ActiveMQDestination destination = info.getDestination();
+        if (destination != null) {
+            switch (destination.getDestinationType()) {
+            case ActiveMQDestination.QUEUE_TYPE:
+                queueRegion.removeProducer(context, info);
+                break;
+            case ActiveMQDestination.TOPIC_TYPE:
+                topicRegion.removeProducer(context, info);
+                break;
+            case ActiveMQDestination.TEMP_QUEUE_TYPE:
+                tempQueueRegion.removeProducer(context, info);
+                break;
+            case ActiveMQDestination.TEMP_TOPIC_TYPE:
+                tempTopicRegion.removeProducer(context, info);
+                break;
+            }
+        }
     }
 
     public Subscription addConsumer(ConnectionContext context, ConsumerInfo info) throws
Exception {

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java?rev=612542&r1=612541&r2=612542&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java
Wed Jan 16 11:03:02 2008
@@ -76,8 +76,7 @@
     protected final TopicMessageStore store;
     private final SystemUsage systemUsage;
     private final MemoryUsage memoryUsage;
-    protected final DestinationStatistics destinationStatistics = new DestinationStatistics();
-
+   
     private DispatchPolicy dispatchPolicy = new SimpleDispatchPolicy();
     private SubscriptionRecoveryPolicy subscriptionRecoveryPolicy;
     private boolean sendAdvisoryIfNoConsumers;



Mime
View raw message