activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From j...@apache.org
Subject svn commit: r470692 - in /incubator/activemq/branches/activemq-4.0/activemq-core/src/main/java/org/apache/activemq/broker: jmx/ region/
Date Fri, 03 Nov 2006 06:47:10 GMT
Author: jlim
Date: Thu Nov  2 22:47:10 2006
New Revision: 470692

URL: http://svn.apache.org/viewvc?view=rev&rev=470692
Log:
fix for https://issues.apache.org/activemq/browse/AMQ-1023

Modified:
    incubator/activemq/branches/activemq-4.0/activemq-core/src/main/java/org/apache/activemq/broker/jmx/DestinationView.java
    incubator/activemq/branches/activemq-4.0/activemq-core/src/main/java/org/apache/activemq/broker/jmx/DestinationViewMBean.java
    incubator/activemq/branches/activemq-4.0/activemq-core/src/main/java/org/apache/activemq/broker/region/Destination.java
    incubator/activemq/branches/activemq-4.0/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationStatistics.java
    incubator/activemq/branches/activemq-4.0/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
    incubator/activemq/branches/activemq-4.0/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
    incubator/activemq/branches/activemq-4.0/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java
    incubator/activemq/branches/activemq-4.0/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java

Modified: incubator/activemq/branches/activemq-4.0/activemq-core/src/main/java/org/apache/activemq/broker/jmx/DestinationView.java
URL: http://svn.apache.org/viewvc/incubator/activemq/branches/activemq-4.0/activemq-core/src/main/java/org/apache/activemq/broker/jmx/DestinationView.java?view=diff&rev=470692&r1=470691&r2=470692
==============================================================================
--- incubator/activemq/branches/activemq-4.0/activemq-core/src/main/java/org/apache/activemq/broker/jmx/DestinationView.java
(original)
+++ incubator/activemq/branches/activemq-4.0/activemq-core/src/main/java/org/apache/activemq/broker/jmx/DestinationView.java
Thu Nov  2 22:47:10 2006
@@ -59,39 +59,43 @@
     }
 
     public void resetStatistics() {
-        destination.resetStatistics();
+        destination.getDestinationStatistics().reset();
     }
 
     public long getEnqueueCount() {
-        return destination.getEnqueueCount();
+        return destination.getDestinationStatistics().getEnqueues().getCount();
     }
 
     public long getDequeueCount() {
-        return destination.getDequeueCount();
+        return destination.getDestinationStatistics().getDequeues().getCount();
     }
+    
+    public long getDispatchCount() {
+        return destination.getDestinationStatistics().getDispatched().getCount();
+    }    
 
     public long getConsumerCount() {
-        return destination.getConsumerCount();
+        return destination.getDestinationStatistics().getConsumers().getCount();
     }
 
     public long getQueueSize() {
-        return destination.getQueueSize();
+        return destination.getDestinationStatistics().getMessages().getCount();
     }
 
     public long getMessagesCached() {
-        return destination.getMessagesCached();
+        return destination.getDestinationStatistics().getMessagesCached().getCount();
     }
 
     public int getMemoryPercentageUsed() {
-        return destination.getMemoryPercentageUsed();
+        return destination.getUsageManager().getPercentUsage();
     }
 
     public long getMemoryLimit() {
-        return destination.getMemoryLimit();
+        return destination.getUsageManager().getLimit();
     }
 
     public void setMemoryLimit(long limit) {
-        destination.setMemoryLimit(limit);
+        destination.getUsageManager().setLimit(limit);
     }
 
     public CompositeData[] browse() throws OpenDataException{

Modified: incubator/activemq/branches/activemq-4.0/activemq-core/src/main/java/org/apache/activemq/broker/jmx/DestinationViewMBean.java
URL: http://svn.apache.org/viewvc/incubator/activemq/branches/activemq-4.0/activemq-core/src/main/java/org/apache/activemq/broker/jmx/DestinationViewMBean.java?view=diff&rev=470692&r1=470691&r2=470692
==============================================================================
--- incubator/activemq/branches/activemq-4.0/activemq-core/src/main/java/org/apache/activemq/broker/jmx/DestinationViewMBean.java
(original)
+++ incubator/activemq/branches/activemq-4.0/activemq-core/src/main/java/org/apache/activemq/broker/jmx/DestinationViewMBean.java
Thu Nov  2 22:47:10 2006
@@ -42,7 +42,12 @@
     public long getEnqueueCount();
 
     /**
-     * @return The number of messages that have been received from the destination.
+     * @return The number of messages that have been delivered (potentially not acknowledged)
to consumers.
+     */
+    public long getDispatchCount();
+
+    /**
+     * @return The number of messages that have been acknowledged from the destination.
      */
     public long getDequeueCount();
 

Modified: incubator/activemq/branches/activemq-4.0/activemq-core/src/main/java/org/apache/activemq/broker/region/Destination.java
URL: http://svn.apache.org/viewvc/incubator/activemq/branches/activemq-4.0/activemq-core/src/main/java/org/apache/activemq/broker/region/Destination.java?view=diff&rev=470692&r1=470691&r2=470692
==============================================================================
--- incubator/activemq/branches/activemq-4.0/activemq-core/src/main/java/org/apache/activemq/broker/region/Destination.java
(original)
+++ incubator/activemq/branches/activemq-4.0/activemq-core/src/main/java/org/apache/activemq/broker/region/Destination.java
Thu Nov  2 22:47:10 2006
@@ -53,17 +53,7 @@
     DestinationStatistics getDestinationStatistics();
     MessageStore getMessageStore();
     DeadLetterStrategy getDeadLetterStrategy();
-    
+
     public Message[] browse();
-    
-    public void resetStatistics();
     public String getName();
-    public long getEnqueueCount();
-    public long getDequeueCount();
-    public long getConsumerCount();
-    public long getQueueSize();
-    public long getMessagesCached();
-    public int getMemoryPercentageUsed();
-    public long getMemoryLimit();
-    public void setMemoryLimit(long limit);
 }

