activemq-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Rob Davies <rajdav...@gmail.com>
Subject Re: svn commit: r725020 - /activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSessionExecutor.java
Date Wed, 10 Dec 2008 13:42:02 GMT
Don't understand why you changed an 'atomic' get and set with a heavy  
synchronized(this) ?

On 10 Dec 2008, at 07:16, djencks@apache.org wrote:

> Author: djencks
> Date: Tue Dec  9 23:16:39 2008
> New Revision: 725020
>
> URL: http://svn.apache.org/viewvc?rev=725020&view=rev
> Log:
> AMQ-2028 fix thread safety problem in ActiveMQSessionExecutor
>
> Modified:
>    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ 
> ActiveMQSessionExecutor.java
>
> Modified: activemq/trunk/activemq-core/src/main/java/org/apache/ 
> activemq/ActiveMQSessionExecutor.java
> URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSessionExecutor.java?rev=725020&r1=725019&r2=725020&view=diff
> = 
> = 
> = 
> = 
> = 
> = 
> = 
> = 
> ======================================================================
> --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ 
> ActiveMQSessionExecutor.java (original)
> +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ 
> ActiveMQSessionExecutor.java Tue Dec  9 23:16:39 2008
> @@ -17,9 +17,7 @@
>
> package org.apache.activemq;
>
> -import java.util.Iterator;
> import java.util.List;
> -import java.util.concurrent.atomic.AtomicBoolean;
>
> import javax.jms.JMSException;
>
> @@ -44,9 +42,8 @@
>     private ActiveMQSession session;
>     private MessageDispatchChannel messageQueue = new  
> MessageDispatchChannel();
>     private boolean dispatchedBySessionPool;
> -    private TaskRunner taskRunner;
> +    private volatile TaskRunner taskRunner;
>     private boolean startedOrWarnedThatNotStarted;
> -    private AtomicBoolean taskRunnerCreated = new AtomicBoolean();
>
>     ActiveMQSessionExecutor(ActiveMQSession session) {
>         this.session = session;
> @@ -90,10 +87,14 @@
>         if (!dispatchedBySessionPool) {
>             if (session.isSessionAsyncDispatch()) {
>                 try {
> -                    if (taskRunnerCreated.compareAndSet(false,  
> true)) {
> -                        if (taskRunner == null) {
> -                            taskRunner =  
> session.connection.getSessionTaskRunner().createTaskRunner(this,
> -                                    "ActiveMQ Session: " +  
> session.getSessionId());
> +                    TaskRunner taskRunner = this.taskRunner;
> +                    if (taskRunner == null) {
> +                        synchronized (this) {
> +                            if (this.taskRunner == null) {
> +                                this.taskRunner =  
> session.connection.getSessionTaskRunner().createTaskRunner(this,
> +                                        "ActiveMQ Session: " +  
> session.getSessionId());
> +                            }
> +                            taskRunner = this.taskRunner;
>                         }
>                     }
>                     taskRunner.wakeup();
> @@ -120,8 +121,7 @@
>
>         // TODO - we should use a Map for this indexed by consumerId
>
> -        for (Iterator i = this.session.consumers.iterator();  
> i.hasNext();) {
> -            ActiveMQMessageConsumer consumer =  
> (ActiveMQMessageConsumer)i.next();
> +        for (ActiveMQMessageConsumer consumer :  
> this.session.consumers) {
>             ConsumerId consumerId = message.getConsumerId();
>             if (consumerId.equals(consumer.getConsumerId())) {
>                 consumer.dispatch(message);
> @@ -143,10 +143,10 @@
>         try {
>             if (messageQueue.isRunning()) {
>                 messageQueue.stop();
> +                TaskRunner taskRunner = this.taskRunner;
>                 if (taskRunner != null) {
> +                    this.taskRunner = null;
>                     taskRunner.shutdown();
> -                    taskRunner = null;
> -                    taskRunnerCreated.set(false);
>                 }
>             }
>         } catch (InterruptedException e) {
> @@ -168,7 +168,7 @@
>     }
>
>     MessageDispatch dequeueNoWait() {
> -        return (MessageDispatch)messageQueue.dequeueNoWait();
> +        return messageQueue.dequeueNoWait();
>     }
>
>     protected void clearMessagesInProgress() {
> @@ -182,8 +182,7 @@
>     public boolean iterate() {
>
>         // Deliver any messages queued on the consumer to their  
> listeners.
> -        for (Iterator i = this.session.consumers.iterator();  
> i.hasNext();) {
> -            ActiveMQMessageConsumer consumer =  
> (ActiveMQMessageConsumer)i.next();
> +        for (ActiveMQMessageConsumer consumer :  
> this.session.consumers) {
>             if (consumer.iterate()) {
>                 return true;
>             }
>
>


Mime
View raw message