activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rajdav...@apache.org
Subject svn commit: r637878 - /activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java
Date Mon, 17 Mar 2008 13:28:10 GMT
Author: rajdavies
Date: Mon Mar 17 06:28:06 2008
New Revision: 637878

URL: http://svn.apache.org/viewvc?rev=637878&view=rev
Log:
tightened synchronization around dispatchQueue

Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java?rev=637878&r1=637877&r2=637878&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java Mon Mar 17 06:28:06 2008
@@ -112,7 +112,7 @@
     protected final Map<ConnectionId, ConnectionState> brokerConnectionStates;
     // The broker and wireformat info that was exchanged.
     protected BrokerInfo brokerInfo;
-    protected final List<Command> dispatchQueue = Collections.synchronizedList(new LinkedList<Command>());
+    protected final List<Command> dispatchQueue = new LinkedList<Command>();
     protected TaskRunner taskRunner;
     protected final AtomicReference<IOException> transportException = new AtomicReference<IOException>();
     protected AtomicBoolean dispatchStopped = new AtomicBoolean(false);
@@ -205,7 +205,9 @@
      * @return size of dispatch queue
      */
     public int getDispatchQueueSize() {
-        return dispatchQueue.size();
+        synchronized(dispatchQueue) {
+            return dispatchQueue.size();
+        }
     }
 
     public void serviceTransportException(IOException e) {
@@ -743,7 +745,9 @@
             if (taskRunner == null) {
                 dispatchSync(message);
             } else {
-                dispatchQueue.add(message);
+                synchronized(dispatchQueue) {
+                    dispatchQueue.add(message);
+                }
                 try {
                     taskRunner.wakeup();
                 } catch (InterruptedException e) {
@@ -780,7 +784,7 @@
                     sub.run();
                 }
             }
-            getStatistics().getDequeues().increment();
+            //getStatistics().getDequeues().increment();
         }
     }
 
@@ -800,11 +804,13 @@
             }
 
             if (!dispatchStopped.get()) {
-
-                if (dispatchQueue.isEmpty()) {
-                    return false;
+                Command command = null;
+                synchronized(dispatchQueue) {
+                    if (dispatchQueue.isEmpty()) {
+                        return false;
+                    }
+                    command = dispatchQueue.remove(0);
                 }
-                Command command = dispatchQueue.remove(0);
                 processDispatch(command);
                 return true;
             }
@@ -968,16 +974,19 @@
 
         // Run the MessageDispatch callbacks so that message references get
         // cleaned up.
-        for (Iterator<Command> iter = dispatchQueue.iterator(); iter.hasNext();) {
-            Command command = iter.next();
-            if (command.isMessageDispatch()) {
-                MessageDispatch md = (MessageDispatch)command;
-                Runnable sub = md.getTransmitCallback();
-                broker.postProcessDispatch(md);
-                if (sub != null) {
-                    sub.run();
+        synchronized(dispatchQueue) {
+            for (Iterator<Command> iter = dispatchQueue.iterator(); iter.hasNext();) {
+                Command command = iter.next();
+                if (command.isMessageDispatch()) {
+                    MessageDispatch md = (MessageDispatch)command;
+                    Runnable sub = md.getTransmitCallback();
+                    broker.postProcessDispatch(md);
+                    if (sub != null) {
+                        sub.run();
+                    }
                 }
             }
+            dispatchQueue.clear();
         }
         //
         // Remove all logical connection associated with this connection



Mime
View raw message