activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chir...@apache.org
Subject svn commit: r511084 - in /activemq/branches/activemq-4.1: ./ activemq-core/src/main/java/org/apache/activemq/broker/ activemq-core/src/main/java/org/apache/activemq/broker/jmx/ activemq-core/src/main/java/org/apache/activemq/broker/region/
Date Fri, 23 Feb 2007 20:24:07 GMT
Author: chirino
Date: Fri Feb 23 12:24:06 2007
New Revision: 511084

URL: http://svn.apache.org/viewvc?view=rev&rev=511084
Log:
 r240@34:  chirino | 2007-02-23 14:48:56 -0500
 Enhanced the JMX stats so that the enqueue and dequeue attributes on the connection object
actually reflect what's been enqueued and dequeued on it.  Also fixed stats on Topics so they
make sense.  
 

Modified:
    activemq/branches/activemq-4.1/   (props changed)
    activemq/branches/activemq-4.1/activemq-core/src/main/java/org/apache/activemq/broker/Connection.java
    activemq/branches/activemq-4.1/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java
    activemq/branches/activemq-4.1/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ConnectionView.java
    activemq/branches/activemq-4.1/activemq-core/src/main/java/org/apache/activemq/broker/region/ConnectionStatistics.java
    activemq/branches/activemq-4.1/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
    activemq/branches/activemq-4.1/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java

Propchange: activemq/branches/activemq-4.1/
------------------------------------------------------------------------------
--- svk:merge (original)
+++ svk:merge Fri Feb 23 12:24:06 2007
@@ -1 +1 @@
-635f1f41-eb29-0410-ac9d-be9e2c357fdd:/local/amq-4.1-port:239
+635f1f41-eb29-0410-ac9d-be9e2c357fdd:/local/amq-4.1-port:240

Modified: activemq/branches/activemq-4.1/activemq-core/src/main/java/org/apache/activemq/broker/Connection.java
URL: http://svn.apache.org/viewvc/activemq/branches/activemq-4.1/activemq-core/src/main/java/org/apache/activemq/broker/Connection.java?view=diff&rev=511084&r1=511083&r2=511084
==============================================================================
--- activemq/branches/activemq-4.1/activemq-core/src/main/java/org/apache/activemq/broker/Connection.java
(original)
+++ activemq/branches/activemq-4.1/activemq-core/src/main/java/org/apache/activemq/broker/Connection.java
Fri Feb 23 12:24:06 2007
@@ -106,4 +106,6 @@
 
 	public void serviceExceptionAsync(IOException e);
 
+	public String getConnectionId();
+
 }

Modified: activemq/branches/activemq-4.1/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java
URL: http://svn.apache.org/viewvc/activemq/branches/activemq-4.1/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java?view=diff&rev=511084&r1=511083&r2=511084
==============================================================================
--- activemq/branches/activemq-4.1/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java
(original)
+++ activemq/branches/activemq-4.1/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java
Fri Feb 23 12:24:06 2007
@@ -765,11 +765,13 @@
     }
 
     public void dispatchSync(Command message) {
+        getStatistics().getEnqueues().increment();
         processDispatch(message);
     }
     
     
     public void dispatchAsync(Command message) {
+        getStatistics().getEnqueues().increment();
         if( taskRunner==null ) {
             dispatchSync( message );
         } else {
@@ -783,22 +785,26 @@
     }
     
     protected void processDispatch(Command command){
-        if(command.isMessageDispatch()){
-            MessageDispatch md=(MessageDispatch) command;
-            Runnable sub=(Runnable) md.getConsumer();
-            broker.processDispatch(md);
-            try{
-                dispatch(command);
-            }finally{
-                if(sub!=null){
-                    sub.run();
+        try {
+            if(command.isMessageDispatch()){
+                MessageDispatch md=(MessageDispatch) command;
+                Runnable sub=(Runnable) md.getConsumer();
+                broker.processDispatch(md);
+                try{
+                    dispatch(command);
+                }finally{
+                    if(sub!=null){
+                        sub.run();
+                    }
                 }
+            } else if( command.isShutdownInfo() ) {
+                dispatch(command);
+                dispatchStopped.countDown();
+            } else {
+                dispatch(command);
             }
-        } else if( command.isShutdownInfo() ) {
-            dispatch(command);
-            dispatchStopped.countDown();
-        } else {
-            dispatch(command);
+        } finally {
+            getStatistics().getDequeues().increment();
         }
     }       
     
@@ -1077,12 +1083,9 @@
         try {
             setMarkedCandidate(true);
             transport.oneway(command);
-            getStatistics().onCommand(command);
-        }
-        catch (IOException e) {
+        } catch(IOException e){
             serviceExceptionAsync(e);
-        }
-        finally {
+        } finally{
             setMarkedCandidate(false);
         }
     }
@@ -1090,4 +1093,16 @@
     public String getRemoteAddress() {
         return transport.getRemoteAddress();
     }
+
+    public String getConnectionId() {
+        Iterator iterator = localConnectionStates.values().iterator();
+        ConnectionState object = (ConnectionState) iterator.next();
+        if( object == null ) {
+            return null;
+        }
+        if( object.getInfo().getClientId() !=null )
+            return object.getInfo().getClientId();
+        return object.getInfo().getConnectionId().toString();
+    }
+
 }

Modified: activemq/branches/activemq-4.1/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ConnectionView.java
URL: http://svn.apache.org/viewvc/activemq/branches/activemq-4.1/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ConnectionView.java?view=diff&rev=511084&r1=511083&r2=511084
==============================================================================
--- activemq/branches/activemq-4.1/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ConnectionView.java
(original)
+++ activemq/branches/activemq-4.1/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ConnectionView.java
Fri Feb 23 12:24:06 2007
@@ -102,4 +102,8 @@
 		return connection.getRemoteAddress();
 	}
 
+	public String getConnectionId() {
+		return connection.getConnectionId();
+	}
+
 }

