activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From j...@apache.org
Subject svn commit: r514694 - in /activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker: ./ jmx/ region/
Date Mon, 05 Mar 2007 15:41:19 GMT
Author: jlim
Date: Mon Mar  5 07:41:17 2007
New Revision: 514694

URL: http://svn.apache.org/viewvc?view=rev&rev=514694
Log:
ported fix to trunk : 
http://issues.apache.org/activemq/browse/AMQ-1172
http://issues.apache.org/activemq/browse/AMQ-1174
http://issues.apache.org/activemq/browse/AMQ-1175

Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/Connection.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ConnectionView.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/ConnectionStatistics.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/Connection.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/Connection.java?view=diff&rev=514694&r1=514693&r2=514694
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/Connection.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/Connection.java Mon Mar  5 07:41:17 2007
@@ -105,5 +105,7 @@
 	public String getRemoteAddress();
 
 	public void serviceExceptionAsync(IOException e);
+	
+	public String getConnectionId();
 
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java?view=diff&rev=514694&r1=514693&r2=514694
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java Mon Mar  5 07:41:17 2007
@@ -26,6 +26,7 @@
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
 import org.apache.activemq.Service;
 import org.apache.activemq.broker.ft.MasterBroker;
 import org.apache.activemq.broker.region.ConnectionStatistics;
@@ -97,7 +98,7 @@
     // Used to do async dispatch.. this should perhaps be pushed down into the transport layer..
     protected final List dispatchQueue=Collections.synchronizedList(new LinkedList());
     protected final TaskRunner taskRunner;
-    protected IOException transportException;
+    protected final AtomicReference transportException = new AtomicReference();
     private boolean inServiceException=false;
     private ConnectionStatistics statistics=new ConnectionStatistics();
     private boolean manageable;
@@ -116,6 +117,8 @@
     private final AtomicBoolean asyncException=new AtomicBoolean(false);
     private final Map<ProducerId,ProducerBrokerExchange>producerExchanges = new HashMap<ProducerId,ProducerBrokerExchange>();
     private final Map<ConsumerId,ConsumerBrokerExchange>consumerExchanges = new HashMap<ConsumerId,ConsumerBrokerExchange>();
+    private CountDownLatch dispatchStoppedLatch = new CountDownLatch(1);
+    protected AtomicBoolean dispatchStopped=new AtomicBoolean(false);
 
     static class ConnectionState extends org.apache.activemq.state.ConnectionState{
 
@@ -166,7 +169,7 @@
                 Command command=(Command)o;
                 Response response=service(command);
                 if(response!=null){
-                    dispatch(response);
+                	dispatchSync(response);
                 }
             }
 
