activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tab...@apache.org
Subject svn commit: r1244796 - in /activemq/trunk/activemq-core/src/main/java/org/apache/activemq: ActiveMQConnection.java ActiveMQMessageConsumer.java ActiveMQSession.java
Date Thu, 16 Feb 2012 00:02:16 GMT
Author: tabish
Date: Thu Feb 16 00:02:15 2012
New Revision: 1244796

URL: http://svn.apache.org/viewvc?rev=1244796&view=rev
Log:
fix for: https://issues.apache.org/jira/browse/AMQ-3714 lazy init the Scheduler object in
ActiveMQConnection.

Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java?rev=1244796&r1=1244795&r2=1244796&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java
Thu Feb 16 00:02:15 2012
@@ -33,6 +33,7 @@ import java.util.concurrent.ThreadPoolEx
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
+
 import javax.jms.Connection;
 import javax.jms.ConnectionConsumer;
 import javax.jms.ConnectionMetaData;
@@ -51,6 +52,7 @@ import javax.jms.Topic;
 import javax.jms.TopicConnection;
 import javax.jms.TopicSession;
 import javax.jms.XAConnection;
+
 import org.apache.activemq.advisory.DestinationSource;
 import org.apache.activemq.blob.BlobTransferPolicy;
 import org.apache.activemq.command.ActiveMQDestination;
@@ -190,7 +192,7 @@ public class ActiveMQConnection implemen
     private boolean useDedicatedTaskRunner;
     protected volatile CountDownLatch transportInterruptionProcessingComplete;
     private long consumerFailoverRedeliveryWaitPeriod;
-    private final Scheduler scheduler;
+    private Scheduler scheduler;
     private boolean messagePrioritySupported = true;
     private boolean transactedIndividualAck = false;
     private boolean nonBlockingRedelivery = false;
@@ -230,8 +232,6 @@ public class ActiveMQConnection implemen
         this.factoryStats.addConnection(this);
         this.timeCreated = System.currentTimeMillis();
         this.connectionAudit.setCheckForDuplicates(transport.isFaultTolerant());
-        this.scheduler = new Scheduler("ActiveMQConnection["+uniqueId+"] Scheduler");
-        this.scheduler.start();
     }
 
     protected void setUserName(String userName) {
@@ -622,9 +622,11 @@ public class ActiveMQConnection implemen
                         advisoryConsumer.dispose();
                         advisoryConsumer = null;
                     }
