activemq-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rajdavies <rajdav...@gmail.com>
Subject Re: svn commit: r725020 - /activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSessionExecutor.java
Date Wed, 10 Dec 2008 14:41:11 GMT

David - ignore me - missed the taskRunner.wakeup() call ... :)

rajdavies wrote:
> 
> Don't understand why you changed an 'atomic' get and set with a heavy  
> synchronized(this) ?
> 
> On 10 Dec 2008, at 07:16, djencks@apache.org wrote:
> 
>> Author: djencks
>> Date: Tue Dec  9 23:16:39 2008
>> New Revision: 725020
>>
>> URL: http://svn.apache.org/viewvc?rev=725020&view=rev
>> Log:
>> AMQ-2028 fix thread safety problem in ActiveMQSessionExecutor
>>
>> Modified:
>>    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ 
>> ActiveMQSessionExecutor.java
>>
>> Modified: activemq/trunk/activemq-core/src/main/java/org/apache/ 
>> activemq/ActiveMQSessionExecutor.java
>> URL:
>> http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSessionExecutor.java?rev=725020&r1=725019&r2=725020&view=diff
>> = 
>> = 
>> = 
>> = 
>> = 
>> = 
>> = 
>> = 
>> ======================================================================
>> --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ 
>> ActiveMQSessionExecutor.java (original)
>> +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ 
>> ActiveMQSessionExecutor.java Tue Dec  9 23:16:39 2008
>> @@ -17,9 +17,7 @@
>>
>> package org.apache.activemq;
>>
>> -import java.util.Iterator;
>> import java.util.List;
>> -import java.util.concurrent.atomic.AtomicBoolean;
>>
>> import javax.jms.JMSException;
>>
>> @@ -44,9 +42,8 @@
>>     private ActiveMQSession session;
>>     private MessageDispatchChannel messageQueue = new  
>> MessageDispatchChannel();
>>     private boolean dispatchedBySessionPool;
>> -    private TaskRunner taskRunner;
>> +    private volatile TaskRunner taskRunner;
>>     private boolean startedOrWarnedThatNotStarted;
>> -    private AtomicBoolean taskRunnerCreated = new AtomicBoolean();
>>
>>     ActiveMQSessionExecutor(ActiveMQSession session) {
>>         this.session = session;
>> @@ -90,10 +87,14 @@
>>         if (!dispatchedBySessionPool) {
>>             if (session.isSessionAsyncDispatch()) {
>>                 try {
>> -                    if (taskRunnerCreated.compareAndSet(false,  
>> true)) {
>> -                        if (taskRunner == null) {
>> -                            taskRunner =  
>> session.connection.getSessionTaskRunner().createTaskRunner(this,
>> -                                    "ActiveMQ Session: " +  
>> session.getSessionId());
>> +                    TaskRunner taskRunner = this.taskRunner;
>> +                    if (taskRunner == null) {
>> +                        synchronized (this) {
>> +                            if (this.taskRunner == null) {
>> +                                this.taskRunner =  
>> session.connection.getSessionTaskRunner().createTaskRunner(this,
>> +                                        "ActiveMQ Session: " +  
>> session.getSessionId());
>> +                            }
>> +                            taskRunner = this.taskRunner;
>>                         }
>>                     }
>>                     taskRunner.wakeup();
>> @@ -120,8 +121,7 @@
>>
>>         // TODO - we should use a Map for this indexed by consumerId
>>
>> -        for (Iterator i = this.session.consumers.iterator();  
>> i.hasNext();) {
>> -            ActiveMQMessageConsumer consumer =  
>> (ActiveMQMessageConsumer)i.next();
>> +        for (ActiveMQMessageConsumer consumer :  
>> this.session.consumers) {
>>             ConsumerId consumerId = message.getConsumerId();
>>             if (consumerId.equals(consumer.getConsumerId())) {
>>                 consumer.dispatch(message);
>> @@ -143,10 +143,10 @@
>>         try {
>>             if (messageQueue.isRunning()) {
>>                 messageQueue.stop();
>> +                TaskRunner taskRunner = this.taskRunner;
>>                 if (taskRunner != null) {
>> +                    this.taskRunner = null;
>>                     taskRunner.shutdown();
>> -                    taskRunner = null;
>> -                    taskRunnerCreated.set(false);
>>                 }
>>             }
>>         } catch (InterruptedException e) {
>> @@ -168,7 +168,7 @@
>>     }
>>
>>     MessageDispatch dequeueNoWait() {
>> -        return (MessageDispatch)messageQueue.dequeueNoWait();
>> +        return messageQueue.dequeueNoWait();
>>     }
>>
>>     protected void clearMessagesInProgress() {
>> @@ -182,8 +182,7 @@
>>     public boolean iterate() {
>>
>>         // Deliver any messages queued on the consumer to their  
>> listeners.
>> -        for (Iterator i = this.session.consumers.iterator();  
>> i.hasNext();) {
>> -            ActiveMQMessageConsumer consumer =  
>> (ActiveMQMessageConsumer)i.next();
>> +        for (ActiveMQMessageConsumer consumer :  
>> this.session.consumers) {
>>             if (consumer.iterate()) {
>>                 return true;
>>             }
>>
>>
> 
> 
> 

-- 
View this message in context: http://www.nabble.com/Re%3A-svn-commit%3A-r725020----activemq-trunk-activemq-core-src-main-java-org-apache-activemq-ActiveMQSessionExecutor.java-tp20935620p20936633.html
Sent from the ActiveMQ - Dev mailing list archive at Nabble.com.


Mime
View raw message