Modified: activemq/branches/activemq-4.1/activemq-core/src/main/java/org/apache/activemq/broker/region/ConnectionStatistics.java
URL: http://svn.apache.org/viewvc/activemq/branches/activemq-4.1/activemq-core/src/main/java/org/apache/activemq/broker/region/ConnectionStatistics.java?view=diff&rev=511084&r1=511083&r2=511084
==============================================================================
--- activemq/branches/activemq-4.1/activemq-core/src/main/java/org/apache/activemq/broker/region/ConnectionStatistics.java
(original)
+++ activemq/branches/activemq-4.1/activemq-core/src/main/java/org/apache/activemq/broker/region/ConnectionStatistics.java
Fri Feb 23 12:24:06 2007
@@ -18,8 +18,6 @@
 
 package org.apache.activemq.broker.region;
 
-import org.apache.activemq.command.Command;
-import org.apache.activemq.command.Message;
 import org.apache.activemq.management.CountStatisticImpl;
 import org.apache.activemq.management.StatsImpl;
 
@@ -67,16 +65,4 @@
         }
     }
 
-    /**
-     * Updates the statistics as a command is dispatched into the connection
-     */
-    public void onCommand(Command command) {
-        if (command.isMessageDispatch()) {
-            enqueues.increment();
-        }
-    }
-
-    public void onMessageDequeue(Message message) {
-        dequeues.increment();
-    }
 }