-                    if (this.scheduler != null) {
+
+                    Scheduler scheduler = this.scheduler;
+                    if (scheduler != null) {
                         try {
-                            this.scheduler.stop();
+                            scheduler.stop();
                         } catch (Exception e) {
                             JMSException ex =  JMSExceptionSupport.create(e);
                             throw ex;
@@ -2408,8 +2410,23 @@ public class ActiveMQConnection implemen
         return consumerFailoverRedeliveryWaitPeriod;
     }
 
-    protected Scheduler getScheduler() {
-        return this.scheduler;
+    protected Scheduler getScheduler() throws JMSException {
+        Scheduler result = scheduler;
+        if (result == null) {
+            synchronized (this) {
+                result = scheduler;
+                if (result == null) {
+                    checkClosed();
+                    try {
+                        result = scheduler = new Scheduler("ActiveMQConnection["+info.getConnectionId().getValue()+"]
Scheduler");
+                        scheduler.start();
+                    } catch(Exception e) {
+                        throw JMSExceptionSupport.create(e);
+                    }
+                }
+            }
+        }
+        return result;
     }
 
     protected ThreadPoolExecutor getExecutor() {

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java?rev=1244796&r1=1244795&r2=1244796&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
Thu Feb 16 00:02:15 2012
@@ -29,6 +29,7 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
+
 import javax.jms.IllegalStateException;
 import javax.jms.InvalidDestinationException;
 import javax.jms.JMSException;
@@ -36,6 +37,7 @@ import javax.jms.Message;
 import javax.jms.MessageConsumer;
 import javax.jms.MessageListener;
 import javax.jms.TransactionRolledBackException;
+
 import org.apache.activemq.blob.BlobDownloader;
 import org.apache.activemq.command.ActiveMQBlobMessage;
 import org.apache.activemq.command.ActiveMQDestination;
@@ -54,7 +56,6 @@ import org.apache.activemq.management.JM
 import org.apache.activemq.management.StatsCapable;
 import org.apache.activemq.management.StatsImpl;
 import org.apache.activemq.selector.SelectorParser;
-import org.apache.activemq.thread.Scheduler;
 import org.apache.activemq.transaction.Synchronization;
 import org.apache.activemq.util.Callback;
 import org.apache.activemq.util.IntrospectionSupport;
@@ -109,7 +110,6 @@ public class ActiveMQMessageConsumer imp
     }
 
     private static final Logger LOG = LoggerFactory.getLogger(ActiveMQMessageConsumer.class);
-    protected final Scheduler scheduler;
     protected final ActiveMQSession session;
     protected final ConsumerInfo info;
 
@@ -207,7 +207,6 @@ public class ActiveMQMessageConsumer imp
         }
 
         this.session = session;
-        this.scheduler = session.getScheduler();
         this.redeliveryPolicy = session.connection.getRedeliveryPolicy();
         setTransformer(session.getTransformer());
 
@@ -1192,7 +1191,7 @@ public class ActiveMQMessageConsumer imp
                                 new LinkedList<MessageDispatch>(deliveredMessages);
 
                             // Start up the delivery again a little later.
-                            scheduler.executeAfterDelay(new Runnable() {
+                            session.getScheduler().executeAfterDelay(new Runnable() {
                                 public void run() {
                                     try {
                                         if (!unconsumedMessages.isClosed()) {
@@ -1216,7 +1215,7 @@ public class ActiveMQMessageConsumer imp
 
                         if (redeliveryDelay > 0 && !unconsumedMessages.isClosed())
{
                             // Start up the delivery again a little later.
-                            scheduler.executeAfterDelay(new Runnable() {
+                            session.getScheduler().executeAfterDelay(new Runnable() {
                                 public void run() {
                                     try {
                                         if (started.get()) {

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java?rev=1244796&r1=1244795&r2=1244796&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java Thu
Feb 16 00:02:15 2012
@@ -26,6 +26,7 @@ import java.util.List;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.atomic.AtomicBoolean;
+
 import javax.jms.BytesMessage;
 import javax.jms.Destination;
 import javax.jms.IllegalStateException;
@@ -53,6 +54,7 @@ import javax.jms.TopicPublisher;
 import javax.jms.TopicSession;
 import javax.jms.TopicSubscriber;
 import javax.jms.TransactionRolledBackException;
+
 import org.apache.activemq.blob.BlobDownloader;
 import org.apache.activemq.blob.BlobTransferPolicy;
 import org.apache.activemq.blob.BlobUploader;
@@ -198,7 +200,6 @@ public class ActiveMQSession implements 
     }
 
     private static final Logger LOG = LoggerFactory.getLogger(ActiveMQSession.class);
-    private final Scheduler scheduler;
     private final ThreadPoolExecutor connectionExecutor;
 
     protected int acknowledgementMode;
@@ -251,7 +252,6 @@ public class ActiveMQSession implements 
         this.connection.asyncSendPacket(info);
         setTransformer(connection.getTransformer());
         setBlobTransferPolicy(connection.getBlobTransferPolicy());
-        this.scheduler=connection.getScheduler();
         this.connectionExecutor=connection.getExecutor();
         this.executor = new ActiveMQSessionExecutor(this);
         connection.addSession(this);
@@ -659,10 +659,14 @@ public class ActiveMQSession implements 
         //
         for (final ActiveMQMessageConsumer consumer : consumers) {
             consumer.inProgressClearRequired();
-            scheduler.executeAfterDelay(new Runnable() {
-                public void run() {
-                    consumer.clearMessagesInProgress();
-                }}, 0l);
+            try {
+                connection.getScheduler().executeAfterDelay(new Runnable() {
+                    public void run() {
+                        consumer.clearMessagesInProgress();
+                    }}, 0l);
+            } catch (JMSException e) {
+                connection.onClientInternalException(e);
+            }
         }
     }
 
@@ -892,7 +896,7 @@ public class ActiveMQSession implements 
                                 for (int i = 0; i < redeliveryCounter; i++) {
                                     redeliveryDelay = redeliveryPolicy.getNextRedeliveryDelay(redeliveryDelay);
                                 }
-                                scheduler.executeAfterDelay(new Runnable() {
+                                connection.getScheduler().executeAfterDelay(new Runnable()
{
 
                                     public void run() {
                                         ((ActiveMQDispatcher)md.getConsumer()).dispatch(md);
@@ -2051,8 +2055,8 @@ public class ActiveMQSession implements 
         }
     }
 
-    protected Scheduler getScheduler() {
-        return this.scheduler;
+    protected Scheduler getScheduler() throws JMSException {
+        return this.connection.getScheduler();
     }
 
     protected ThreadPoolExecutor getConnectionExecutor() {



Mime
View raw message