@@ -186,7 +189,7 @@
 
     public void serviceTransportException(IOException e){
         if(!disposed.get()){
-            transportException=e;
+        	transportException.set(e);
             if(transportLog.isDebugEnabled())
                 transportLog.debug("Transport failed: "+e,e);
             ServiceSupport.dispose(this);
@@ -683,47 +686,96 @@
     }
 
     public void dispatchSync(Command message){
-        processDispatch(message);
+        getStatistics().getEnqueues().increment();
+        try {
+            processDispatch(message);
+        } catch (IOException e) {
+            serviceExceptionAsync(e);
+        }
     }
 
     public void dispatchAsync(Command message){
-        if(taskRunner==null){
-            dispatchSync(message);
-        }else{
-            dispatchQueue.add(message);
-            try{
-                taskRunner.wakeup();
-            }catch(InterruptedException e){
-                Thread.currentThread().interrupt();
+        if( !disposed.get() ) {
+            getStatistics().getEnqueues().increment();
+            if( taskRunner==null ) {
+                dispatchSync( message );
+            } else {
+                dispatchQueue.add(message);
+                try {
+                    taskRunner.wakeup();
+                } catch (InterruptedException e) {
+                    Thread.currentThread().interrupt();
+                }
             }
+        } else {
+            if(message.isMessageDispatch()) {
+                MessageDispatch md=(MessageDispatch) message;
+                Runnable sub=(Runnable) md.getConsumer();
+                broker.processDispatch(md);
+                if(sub!=null){
+                    sub.run();
+                }
+             }
         }
     }
 
-    protected void processDispatch(Command command){
-        if(command.isMessageDispatch()){
-            MessageDispatch md=(MessageDispatch)command;
-            Runnable sub=(Runnable)md.getConsumer();
-            broker.processDispatch(md);
-            try{
-                dispatch(command);
-            }finally{
+    protected void processDispatch(Command command) throws IOException {
+        try {
+            if( !disposed.get() ) {
+                 dispatch(command);
+            }
+       } finally {
+
+            if(command.isMessageDispatch()){
+                MessageDispatch md=(MessageDispatch) command;
+                Runnable sub=(Runnable) md.getConsumer();
+                broker.processDispatch(md);
                 if(sub!=null){
                     sub.run();
                 }
             }
-        }else{
-            dispatch(command);
+
+            getStatistics().getDequeues().increment();
         }
-    }
+     }   
+
+
 
     public boolean iterate(){
-        if(dispatchQueue.isEmpty()||broker.isStopped()){
-            return false;
-        }else{
-            Command command=(Command)dispatchQueue.remove(0);
-            processDispatch(command);
-            return true;
-        }
+        try {
+            if( disposed.get() ) {
+                 if( dispatchStopped.compareAndSet(false, true)) {
+                     if( transportException.get()==null ) {
+                         try {
+                             dispatch(new ShutdownInfo());
+                         } catch (Throwable ignore) {
+                         }
+                     }
+                     dispatchStoppedLatch.countDown();
+                 }
+                 return false;                           
+             } 
+
+             if( !dispatchStopped.get() )  {
+
+                 if( dispatchQueue.isEmpty() ) {
+                     return false;
+                 } else {
+                     Command command = (Command) dispatchQueue.remove(0);
+                     processDispatch( command );
+                     return true;
+                 }
+             } else {
+                 return false;
+             }
+
+         } catch (IOException e) {
+             if( dispatchStopped.compareAndSet(false, true)) {
+                 dispatchStoppedLatch.countDown();
+             }
+             serviceExceptionAsync(e);
+             return false;                           
+         }
     }
 
     /**
@@ -792,11 +844,24 @@
             transport.stop();
             active=false;
             if(disposed.compareAndSet(false,true)){
-                if(taskRunner!=null)
-                    taskRunner.shutdown();
-                // Clear out the dispatch queue to release any memory that
-                // is being held on to.
-                dispatchQueue.clear();
+                taskRunner.wakeup();
+                dispatchStoppedLatch.await();
+
+		        if( taskRunner!=null )
+		            taskRunner.shutdown();
+		        
+                // Run the MessageDispatch callbacks so that message references get cleaned up.
+                for (Iterator iter = dispatchQueue.iterator(); iter.hasNext();) {
+                    Command command = (Command) iter.next();
+                    if(command.isMessageDispatch()) {
+                        MessageDispatch md=(MessageDispatch) command;
+                        Runnable sub=(Runnable) md.getConsumer();
+                        broker.processDispatch(md);
+                        if(sub!=null){
+                            sub.run();
+                        }
+                    }
+                } 
                 //
                 // Remove all logical connection associated with this connection
                 // from the broker.
@@ -965,13 +1030,10 @@
         return null;
     }
 
-    protected void dispatch(Command command){
+    protected void dispatch(Command command) throws IOException{
         try{
             setMarkedCandidate(true);
             transport.oneway(command);
-            getStatistics().onCommand(command);
-        }catch(IOException e){
-            serviceExceptionAsync(e);
         }finally{
             setMarkedCandidate(false);
         }
@@ -980,6 +1042,17 @@
     public String getRemoteAddress(){
         return transport.getRemoteAddress();
     }
+    
+    public String getConnectionId() {
+        Iterator iterator = localConnectionStates.values().iterator();
+        ConnectionState object = (ConnectionState) iterator.next();
+        if( object == null ) {
+            return null;
+        }
+        if( object.getInfo().getClientId() !=null )
+            return object.getInfo().getClientId();
+        return object.getInfo().getConnectionId().toString();
+    }    
     
     private ProducerBrokerExchange getProducerBrokerExchange(ProducerId id){
         ProducerBrokerExchange result=producerExchanges.get(id);

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ConnectionView.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ConnectionView.java?view=diff&rev=514694&r1=514693&r2=514694
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ConnectionView.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ConnectionView.java Mon Mar  5 07:41:17 2007
@@ -102,4 +102,8 @@
 		return connection.getRemoteAddress();
 	}
 
+	public String getConnectionId() {
+		return connection.getConnectionId();
+	}	
+
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/ConnectionStatistics.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/ConnectionStatistics.java?view=diff&rev=514694&r1=514693&r2=514694
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/ConnectionStatistics.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/ConnectionStatistics.java Mon Mar  5 07:41:17 2007
@@ -18,8 +18,7 @@
 
 package org.apache.activemq.broker.region;
 
-import org.apache.activemq.command.Command;
-import org.apache.activemq.command.Message;
+
 import org.apache.activemq.management.CountStatisticImpl;
 import org.apache.activemq.management.StatsImpl;
 
@@ -73,16 +72,5 @@
         }
     }
 
-    /**
-     * Updates the statistics as a command is dispatched into the connection
-     */
-    public void onCommand(Command command) {
-        if (command.isMessageDispatch()) {
-            enqueues.increment();
-        }
-    }
 
-    public void onMessageDequeue(Message message) {
-        dequeues.increment();
-    }
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java?view=diff&rev=514694&r1=514693&r2=514694
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java Mon Mar  5 07:41:17 2007
@@ -453,7 +453,6 @@
         if(node.getRegionDestination()!=null){
             if(node!=QueueMessageReference.NULL_MESSAGE){
                 node.getRegionDestination().getDestinationStatistics().getDispatched().increment();
-                context.getConnection().getStatistics().onMessageDequeue(message);
             }
             try{
                 dispatchMatched();

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java?view=diff&rev=514694&r1=514693&r2=514694
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java Mon Mar  5 07:41:17 2007
@@ -44,8 +44,8 @@
     private static final AtomicLong cursorNameCounter=new AtomicLong(0);
     protected PendingMessageCursor matched;
     final protected UsageManager usageManager;
-    protected AtomicLong dispatched=new AtomicLong();
-    protected AtomicLong delivered=new AtomicLong();
+    protected AtomicLong dispatchedCounter=new AtomicLong();
+    protected AtomicLong prefetchExtension=new AtomicLong();
     private int maximumPendingMessages=-1;
     private MessageEvictionStrategy messageEvictionStrategy=new OldestMessageEvictionStrategy();
     private int discarded=0;
@@ -136,7 +136,7 @@
                 MessageReference node=matched.next();
                 if(node.isExpired()){
                     matched.remove();
-                    dispatched.incrementAndGet();
+                    dispatchedCounter.incrementAndGet();
                     node.decrementReferenceCount();
                     break;
                 }
@@ -154,7 +154,7 @@
                     MessageReference node=matched.next();
                     if(node.getMessageId().equals(mdn.getMessageId())){
                         matched.remove();
-                        dispatched.incrementAndGet();
+                        dispatchedCounter.incrementAndGet();
                         node.decrementReferenceCount();
                         break;
                     }
@@ -170,7 +170,7 @@
         boolean wasFull=isFull();
         if(ack.isStandardAck()||ack.isPoisonAck()){
             if(context.isInTransaction()){
-                delivered.addAndGet(ack.getMessageCount());
+            	prefetchExtension.addAndGet(ack.getMessageCount());
                 context.getTransaction().addSynchronization(new Synchronization(){
 
                     public void afterCommit() throws Exception{
@@ -180,8 +180,7 @@
                             }
                         }
                         dequeueCounter.addAndGet(ack.getMessageCount());
-                        dispatched.addAndGet(-ack.getMessageCount());
-                        delivered.set(Math.max(0,delivered.get()-ack.getMessageCount()));
+                        prefetchExtension.addAndGet(ack.getMessageCount());
                     }
                 });
             }else{
@@ -189,8 +188,7 @@
                     destination.getDestinationStatistics().getDequeues().add(ack.getMessageCount());
                 }
                 dequeueCounter.addAndGet(ack.getMessageCount());
-                dispatched.addAndGet(-ack.getMessageCount());
-                delivered.set(Math.max(0,delivered.get()-ack.getMessageCount()));
+                prefetchExtension.addAndGet(ack.getMessageCount());
             }
             if(wasFull&&!isFull()){
                 dispatchMatched();
@@ -198,7 +196,7 @@
             return;
         }else if(ack.isDeliveredAck()){
             // Message was delivered but not acknowledged: update pre-fetch counters.
-            delivered.addAndGet(ack.getMessageCount());
+        	prefetchExtension.addAndGet(ack.getMessageCount());
             if(wasFull&&!isFull()){
                 dispatchMatched();
             }
@@ -217,7 +215,7 @@
     }
 
     public int getDispatchedQueueSize(){
-        return (int)(dispatched.get()-delivered.get());
+    	return (int)(dispatchedCounter.get()-dequeueCounter.get());
     }
 
     public int getMaximumPendingMessages(){
@@ -225,7 +223,7 @@
     }
 
     public long getDispatchedCounter(){
-        return dispatched.get();
+    	return dispatchedCounter.get();
     }
 
     public long getEnqueueCounter(){
@@ -277,21 +275,21 @@
     // Implementation methods
     // -------------------------------------------------------------------------
     private boolean isFull(){
-        return dispatched.get()-delivered.get()>=info.getPrefetchSize();
+    	return getDispatchedQueueSize()-prefetchExtension.get()>=info.getPrefetchSize();
     }
 
     /**
      * @return true when 60% or more room is left for dispatching messages
      */
     public boolean isLowWaterMark(){
-        return (dispatched.get()-delivered.get())<=(info.getPrefetchSize()*.4);
+    	return (getDispatchedQueueSize()-prefetchExtension.get()) <= (info.getPrefetchSize() *.4);
     }
 
     /**
      * @return true when 10% or less room is left for dispatching messages
      */
     public boolean isHighWaterMark(){
-        return (dispatched.get()-delivered.get())>=(info.getPrefetchSize()*.9);
+    	return (getDispatchedQueueSize()-prefetchExtension.get()) >= (info.getPrefetchSize() *.9);
     }
 
     /**
@@ -386,7 +384,7 @@
         md.setMessage(message);
         md.setConsumerId(info.getConsumerId());
         md.setDestination(node.getRegionDestination().getActiveMQDestination());
-        dispatched.incrementAndGet();
+        dispatchedCounter.incrementAndGet();
         // Keep track if this subscription is receiving messages from a single destination.
         if(singleDestination){
             if(destination==null){
@@ -429,6 +427,8 @@
         }
     }
 
-    
+    public int getPrefetchSize() {
+        return (int) (info.getPrefetchSize() + prefetchExtension.get());
+    }    
     
 }



Mime
View raw message