activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From djen...@apache.org
Subject svn commit: r725020 - /activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSessionExecutor.java
Date Wed, 10 Dec 2008 07:16:39 GMT
Author: djencks
Date: Tue Dec  9 23:16:39 2008
New Revision: 725020

URL: http://svn.apache.org/viewvc?rev=725020&view=rev
Log:
AMQ-2028 fix thread safety problem in ActiveMQSessionExecutor

Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSessionExecutor.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSessionExecutor.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSessionExecutor.java?rev=725020&r1=725019&r2=725020&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSessionExecutor.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSessionExecutor.java
Tue Dec  9 23:16:39 2008
@@ -17,9 +17,7 @@
 
 package org.apache.activemq;
 
-import java.util.Iterator;
 import java.util.List;
-import java.util.concurrent.atomic.AtomicBoolean;
 
 import javax.jms.JMSException;
 
@@ -44,9 +42,8 @@
     private ActiveMQSession session;
     private MessageDispatchChannel messageQueue = new MessageDispatchChannel();
     private boolean dispatchedBySessionPool;
-    private TaskRunner taskRunner;
+    private volatile TaskRunner taskRunner;
     private boolean startedOrWarnedThatNotStarted;
-    private AtomicBoolean taskRunnerCreated = new AtomicBoolean();
 
     ActiveMQSessionExecutor(ActiveMQSession session) {
         this.session = session;
@@ -90,10 +87,14 @@
         if (!dispatchedBySessionPool) {
             if (session.isSessionAsyncDispatch()) {
                 try {
-                    if (taskRunnerCreated.compareAndSet(false, true)) {
-                        if (taskRunner == null) {
-                            taskRunner = session.connection.getSessionTaskRunner().createTaskRunner(this,
-                                    "ActiveMQ Session: " + session.getSessionId());
+                    TaskRunner taskRunner = this.taskRunner;
+                    if (taskRunner == null) {
+                        synchronized (this) {
+                            if (this.taskRunner == null) {
+                                this.taskRunner = session.connection.getSessionTaskRunner().createTaskRunner(this,
+                                        "ActiveMQ Session: " + session.getSessionId());
+                            }
+                            taskRunner = this.taskRunner;
                         }
                     }
                     taskRunner.wakeup();
@@ -120,8 +121,7 @@
 
         // TODO - we should use a Map for this indexed by consumerId
 
-        for (Iterator i = this.session.consumers.iterator(); i.hasNext();) {
-            ActiveMQMessageConsumer consumer = (ActiveMQMessageConsumer)i.next();
+        for (ActiveMQMessageConsumer consumer : this.session.consumers) {
             ConsumerId consumerId = message.getConsumerId();
             if (consumerId.equals(consumer.getConsumerId())) {
                 consumer.dispatch(message);
@@ -143,10 +143,10 @@
         try {
             if (messageQueue.isRunning()) {
                 messageQueue.stop();
+                TaskRunner taskRunner = this.taskRunner;
                 if (taskRunner != null) {
+                    this.taskRunner = null;
                     taskRunner.shutdown();
-                    taskRunner = null;
-                    taskRunnerCreated.set(false);
                 }
             }
         } catch (InterruptedException e) {
@@ -168,7 +168,7 @@
     }
 
     MessageDispatch dequeueNoWait() {
-        return (MessageDispatch)messageQueue.dequeueNoWait();
+        return messageQueue.dequeueNoWait();
     }
 
     protected void clearMessagesInProgress() {
@@ -182,8 +182,7 @@
     public boolean iterate() {
 
         // Deliver any messages queued on the consumer to their listeners.
-        for (Iterator i = this.session.consumers.iterator(); i.hasNext();) {
-            ActiveMQMessageConsumer consumer = (ActiveMQMessageConsumer)i.next();
+        for (ActiveMQMessageConsumer consumer : this.session.consumers) {
             if (consumer.iterate()) {
                 return true;
             }



Mime
View raw message