Modified: activemq/branches/activemq-4.1/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
URL: http://svn.apache.org/viewvc/activemq/branches/activemq-4.1/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java?view=diff&rev=511084&r1=511083&r2=511084
==============================================================================
--- activemq/branches/activemq-4.1/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
(original)
+++ activemq/branches/activemq-4.1/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
Fri Feb 23 12:24:06 2007
@@ -414,7 +414,6 @@
         if(node.getRegionDestination()!=null){
         	if( node != QueueMessageReference.NULL_MESSAGE ) {
             node.getRegionDestination().getDestinationStatistics().getDispatched().increment();
-            context.getConnection().getStatistics().onMessageDequeue(message);
         	}
             try{
                 dispatchMatched();

Modified: activemq/branches/activemq-4.1/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
URL: http://svn.apache.org/viewvc/activemq/branches/activemq-4.1/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java?view=diff&rev=511084&r1=511083&r2=511084
==============================================================================
--- activemq/branches/activemq-4.1/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
(original)
+++ activemq/branches/activemq-4.1/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
Fri Feb 23 12:24:06 2007
@@ -53,8 +53,8 @@
     final protected LinkedList matched=new LinkedList();
     final protected ActiveMQDestination dlqDestination=new ActiveMQQueue("ActiveMQ.DLQ");
     final protected UsageManager usageManager;
-    protected AtomicLong dispatched=new AtomicLong();
-    protected AtomicLong delivered=new AtomicLong();
+    protected AtomicLong dispatchedCounter=new AtomicLong();
+    protected AtomicLong prefetchExtension=new AtomicLong();
     private int maximumPendingMessages=-1;
     private MessageEvictionStrategy messageEvictionStrategy = new OldestMessageEvictionStrategy();
     private int discarded = 0;
@@ -131,7 +131,7 @@
             MessageReference node=(MessageReference) i.next();
             if (node.isExpired()) {
                 i.remove();
-                dispatched.incrementAndGet();
+                dispatchedCounter.incrementAndGet();
                 node.decrementReferenceCount();
                 break;
             }
@@ -144,7 +144,7 @@
                 MessageReference node=(MessageReference) i.next();
                 if(node.getMessageId().equals(mdn.getMessageId())){
                     i.remove();
-                    dispatched.incrementAndGet();
+                    dispatchedCounter.incrementAndGet();
                     node.decrementReferenceCount();
                     break;
                 }
@@ -158,7 +158,7 @@
         boolean wasFull=isFull();
         if(ack.isStandardAck()||ack.isPoisonAck()){
             if(context.isInTransaction()){
-                delivered.addAndGet(ack.getMessageCount());
+                prefetchExtension.addAndGet(ack.getMessageCount());
                 context.getTransaction().addSynchronization(new Synchronization(){
                     public void afterCommit() throws Exception{
                     	synchronized( TopicSubscription.this ) {
@@ -167,8 +167,7 @@
 	                    	}
                     	}                    
                         dequeueCounter.addAndGet(ack.getMessageCount());
-                        dispatched.addAndGet(-ack.getMessageCount());
-                        delivered.set(Math.max(0,delivered.get()-ack.getMessageCount()));
+                        prefetchExtension.set(Math.max(0,prefetchExtension.get()-ack.getMessageCount()));
                     }
                 });
             }else{
@@ -178,8 +177,7 @@
             	}
             	            
                 dequeueCounter.addAndGet(ack.getMessageCount());
-                dispatched.addAndGet(-ack.getMessageCount());
-                delivered.set(Math.max(0,delivered.get()-ack.getMessageCount()));
+                prefetchExtension.set(Math.max(0,prefetchExtension.get()-ack.getMessageCount()));
             }
             if(wasFull&&!isFull()){
                 dispatchMatched();
@@ -187,7 +185,7 @@
             return;
         }else if(ack.isDeliveredAck()){
             // Message was delivered but not acknowledged: update pre-fetch counters.
-            delivered.addAndGet(ack.getMessageCount());
+            prefetchExtension.addAndGet(ack.getMessageCount());
             if(wasFull&&!isFull()){
                 dispatchMatched();
             }
@@ -206,7 +204,7 @@
     }
 
     public int getDispatchedQueueSize(){
-        return (int)(dispatched.get()-delivered.get());
+        return (int)(dispatchedCounter.get()-dequeueCounter.get());
     }
 
     public int getMaximumPendingMessages(){
@@ -214,7 +212,7 @@
     }
     
 	public long getDispatchedCounter() {
-		return dispatched.get();
+		return dispatchedCounter.get();
 	}
 
 	public long getEnqueueCounter() {
@@ -270,21 +268,21 @@
     // -------------------------------------------------------------------------
 
     private boolean isFull(){
-        return dispatched.get()-delivered.get()>=info.getPrefetchSize();
+        return getDispatchedQueueSize()-prefetchExtension.get()>=info.getPrefetchSize();
     }
     
     /**
      * @return true when 60% or more room is left for dispatching messages
      */
     public boolean isLowWaterMark(){
-        return (dispatched.get()-delivered.get()) <= (info.getPrefetchSize() *.4);
+        return (getDispatchedQueueSize()-prefetchExtension.get()) <= (info.getPrefetchSize() *.4);
     }
     
     /**
      * @return true when 10% or less room is left for dispatching messages
      */
     public boolean isHighWaterMark(){
-        return (dispatched.get()-delivered.get()) >= (info.getPrefetchSize() *.9);
+        return (getDispatchedQueueSize()-prefetchExtension.get()) >= (info.getPrefetchSize() *.9);
     }
     
     /**
@@ -337,7 +335,7 @@
         md.setMessage(message);
         md.setConsumerId(info.getConsumerId());
         md.setDestination(node.getRegionDestination().getActiveMQDestination());
-        dispatched.incrementAndGet();
+        dispatchedCounter.incrementAndGet();
        
         // Keep track if this subscription is receiving messages from a single destination.
         if( singleDestination ) {
@@ -378,6 +376,10 @@
             }
             matched.clear();
         }
+    }
+
+    public int getPrefetchSize() {
+        return (int) (info.getPrefetchSize() + prefetchExtension.get());
     }
 
 }



Mime
View raw message