qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rgodf...@apache.org
Subject svn commit: r1466482 - in /qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server: exchange/AbstractExchange.java queue/BaseQueue.java
Date Wed, 10 Apr 2013 13:33:43 GMT
Author: rgodfrey
Date: Wed Apr 10 13:33:43 2013
New Revision: 1466482

URL: http://svn.apache.org/r1466482
Log:
QPID-2789 : [Java Broker] Prevent additional messages being enqueued after a queue is deleted.

Modified:
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/BaseQueue.java

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java?rev=1466482&r1=1466481&r2=1466482&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java
(original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java
Wed Apr 10 13:33:43 2013
@@ -20,6 +20,8 @@
  */
 package org.apache.qpid.server.exchange;
 
+import java.util.ArrayList;
+import org.apache.log4j.Logger;
 import org.apache.qpid.AMQException;
 import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.server.binding.Binding;
@@ -47,6 +49,7 @@ import java.util.concurrent.atomic.Atomi
 
 public abstract class AbstractExchange implements Exchange
 {
+    private static final Logger _logger = Logger.getLogger(AbstractExchange.class);
     private AMQShortString _name;
     private final AtomicBoolean _closed = new AtomicBoolean();
 
@@ -295,7 +298,29 @@ public abstract class AbstractExchange i
     {
         _receivedMessageCount.incrementAndGet();
         _receivedMessageSize.addAndGet(message.getSize());
-        final List<? extends BaseQueue> queues = doRoute(message);
+        List<? extends BaseQueue> queues = doRoute(message);
+        List<? extends BaseQueue> allQueues = queues;
+
+        boolean deletedQueues = false;
+
+        for(BaseQueue q : allQueues)
+        {
+            if(q.isDeleted())
+            {
+                if(!deletedQueues)
+                {
+                    deletedQueues = true;
+                    queues = new ArrayList<BaseQueue>(allQueues);
+                }
+                if(_logger.isDebugEnabled())
+                {
+                    _logger.debug("Exchange: " + getName() + " - attempt to enqueue message
onto deleted queue " + String.valueOf(q.getNameShortString()));
+                }
+                queues.remove(q);
+            }
+        }
+
+
         if(!queues.isEmpty())
         {
             _routedMessageCount.incrementAndGet();

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/BaseQueue.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/BaseQueue.java?rev=1466482&r1=1466481&r2=1466482&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/BaseQueue.java
(original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/BaseQueue.java
Wed Apr 10 13:33:43 2013
@@ -38,6 +38,7 @@ public interface BaseQueue extends Trans
     void enqueue(ServerMessage message, boolean transactional, PostEnqueueAction action)
throws AMQException;
 
     boolean isDurable();
+    boolean isDeleted();
 
     AMQShortString getNameShortString();
 }



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


Mime
View raw message