activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chir...@apache.org
Subject svn commit: r642067 - /activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
Date Fri, 28 Mar 2008 02:22:40 GMT
Author: chirino
Date: Thu Mar 27 19:22:39 2008
New Revision: 642067

URL: http://svn.apache.org/viewvc?rev=642067&view=rev
Log:
- Fixing out of order dispatch.


Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java?rev=642067&r1=642066&r2=642067&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
Thu Mar 27 19:22:39 2008
@@ -25,6 +25,7 @@
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
 
 import javax.jms.IllegalStateException;
 import javax.jms.InvalidDestinationException;
@@ -107,7 +108,7 @@
     private long redeliveryDelay;
     private int ackCounter;
     private int dispatchedCount;
-    private MessageListener messageListener;
+    private final AtomicReference<MessageListener> messageListener = new AtomicReference<MessageListener>();
     private JMSConsumerStatsImpl stats;
 
     private final String selector;
@@ -330,7 +331,7 @@
      */
     public MessageListener getMessageListener() throws JMSException {
         checkClosed();
-        return this.messageListener;
+        return this.messageListener.get();
     }
 
     /**
@@ -354,19 +355,20 @@
             throw new JMSException(
                                    "Illegal prefetch size of zero. This setting is not supported
for asynchronous consumers please set a value of at least 1");
         }
-        this.messageListener = listener;
         if (listener != null) {
             boolean wasRunning = session.isRunning();
             if (wasRunning) {
                 session.stop();
             }
 
+            this.messageListener.set(listener);
             session.redispatch(this, unconsumedMessages);
 
             if (wasRunning) {
                 session.start();
             }
-
+        } else {
+            this.messageListener.set(null);
         }
     }
 
@@ -934,7 +936,7 @@
     }
 
     public void dispatch(MessageDispatch md) {
-        MessageListener listener = this.messageListener;
+        MessageListener listener = this.messageListener.get();
         try {
             synchronized (unconsumedMessages.getMutex()) {
                 if (clearDispatchList) {
@@ -1024,7 +1026,7 @@
      * @throws JMSException
      */
     public boolean iterate() {
-        MessageListener listener = this.messageListener;
+        MessageListener listener = this.messageListener.get();
         if (listener != null) {
             MessageDispatch md = unconsumedMessages.dequeueNoWait();
             if (md != null) {



Mime
View raw message