Modified: incubator/activemq/branches/activemq-4.0/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationStatistics.java
URL: http://svn.apache.org/viewvc/incubator/activemq/branches/activemq-4.0/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationStatistics.java?view=diff&rev=470692&r1=470691&r2=470692
==============================================================================
--- incubator/activemq/branches/activemq-4.0/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationStatistics.java
(original)
+++ incubator/activemq/branches/activemq-4.0/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationStatistics.java
Thu Nov  2 22:47:10 2006
@@ -35,16 +35,19 @@
     protected CountStatisticImpl consumers;
     protected CountStatisticImpl messages;
     protected PollCountStatisticImpl messagesCached;
+    protected CountStatisticImpl dispatched;
 
     public DestinationStatistics() {
 
         enqueues = new CountStatisticImpl("enqueues", "The number of messages that have been
sent to the destination");
-        dequeues = new CountStatisticImpl("dequeues", "The number of messages that have been
dispatched from the destination");
+        dispatched = new CountStatisticImpl("dispatched", "The number of messages that have
been dispatched from the destination");
+        dequeues = new CountStatisticImpl("dequeues", "The number of messages that have been
acknowledged from the destination");
         consumers = new CountStatisticImpl("consumers", "The number of consumers that that
are subscribing to messages from the destination");
         messages = new CountStatisticImpl("messages", "The number of messages that that are
being held by the destination");
         messagesCached = new PollCountStatisticImpl("messagesCached", "The number of messages
that are held in the destination's memory cache");
 
         addStatistic("enqueues", enqueues);
+        addStatistic("dispatched", dispatched);
         addStatistic("dequeues", dequeues);
         addStatistic("consumers", consumers);
         addStatistic("messages", messages);
@@ -75,11 +78,13 @@
         super.reset();
         enqueues.reset();
         dequeues.reset();
+        dispatched.reset();
     }
 
     public void setParent(DestinationStatistics parent) {
         if (parent != null) {
             enqueues.setParent(parent.enqueues);
+            dispatched.setParent(parent.dispatched);
             dequeues.setParent(parent.dequeues);
             consumers.setParent(parent.consumers);
             messagesCached.setParent(parent.messagesCached);
@@ -87,6 +92,7 @@
         }
         else {
             enqueues.setParent(null);
+            dispatched.setParent(null);
             dequeues.setParent(null);
             consumers.setParent(null);
             messagesCached.setParent(null);
@@ -98,15 +104,7 @@
         this.messagesCached = messagesCached;
     }
 
-    /**
-     * Called when a message is enqueued to update the statistics.
-     */
-    public void onMessageEnqueue(Message message) {
-        getEnqueues().increment();
-        getMessages().increment();
-    }
-
-    public void onMessageDequeue(Message message) {
-        getDequeues().increment();
+    public CountStatisticImpl getDispatched() {
+		return dispatched;
     }
 }

Modified: incubator/activemq/branches/activemq-4.0/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
URL: http://svn.apache.org/viewvc/incubator/activemq/branches/activemq-4.0/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java?view=diff&rev=470692&r1=470691&r2=470692
==============================================================================
--- incubator/activemq/branches/activemq-4.0/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
(original)
+++ incubator/activemq/branches/activemq-4.0/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
Thu Nov  2 22:47:10 2006
@@ -108,6 +108,7 @@
                     // Don't remove the nodes until we are committed.
                     if(!context.isInTransaction()){
                     	dequeueCounter++;
+                    	node.getRegionDestination().getDestinationStatistics().getDequeues().increment();
                         iter.remove();
                     }else{
                         // setup a Synchronization to remove nodes from the dispatched list.
@@ -116,6 +117,7 @@
                                 synchronized(PrefetchSubscription.this){
                                 	dequeueCounter++;
                                     dispatched.remove(node);
+                                    node.getRegionDestination().getDestinationStatistics().getDequeues().increment();
                                     prefetchExtension--;
                                 }
                             }
@@ -165,6 +167,7 @@
                 }
                 if(inAckRange){
                     sendToDLQ(context, node);
+                    node.getRegionDestination().getDestinationStatistics().getDequeues().increment();
                     iter.remove();
                     dequeueCounter++;
                     index++;
@@ -319,7 +322,7 @@
 
     synchronized protected void onDispatch(final MessageReference node,final Message message){
         if(node.getRegionDestination()!=null){
-            node.getRegionDestination().getDestinationStatistics().onMessageDequeue(message);
+            node.getRegionDestination().getDestinationStatistics().getDispatched().increment();
             context.getConnection().getStatistics().onMessageDequeue(message);
             try{
                 dispatchMatched();

Modified: incubator/activemq/branches/activemq-4.0/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
URL: http://svn.apache.org/viewvc/incubator/activemq/branches/activemq-4.0/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java?view=diff&rev=470692&r1=470691&r2=470692
==============================================================================
--- incubator/activemq/branches/activemq-4.0/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
(original)
+++ incubator/activemq/branches/activemq-4.0/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
Thu Nov  2 22:47:10 2006
@@ -392,47 +392,10 @@
     public void setMessageGroupHashBucketCount(int messageGroupHashBucketCount) {
         this.messageGroupHashBucketCount = messageGroupHashBucketCount;
     }
-    
-    public void resetStatistics() {
-        getDestinationStatistics().reset();
-    }
 
     public String getName() {
         return getActiveMQDestination().getPhysicalName();
     }
-
-    public long getEnqueueCount() {
-        return getDestinationStatistics().getEnqueues().getCount();
-    }
-
-    public long getDequeueCount() {
-        return getDestinationStatistics().getDequeues().getCount();
-    }
-
-    public long getConsumerCount() {
-        return getDestinationStatistics().getConsumers().getCount();
-    }
-
-    public long getQueueSize() {
-        return getDestinationStatistics().getMessages().getCount();
-    }
-
-    public long getMessagesCached() {
-        return getDestinationStatistics().getMessagesCached().getCount();
-    }
-
-    public int getMemoryPercentageUsed() {
-        return getUsageManager().getPercentUsage();
-    }
-
-    public long getMemoryLimit() {
-        return getUsageManager().getLimit();
-    }
-
-    public void setMemoryLimit(long limit) {
-        getUsageManager().setLimit(limit);
-    }
-    
     
     // Implementation methods
     // -------------------------------------------------------------------------
@@ -445,7 +408,8 @@
         dispatchValve.increment();
         MessageEvaluationContext msgContext = context.getMessageEvaluationContext();
         try {
-            destinationStatistics.onMessageEnqueue(message);
+            destinationStatistics.getEnqueues().increment();
+	    destinationStatistics.getMessages().increment();
             synchronized (messages) {
                 messages.add(node);
             }

Modified: incubator/activemq/branches/activemq-4.0/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java
URL: http://svn.apache.org/viewvc/incubator/activemq/branches/activemq-4.0/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java?view=diff&rev=470692&r1=470691&r2=470692
==============================================================================
--- incubator/activemq/branches/activemq-4.0/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java
(original)
+++ incubator/activemq/branches/activemq-4.0/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java
Thu Nov  2 22:47:10 2006
@@ -380,46 +380,10 @@
         this.deadLetterStrategy = deadLetterStrategy;
     }
 
-    public void resetStatistics(){
-        getDestinationStatistics().reset();
-    }
-
     public String getName() {
         return getActiveMQDestination().getPhysicalName();
     }
 
-    public long getEnqueueCount() {
-        return getDestinationStatistics().getEnqueues().getCount();
-    }
-
-    public long getDequeueCount() {
-        return getDestinationStatistics().getDequeues().getCount();
-    }
-
-    public long getConsumerCount() {
-        return getDestinationStatistics().getConsumers().getCount();
-    }
-
-    public long getQueueSize() {
-        return getDestinationStatistics().getMessages().getCount();
-    }
-
-    public long getMessagesCached() {
-        return getDestinationStatistics().getMessagesCached().getCount();
-    }
-
-    public int getMemoryPercentageUsed() {
-        return getUsageManager().getPercentUsage();
-    }
-
-    public long getMemoryLimit() {
-        return getUsageManager().getLimit();
-    }
-
-    public void setMemoryLimit(long limit) {
-        getUsageManager().setLimit(limit);
-    }
-    
     
     // Implementation methods
     // -------------------------------------------------------------------------

Modified: incubator/activemq/branches/activemq-4.0/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
URL: http://svn.apache.org/viewvc/incubator/activemq/branches/activemq-4.0/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java?view=diff&rev=470692&r1=470691&r2=470692
==============================================================================
--- incubator/activemq/branches/activemq-4.0/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
(original)
+++ incubator/activemq/branches/activemq-4.0/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
Thu Nov  2 22:47:10 2006
@@ -60,6 +60,9 @@
     private final AtomicLong enqueueCounter = new AtomicLong(0);
     private final AtomicLong dequeueCounter = new AtomicLong(0);
     
+    boolean singleDestination=true;
+    Destination destination;    
+    
     public TopicSubscription(Broker broker,ConnectionContext context,ConsumerInfo info,UsageManager
usageManager)
                     throws InvalidSelectorException{
         super(broker,context,info);
@@ -146,12 +149,22 @@
                 delivered.addAndGet(ack.getMessageCount());
                 context.getTransaction().addSynchronization(new Synchronization(){
                     public void afterCommit() throws Exception{
+                    	synchronized( TopicSubscription.this ) {
+	                    	if( singleDestination ) {
+	                    		destination.getDestinationStatistics().getDequeues().add(ack.getMessageCount());
+	                    	}
+                    	}                    
                         dequeueCounter.addAndGet(ack.getMessageCount());
                         dispatched.addAndGet(-ack.getMessageCount());
                         delivered.set(Math.max(0,delivered.get()-ack.getMessageCount()));
                     }
                 });
             }else{
+            	
+            	if( singleDestination ) {
+            		destination.getDestinationStatistics().getDequeues().add(ack.getMessageCount());
+            	}
+            	            
                 dequeueCounter.addAndGet(ack.getMessageCount());
                 dispatched.addAndGet(-ack.getMessageCount());
                 delivered.set(Math.max(0,delivered.get()-ack.getMessageCount()));
@@ -308,15 +321,29 @@
         md.setConsumerId(info.getConsumerId());
         md.setDestination(node.getRegionDestination().getActiveMQDestination());
         dispatched.incrementAndGet();
+        
+        // Keep track if this subscription is receiving messages from a single destination.
+        if( singleDestination ) {
+        	if( destination == null ) {
+        		destination = node.getRegionDestination();
+        	} else {
+        		if( destination != node.getRegionDestination() ) {
+        			singleDestination = false;
+        		}
+        	}
+        }
+                
         if(info.isDispatchAsync()){
             md.setConsumer(new Runnable(){
                 public void run(){
+                    node.getRegionDestination().getDestinationStatistics().getDispatched().increment();

                     node.decrementReferenceCount();
                 }
             });
             context.getConnection().dispatchAsync(md);
         }else{
             context.getConnection().dispatchSync(md);
+            node.getRegionDestination().getDestinationStatistics().getDispatched().increment();
             node.decrementReferenceCount();
         }
     }



Mime
View raw message