activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tab...@apache.org
Subject git commit: https://issues.apache.org/jira/browse/AMQ-4970
Date Mon, 20 Jan 2014 15:43:51 GMT
Updated Branches:
  refs/heads/trunk 48d25adfc -> f7cbe9fa1


https://issues.apache.org/jira/browse/AMQ-4970

Guard Queue start() and stop() so that repeated calls have no effect:
start() only runs when the queue is stopped, and stop() only runs when
it has been started.

Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/f7cbe9fa
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/f7cbe9fa
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/f7cbe9fa

Branch: refs/heads/trunk
Commit: f7cbe9fa173f1f3cf91d016ed340f90a2bd61242
Parents: 48d25ad
Author: Timothy Bish <tabish121@gmai.com>
Authored: Mon Jan 20 10:43:44 2014 -0500
Committer: Timothy Bish <tabish121@gmai.com>
Committed: Mon Jan 20 10:43:44 2014 -0500

----------------------------------------------------------------------
 .../apache/activemq/broker/region/Queue.java    | 104 ++++++++++++-------
 1 file changed, 66 insertions(+), 38 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/f7cbe9fa/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
index 10b463e..0ae4463 100755
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
@@ -17,7 +17,18 @@
 package org.apache.activemq.broker.region;
 
 import java.io.IOException;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.LinkedHashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.CancellationException;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentLinkedQueue;
@@ -27,6 +38,7 @@ import java.util.concurrent.Delayed;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
@@ -36,6 +48,7 @@ import javax.jms.InvalidSelectorException;
 import javax.jms.JMSException;
 import javax.jms.ResourceAllocationException;
 import javax.transaction.xa.XAException;
+
 import org.apache.activemq.broker.BrokerService;
 import org.apache.activemq.broker.ConnectionContext;
 import org.apache.activemq.broker.ProducerBrokerExchange;
@@ -52,7 +65,17 @@ import org.apache.activemq.broker.region.policy.DeadLetterStrategy;
 import org.apache.activemq.broker.region.policy.DispatchPolicy;
 import org.apache.activemq.broker.region.policy.RoundRobinDispatchPolicy;
 import org.apache.activemq.broker.util.InsertionCountList;
-import org.apache.activemq.command.*;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ConsumerId;
+import org.apache.activemq.command.ExceptionResponse;
+import org.apache.activemq.command.Message;
+import org.apache.activemq.command.MessageAck;
+import org.apache.activemq.command.MessageDispatchNotification;
+import org.apache.activemq.command.MessageId;
+import org.apache.activemq.command.ProducerAck;
+import org.apache.activemq.command.ProducerInfo;
+import org.apache.activemq.command.Response;
+import org.apache.activemq.command.TransactionId;
 import org.apache.activemq.filter.BooleanExpression;
 import org.apache.activemq.filter.MessageEvaluationContext;
 import org.apache.activemq.filter.NonCachedMessageEvaluationContext;
@@ -108,6 +131,7 @@ public class Queue extends BaseDestination implements Task, UsageListener {
     private CountDownLatch consumersBeforeStartsLatch;
     private final AtomicLong pendingWakeups = new AtomicLong();
     private boolean allConsumersExclusiveByDefault = false;
+    private final AtomicBoolean started = new AtomicBoolean();
 
     private boolean resetNeeded;
 
@@ -717,7 +741,7 @@ public class Queue extends BaseDestination implements Task, UsageListener {
     }
 
     final ConcurrentHashMap<Transaction, SendSync> sendSyncs = new ConcurrentHashMap<Transaction, SendSync>();
-    private LinkedList<Transaction> orderIndexUpdates = new LinkedList<Transaction>();
+    private final LinkedList<Transaction> orderIndexUpdates = new LinkedList<Transaction>();
 
     // roll up all message sends
     class SendSync extends Synchronization {
@@ -995,51 +1019,55 @@ public class Queue extends BaseDestination implements Task, UsageListener {
 
     @Override
     public void start() throws Exception {
-        if (memoryUsage != null) {
-            memoryUsage.start();
-        }
-        if (systemUsage.getStoreUsage() != null) {
-            systemUsage.getStoreUsage().start();
-        }
-        systemUsage.getMemoryUsage().addUsageListener(this);
-        messages.start();
-        if (getExpireMessagesPeriod() > 0) {
-            scheduler.schedualPeriodically(expireMessagesTask, getExpireMessagesPeriod());
+        if (started.compareAndSet(false, true)) {
+            if (memoryUsage != null) {
+                memoryUsage.start();
+            }
+            if (systemUsage.getStoreUsage() != null) {
+                systemUsage.getStoreUsage().start();
+            }
+            systemUsage.getMemoryUsage().addUsageListener(this);
+            messages.start();
+            if (getExpireMessagesPeriod() > 0) {
+                scheduler.executePeriodically(expireMessagesTask, getExpireMessagesPeriod());
+            }
+            doPageIn(false);
         }
-        doPageIn(false);
     }
 
     @Override
     public void stop() throws Exception {
-        if (taskRunner != null) {
-            taskRunner.shutdown();
-        }
-        if (this.executor != null) {
-            ThreadPoolUtils.shutdownNow(executor);
-            executor = null;
-        }
+        if (started.compareAndSet(true, false)) {
+            if (taskRunner != null) {
+                taskRunner.shutdown();
+            }
+            if (this.executor != null) {
+                ThreadPoolUtils.shutdownNow(executor);
+                executor = null;
+            }
 
-        scheduler.cancel(expireMessagesTask);
+            scheduler.cancel(expireMessagesTask);
 
-        if (flowControlTimeoutTask.isAlive()) {
-            flowControlTimeoutTask.interrupt();
-        }
+            if (flowControlTimeoutTask.isAlive()) {
+                flowControlTimeoutTask.interrupt();
+            }
 
-        if (messages != null) {
-            messages.stop();
-        }
+            if (messages != null) {
+                messages.stop();
+            }
 
-        for (MessageReference messageReference : pagedInMessages.values()) {
-            messageReference.decrementReferenceCount();
-        }
-        pagedInMessages.clear();
+            for (MessageReference messageReference : pagedInMessages.values()) {
+                messageReference.decrementReferenceCount();
+            }
+            pagedInMessages.clear();
 
-        systemUsage.getMemoryUsage().removeUsageListener(this);
-        if (memoryUsage != null) {
-            memoryUsage.stop();
-        }
-        if (store != null) {
-            store.stop();
+            systemUsage.getMemoryUsage().removeUsageListener(this);
+            if (memoryUsage != null) {
+                memoryUsage.stop();
+            }
+            if (store != null) {
+                store.stop();
+            }
         }
     }
 


Mime
View raw message