Return-Path: Delivered-To: apmail-activemq-commits-archive@www.apache.org Received: (qmail 21336 invoked from network); 23 Feb 2007 20:24:34 -0000 Received: from hermes.apache.org (HELO mail.apache.org) (140.211.11.2) by minotaur.apache.org with SMTP; 23 Feb 2007 20:24:34 -0000 Received: (qmail 8571 invoked by uid 500); 23 Feb 2007 20:24:38 -0000 Delivered-To: apmail-activemq-commits-archive@activemq.apache.org Received: (qmail 8520 invoked by uid 500); 23 Feb 2007 20:24:38 -0000 Mailing-List: contact commits-help@activemq.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@activemq.apache.org Delivered-To: mailing list commits@activemq.apache.org Received: (qmail 8374 invoked by uid 99); 23 Feb 2007 20:24:37 -0000 Received: from herse.apache.org (HELO herse.apache.org) (140.211.11.133) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 23 Feb 2007 12:24:37 -0800 X-ASF-Spam-Status: No, hits=-99.5 required=10.0 tests=ALL_TRUSTED,NO_REAL_NAME X-Spam-Check-By: apache.org Received: from [140.211.11.3] (HELO eris.apache.org) (140.211.11.3) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 23 Feb 2007 12:24:28 -0800 Received: by eris.apache.org (Postfix, from userid 65534) id ECE111A9820; Fri, 23 Feb 2007 12:24:07 -0800 (PST) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r511084 - in /activemq/branches/activemq-4.1: ./ activemq-core/src/main/java/org/apache/activemq/broker/ activemq-core/src/main/java/org/apache/activemq/broker/jmx/ activemq-core/src/main/java/org/apache/activemq/broker/region/ Date: Fri, 23 Feb 2007 20:24:07 -0000 To: commits@activemq.apache.org From: chirino@apache.org X-Mailer: svnmailer-1.1.0 Message-Id: <20070223202407.ECE111A9820@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: chirino Date: Fri Feb 23 12:24:06 2007 New Revision: 511084 URL: http://svn.apache.org/viewvc?view=rev&rev=511084 Log: r240@34: chirino | 2007-02-23 14:48:56 -0500 Enhanced the JMX stats so that the enqueue and dequeue attributes on the connection object actually reflect what's been enqueued and dequeued on it. Also fixed stats on Topics so they make sense. Modified: activemq/branches/activemq-4.1/ (props changed) activemq/branches/activemq-4.1/activemq-core/src/main/java/org/apache/activemq/broker/Connection.java activemq/branches/activemq-4.1/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java activemq/branches/activemq-4.1/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ConnectionView.java activemq/branches/activemq-4.1/activemq-core/src/main/java/org/apache/activemq/broker/region/ConnectionStatistics.java activemq/branches/activemq-4.1/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java activemq/branches/activemq-4.1/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java Propchange: activemq/branches/activemq-4.1/ ------------------------------------------------------------------------------ --- svk:merge (original) +++ svk:merge Fri Feb 23 12:24:06 2007 @@ -1 +1 @@ -635f1f41-eb29-0410-ac9d-be9e2c357fdd:/local/amq-4.1-port:239 +635f1f41-eb29-0410-ac9d-be9e2c357fdd:/local/amq-4.1-port:240 Modified: activemq/branches/activemq-4.1/activemq-core/src/main/java/org/apache/activemq/broker/Connection.java URL: http://svn.apache.org/viewvc/activemq/branches/activemq-4.1/activemq-core/src/main/java/org/apache/activemq/broker/Connection.java?view=diff&rev=511084&r1=511083&r2=511084 ============================================================================== --- activemq/branches/activemq-4.1/activemq-core/src/main/java/org/apache/activemq/broker/Connection.java (original) +++ activemq/branches/activemq-4.1/activemq-core/src/main/java/org/apache/activemq/broker/Connection.java Fri Feb 23 12:24:06 2007 @@ -106,4 +106,6 @@ public void serviceExceptionAsync(IOException e); + public String getConnectionId(); + } Modified: activemq/branches/activemq-4.1/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java URL: http://svn.apache.org/viewvc/activemq/branches/activemq-4.1/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java?view=diff&rev=511084&r1=511083&r2=511084 ============================================================================== --- activemq/branches/activemq-4.1/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java (original) +++ activemq/branches/activemq-4.1/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java Fri Feb 23 12:24:06 2007 @@ -765,11 +765,13 @@ } public void dispatchSync(Command message) { + getStatistics().getEnqueues().increment(); processDispatch(message); } public void dispatchAsync(Command message) { + getStatistics().getEnqueues().increment(); if( taskRunner==null ) { dispatchSync( message ); } else { @@ -783,22 +785,26 @@ } protected void processDispatch(Command command){ - if(command.isMessageDispatch()){ - MessageDispatch md=(MessageDispatch) command; - Runnable sub=(Runnable) md.getConsumer(); - broker.processDispatch(md); - try{ - dispatch(command); - }finally{ - if(sub!=null){ - sub.run(); + try { + if(command.isMessageDispatch()){ + MessageDispatch md=(MessageDispatch) command; + Runnable sub=(Runnable) md.getConsumer(); + broker.processDispatch(md); + try{ + dispatch(command); + }finally{ + if(sub!=null){ + sub.run(); + } } + } else if( command.isShutdownInfo() ) { + dispatch(command); + dispatchStopped.countDown(); + } else { + dispatch(command); } - } else if( command.isShutdownInfo() ) { - dispatch(command); - dispatchStopped.countDown(); - } else { - dispatch(command); + } finally { + getStatistics().getDequeues().increment(); } } @@ -1077,12 +1083,9 @@ try { setMarkedCandidate(true); transport.oneway(command); - getStatistics().onCommand(command); - } - catch (IOException e) { + } catch(IOException e){ serviceExceptionAsync(e); - } - finally { + } finally{ setMarkedCandidate(false); } } @@ -1090,4 +1093,16 @@ public String getRemoteAddress() { return transport.getRemoteAddress(); } + + public String getConnectionId() { + Iterator iterator = localConnectionStates.values().iterator(); + ConnectionState object = (ConnectionState) iterator.next(); + if( object == null ) { + return null; + } + if( object.getInfo().getClientId() !=null ) + return object.getInfo().getClientId(); + return object.getInfo().getConnectionId().toString(); + } + } Modified: activemq/branches/activemq-4.1/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ConnectionView.java URL: http://svn.apache.org/viewvc/activemq/branches/activemq-4.1/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ConnectionView.java?view=diff&rev=511084&r1=511083&r2=511084 ============================================================================== --- activemq/branches/activemq-4.1/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ConnectionView.java (original) +++ activemq/branches/activemq-4.1/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ConnectionView.java Fri Feb 23 12:24:06 2007 @@ -102,4 +102,8 @@ return connection.getRemoteAddress(); } + public String getConnectionId() { + return connection.getConnectionId(); + } + } Modified: activemq/branches/activemq-4.1/activemq-core/src/main/java/org/apache/activemq/broker/region/ConnectionStatistics.java URL: http://svn.apache.org/viewvc/activemq/branches/activemq-4.1/activemq-core/src/main/java/org/apache/activemq/broker/region/ConnectionStatistics.java?view=diff&rev=511084&r1=511083&r2=511084 ============================================================================== --- activemq/branches/activemq-4.1/activemq-core/src/main/java/org/apache/activemq/broker/region/ConnectionStatistics.java (original) +++ activemq/branches/activemq-4.1/activemq-core/src/main/java/org/apache/activemq/broker/region/ConnectionStatistics.java Fri Feb 23 12:24:06 2007 @@ -18,8 +18,6 @@ package org.apache.activemq.broker.region; -import org.apache.activemq.command.Command; -import org.apache.activemq.command.Message; import org.apache.activemq.management.CountStatisticImpl; import org.apache.activemq.management.StatsImpl; @@ -67,16 +65,4 @@ } } - /** - * Updates the statistics as a command is dispatched into the connection - */ - public void onCommand(Command command) { - if (command.isMessageDispatch()) { - enqueues.increment(); - } - } - - public void onMessageDequeue(Message message) { - dequeues.increment(); - } } Modified: activemq/branches/activemq-4.1/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java URL: http://svn.apache.org/viewvc/activemq/branches/activemq-4.1/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java?view=diff&rev=511084&r1=511083&r2=511084 ============================================================================== --- activemq/branches/activemq-4.1/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java (original) +++ activemq/branches/activemq-4.1/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java Fri Feb 23 12:24:06 2007 @@ -414,7 +414,6 @@ if(node.getRegionDestination()!=null){ if( node != QueueMessageReference.NULL_MESSAGE ) { node.getRegionDestination().getDestinationStatistics().getDispatched().increment(); - context.getConnection().getStatistics().onMessageDequeue(message); } try{ dispatchMatched(); Modified: activemq/branches/activemq-4.1/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java URL: http://svn.apache.org/viewvc/activemq/branches/activemq-4.1/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java?view=diff&rev=511084&r1=511083&r2=511084 ============================================================================== --- activemq/branches/activemq-4.1/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java (original) +++ activemq/branches/activemq-4.1/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java Fri Feb 23 12:24:06 2007 @@ -53,8 +53,8 @@ final protected LinkedList matched=new LinkedList(); final protected ActiveMQDestination dlqDestination=new ActiveMQQueue("ActiveMQ.DLQ"); final protected UsageManager usageManager; - protected AtomicLong dispatched=new AtomicLong(); - protected AtomicLong delivered=new AtomicLong(); + protected AtomicLong dispatchedCounter=new AtomicLong(); + protected AtomicLong prefetchExtension=new AtomicLong(); private int maximumPendingMessages=-1; private MessageEvictionStrategy messageEvictionStrategy = new OldestMessageEvictionStrategy(); private int discarded = 0; @@ -131,7 +131,7 @@ MessageReference node=(MessageReference) i.next(); if (node.isExpired()) { i.remove(); - dispatched.incrementAndGet(); + dispatchedCounter.incrementAndGet(); node.decrementReferenceCount(); break; } @@ -144,7 +144,7 @@ MessageReference node=(MessageReference) i.next(); if(node.getMessageId().equals(mdn.getMessageId())){ i.remove(); - dispatched.incrementAndGet(); + dispatchedCounter.incrementAndGet(); node.decrementReferenceCount(); break; } @@ -158,7 +158,7 @@ boolean wasFull=isFull(); if(ack.isStandardAck()||ack.isPoisonAck()){ if(context.isInTransaction()){ - delivered.addAndGet(ack.getMessageCount()); + prefetchExtension.addAndGet(ack.getMessageCount()); context.getTransaction().addSynchronization(new Synchronization(){ public void afterCommit() throws Exception{ synchronized( TopicSubscription.this ) { @@ -167,8 +167,7 @@ } } dequeueCounter.addAndGet(ack.getMessageCount()); - dispatched.addAndGet(-ack.getMessageCount()); - delivered.set(Math.max(0,delivered.get()-ack.getMessageCount())); + prefetchExtension.set(Math.max(0,prefetchExtension.get()-ack.getMessageCount())); } }); }else{ @@ -178,8 +177,7 @@ } dequeueCounter.addAndGet(ack.getMessageCount()); - dispatched.addAndGet(-ack.getMessageCount()); - delivered.set(Math.max(0,delivered.get()-ack.getMessageCount())); + prefetchExtension.set(Math.max(0,prefetchExtension.get()-ack.getMessageCount())); } if(wasFull&&!isFull()){ dispatchMatched(); @@ -187,7 +185,7 @@ return; }else if(ack.isDeliveredAck()){ // Message was delivered but not acknowledged: update pre-fetch counters. - delivered.addAndGet(ack.getMessageCount()); + prefetchExtension.addAndGet(ack.getMessageCount()); if(wasFull&&!isFull()){ dispatchMatched(); } @@ -206,7 +204,7 @@ } public int getDispatchedQueueSize(){ - return (int)(dispatched.get()-delivered.get()); + return (int)(dispatchedCounter.get()-dequeueCounter.get()); } public int getMaximumPendingMessages(){ @@ -214,7 +212,7 @@ } public long getDispatchedCounter() { - return dispatched.get(); + return dispatchedCounter.get(); } public long getEnqueueCounter() { @@ -270,21 +268,21 @@ // ------------------------------------------------------------------------- private boolean isFull(){ - return dispatched.get()-delivered.get()>=info.getPrefetchSize(); + return getDispatchedQueueSize()-prefetchExtension.get()>=info.getPrefetchSize(); } /** * @return true when 60% or more room is left for dispatching messages */ public boolean isLowWaterMark(){ - return (dispatched.get()-delivered.get()) <= (info.getPrefetchSize() *.4); + return (getDispatchedQueueSize()-prefetchExtension.get()) <= (info.getPrefetchSize() *.4); } /** * @return true when 10% or less room is left for dispatching messages */ public boolean isHighWaterMark(){ - return (dispatched.get()-delivered.get()) >= (info.getPrefetchSize() *.9); + return (getDispatchedQueueSize()-prefetchExtension.get()) >= (info.getPrefetchSize() *.9); } /** @@ -337,7 +335,7 @@ md.setMessage(message); md.setConsumerId(info.getConsumerId()); md.setDestination(node.getRegionDestination().getActiveMQDestination()); - dispatched.incrementAndGet(); + dispatchedCounter.incrementAndGet(); // Keep track if this subscription is receiving messages from a single destination. if( singleDestination ) { @@ -378,6 +376,10 @@ } matched.clear(); } + } + + public int getPrefetchSize() { + return (int) (info.getPrefetchSize() + prefetchExtension.get()); } }