qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rgodf...@apache.org
Subject svn commit: r889022 - in /qpid/trunk/qpid/java: broker/src/main/java/org/apache/qpid/server/exchange/ broker/src/main/java/org/apache/qpid/server/handler/ broker/src/main/java/org/apache/qpid/server/queue/ broker/src/main/java/org/apache/qpid/server/su...
Date Wed, 09 Dec 2009 23:58:27 GMT
Author: rgodfrey
Date: Wed Dec  9 23:58:25 2009
New Revision: 889022

URL: http://svn.apache.org/viewvc?rev=889022&view=rev
Log:
QPID-2258 : AMQP0-9-1 Compliance fixes

Modified:
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DirectExchange.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicGetMethodHandler.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeleteHandler.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueBindHandler.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueuePurgeHandler.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java
    qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java
    qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java
    qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java
    qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionStartMethodHandler.java
    qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java
    qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/client/MessageListenerMultiConsumerTest.java
    qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/DurableQueueLoggingTest.java
    qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/PriorityTest.java
    qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/ProducerFlowControlTest.java
    qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java
    qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/client/message/MessageToStringTest.java
    qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/client/message/ObjectMessageTest.java

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DirectExchange.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DirectExchange.java?rev=889022&r1=889021&r2=889022&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DirectExchange.java
(original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DirectExchange.java
Wed Dec  9 23:58:25 2009
@@ -212,7 +212,7 @@
         final String routingKey = payload.getRoutingKey();
 
 
-        final ArrayList<AMQQueue> queues = (routingKey == null) ? null : _index.get(routingKey);
+        final ArrayList<AMQQueue> queues = (routingKey == null) ? _index.get("") :
_index.get(routingKey);
 
         if (_logger.isDebugEnabled())
         {

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java?rev=889022&r1=889021&r2=889022&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java
(original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java
Wed Dec  9 23:58:25 2009
@@ -102,6 +102,11 @@
                         body.getNoLocal(), body.getNowait(), queue))
                 {
                     throw body.getConnectionException(AMQConstant.ACCESS_REFUSED, "Permission
denied");
+                }                
+                else if (queue.isExclusive() && !queue.isDurable() && queue.getExclusiveOwner()
!= session)
+                {
+                    throw body.getConnectionException(AMQConstant.NOT_ALLOWED,
+                                                      "Queue " + queue.getName() + " is exclusive,
but not created on this Connection.");
                 }
 
                 if (body.getConsumerTag() != null)

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicGetMethodHandler.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicGetMethodHandler.java?rev=889022&r1=889021&r2=889022&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicGetMethodHandler.java
(original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicGetMethodHandler.java
Wed Dec  9 23:58:25 2009
@@ -97,6 +97,11 @@
                 {
                     throw body.getConnectionException(AMQConstant.ACCESS_REFUSED, "Permission
denied");
                 }
+                else if (queue.isExclusive() && queue.getExclusiveOwner() != session)
+                {
+                    throw body.getConnectionException(AMQConstant.NOT_ALLOWED,
+                                                      "Queue is exclusive, but not created
on this Connection.");
+                }
 
                 if (!performGet(queue,session, channel, !body.getNoAck()))
                 {
@@ -188,6 +193,11 @@
             super(channel, protocolSession, consumerTag, filters, noLocal, creditManager,
deliveryMethod, recordMethod);
         }
 
+        public boolean isTransient()
+        {
+            return true;
+        }
+
         public boolean wouldSuspend(QueueEntry msg)
         {
             return !getCreditManager().useCreditForMessage(msg.getMessage());

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeleteHandler.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeleteHandler.java?rev=889022&r1=889021&r2=889022&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeleteHandler.java
(original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeleteHandler.java
Wed Dec  9 23:58:25 2009
@@ -60,6 +60,10 @@
 
         try
         {
+            if(exchangeRegistry.getExchange(body.getExchange()) == null)
+            {
+                throw body.getChannelException(AMQConstant.NOT_FOUND, "No such exchange:
" + body.getExchange());
+            }
             exchangeRegistry.unregisterExchange(body.getExchange(), body.getIfUnused());
 
             ExchangeDeleteOkBody responseBody = session.getMethodRegistry().createExchangeDeleteOkBody();
@@ -68,6 +72,7 @@
         }
         catch (ExchangeInUseException e)
         {
+            throw body.getChannelException(AMQConstant.IN_USE, "Exchange in use");
             // TODO: sort out consistent channel close mechanism that does all clean up etc.
         }
 

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueBindHandler.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueBindHandler.java?rev=889022&r1=889021&r2=889022&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueBindHandler.java
(original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueBindHandler.java
Wed Dec  9 23:58:25 2009
@@ -113,6 +113,11 @@
             {
                 throw body.getConnectionException(AMQConstant.ACCESS_REFUSED, "Permission
denied");
             }
+            else if (queue.isExclusive() && !queue.isDurable() && queue.getExclusiveOwner()
!= session)
+            {
+                throw body.getConnectionException(AMQConstant.NOT_ALLOWED,
+                                                  "Queue " + queue.getName() + " is exclusive,
but not created on this Connection.");
+            }
 
             if (!exch.isBound(routingKey, body.getArguments(), queue))
             {

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java?rev=889022&r1=889021&r2=889022&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java
(original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java
Wed Dec  9 23:58:25 2009
@@ -114,25 +114,37 @@
                     {
                         store.createQueue(queue, body.getArguments());
                     }
+                    if(body.getAutoDelete())
+                    {
+                        queue.setDeleteOnNoConsumers(true);
+                    }
                     queueRegistry.registerQueue(queue);
-                    if(queue.isExclusive()  && !queue.isAutoDelete())
+                    if(body.getExclusive())
                     {
-                        final AMQQueue q = queue;
-                        queue.setExclusiveOwner(session);
-                        final AMQProtocolSession.Task sessionCloseTask = new AMQProtocolSession.Task()
+                        if(body.getDurable())
                         {
-                            public void doTask(AMQProtocolSession session) throws AMQException
-                            {
-                                q.setExclusiveOwner(null);
-                            }
-                        };
-                        session.addSessionCloseTask(sessionCloseTask);
-                        queue.addQueueDeleteTask(new AMQQueue.Task() {
-                            public void doTask(AMQQueue queue) throws AMQException
+                            queue.setExclusiveOwner(session.getPrincipal().getName());
+                        }
+                        else
+                        {
+                            final AMQQueue q = queue;
+                            queue.setExclusiveOwner(session);
+                            final AMQProtocolSession.Task sessionCloseTask = new AMQProtocolSession.Task()
                             {
-                                session.removeSessionCloseTask(sessionCloseTask);
-                            }
-                        });
+                                public void doTask(AMQProtocolSession session) throws AMQException
+                                {
+                                    q.setExclusiveOwner(null);
+                                }
+                            };
+                            session.addSessionCloseTask(sessionCloseTask);
+                            queue.addQueueDeleteTask(new AMQQueue.Task() {
+                                public void doTask(AMQQueue queue) throws AMQException
+                                {
+                                    session.removeSessionCloseTask(sessionCloseTask);
+                                }
+                            });
+                        }
+
                     }
                     if (autoRegister)
                     {
@@ -143,11 +155,19 @@
                     }
                 }
             }
-            else if (queue.getPrincipalHolder() != null
-                      && queue.getPrincipalHolder().getPrincipal() != null
-                      && queue.getPrincipalHolder().getPrincipal().getName() != null
-                      && (!queue.getPrincipalHolder().getPrincipal().getName().equals(session.getPrincipal().getName())
-                          || ((!body.getPassive() && queue.getExclusiveOwner() !=
null && queue.getExclusiveOwner() != session))))
+            else if (queue.isExclusive() && !queue.isDurable() && queue.getExclusiveOwner()
!= session)
+            {
+                throw body.getConnectionException(AMQConstant.NOT_ALLOWED,
+                                                  "Queue " + queue.getName() + " is exclusive,
but not created on this Connection.");
+            }
+            else if(!body.getPassive() && ((queue.isExclusive()) != body.getExclusive()))
+            {
+
+                throw body.getChannelException(AMQConstant.ALREADY_EXISTS,
+                                                  "Cannot re-declare queue '" + queue.getName()
+ "' with different exclusivity (was: "
+                                                    + queue.isExclusive() + " requested "
+ body.getExclusive() + ")");
+            }
+            else if (!body.getPassive() && body.getExclusive() && !queue.getExclusiveOwner().equals(queue.isDurable()
? session.getPrincipal().getName() : session))
             {
                 throw body.getChannelException(AMQConstant.ALREADY_EXISTS, "Cannot declare
queue('" + queueName + "'),"
                                                                            + " as exclusive
queue with same name "
@@ -155,6 +175,20 @@
                                                                            + queue.getPrincipalHolder().getPrincipal().getName()
+ "')");
 
             }
+            else if(!body.getPassive() && queue.isAutoDelete() != body.getAutoDelete())
+            {
+                throw body.getChannelException(AMQConstant.ALREADY_EXISTS,
+                                                  "Cannot re-declare queue '" + queue.getName()
+ "' with different auto-delete (was: "
+                                                    + queue.isAutoDelete() + " requested
" + body.getAutoDelete() + ")");
+            }
+            else if(!body.getPassive() && queue.isDurable() != body.getDurable())
+            {
+                throw body.getChannelException(AMQConstant.ALREADY_EXISTS,
+                                                  "Cannot re-declare queue '" + queue.getName()
+ "' with different durability (was: "
+                                                    + queue.isDurable() + " requested " +
body.getDurable() + ")");
+            }
+
+
             AMQChannel channel = session.getChannel(channelId);
 
             if (channel == null)

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java?rev=889022&r1=889021&r2=889022&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java
(original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java
Wed Dec  9 23:58:25 2009
@@ -110,7 +110,11 @@
                 {
                     throw body.getConnectionException(AMQConstant.ACCESS_REFUSED, "Permission
denied");
                 }
-
+                else if (queue.isExclusive() && !queue.isDurable() && queue.getExclusiveOwner()
!= session)
+                {
+                    throw body.getConnectionException(AMQConstant.NOT_ALLOWED,
+                                                      "Queue " + queue.getName() + " is exclusive,
but not created on this Connection.");
+                }
                 int purged = queue.delete();
 
 

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueuePurgeHandler.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueuePurgeHandler.java?rev=889022&r1=889021&r2=889022&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueuePurgeHandler.java
(original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueuePurgeHandler.java
Wed Dec  9 23:58:25 2009
@@ -103,6 +103,11 @@
                 if (!virtualHost.getAccessManager().authorisePurge(session, queue))
                 {
                     throw body.getConnectionException(AMQConstant.ACCESS_REFUSED, "Permission
denied");
+                }            
+                else if (queue.isExclusive() && queue.getExclusiveOwner() != session)
+                {
+                    throw body.getConnectionException(AMQConstant.NOT_ALLOWED,
+                                                      "Queue is exclusive, but not created
on this Connection.");
                 }
 
                 long purged = queue.clearQueue();

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java?rev=889022&r1=889021&r2=889022&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java Wed
Dec  9 23:58:25 2009
@@ -42,6 +42,9 @@
 
 public interface AMQQueue extends Managable, Comparable<AMQQueue>, ExchangeReferrer,
TransactionLogResource
 {
+    boolean getDeleteOnNoConsumers();
+
+    void setDeleteOnNoConsumers(boolean b);
 
 
     public interface Context

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java?rev=889022&r1=889021&r2=889022&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
(original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
Wed Dec  9 23:58:25 2009
@@ -150,6 +150,7 @@
     private boolean _nolocal;
 
     private final AtomicBoolean _overfull = new AtomicBoolean(false);
+    private boolean _deleteOnNoConsumers;
 
     protected SimpleAMQQueue(AMQShortString name, boolean durable, AMQShortString owner,
boolean autoDelete, VirtualHost virtualHost)
     {
@@ -374,7 +375,7 @@
             throw new ExistingExclusiveSubscription();
         }
 
-        if (exclusive)
+        if (exclusive && !subscription.isTransient())
         {
             if (getConsumerCount() != 0)
             {
@@ -431,7 +432,7 @@
 
             // auto-delete queues must be deleted if there are no remaining subscribers
 
-            if (_autoDelete && getConsumerCount() == 0  && !isExclusive())
+            if (_autoDelete && getDeleteOnNoConsumers() && !subscription.isTransient()
&& getConsumerCount() == 0  )
             {
                 if (_logger.isInfoEnabled())
                 {
@@ -448,6 +449,17 @@
 
     }
 
+    public boolean getDeleteOnNoConsumers()
+    {
+        return _deleteOnNoConsumers;
+    }
+
+    public void setDeleteOnNoConsumers(boolean b)
+    {
+        _deleteOnNoConsumers = b;
+    }
+
+
     // ------ Enqueue / Dequeue
 
     public QueueEntry enqueue(ServerMessage message) throws AMQException

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription.java?rev=889022&r1=889021&r2=889022&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription.java
(original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription.java
Wed Dec  9 23:58:25 2009
@@ -31,6 +31,8 @@
 {
     LogActor getLogActor();
 
+    boolean isTransient();
+
     public static enum State
     {
         ACTIVE,

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java?rev=889022&r1=889021&r2=889022&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java
(original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java
Wed Dec  9 23:58:25 2009
@@ -667,6 +667,11 @@
         return !isBrowser();
     }
 
+    public boolean isTransient()
+    {
+        return false;
+    }
+
     public void set(String key, Object value)
     {
         _properties.put(key, value);

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java?rev=889022&r1=889021&r2=889022&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java
(original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java
Wed Dec  9 23:58:25 2009
@@ -649,6 +649,11 @@
         return _logActor;
     }
 
+    public boolean isTransient()
+    {
+        return false;
+    }
+
     ServerSession getSession()
     {
         return _session;

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java?rev=889022&r1=889021&r2=889022&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java
(original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java
Wed Dec  9 23:58:25 2009
@@ -823,6 +823,11 @@
                             queue.setPrincipalHolder((ServerSession)session);
                             queue.setExclusiveOwner(session);
                         }
+                        else if(method.getAutoDelete())
+                        {
+                            queue.setDeleteOnNoConsumers(true);
+                        }
+                        
                         final String alternateExchangeName = method.getAlternateExchange();
                         if(alternateExchangeName != null && alternateExchangeName.length()
!= 0)
                         {

Modified: qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java?rev=889022&r1=889021&r2=889022&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java
(original)
+++ qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java
Wed Dec  9 23:58:25 2009
@@ -52,6 +52,15 @@
        _name = new AMQShortString(name);
     }
 
+    public boolean getDeleteOnNoConsumers()
+    {
+        return false;
+    }
+
+    public void setDeleteOnNoConsumers(boolean b)
+    {        
+    }
+
     public AMQShortString getName()
     {
         return _name;

Modified: qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java?rev=889022&r1=889021&r2=889022&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java
(original)
+++ qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java
Wed Dec  9 23:58:25 2009
@@ -259,6 +259,7 @@
     {
        _queue.stop();
        _queue = new SimpleAMQQueue(_qname, false, null, true, _virtualHost);
+       _queue.setDeleteOnNoConsumers(true);
        _queue.registerSubscription(_subscription, false);
        AMQMessage message = createMessage(new Long(25));
        _queue.enqueue(message);

Modified: qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java?rev=889022&r1=889021&r2=889022&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java
(original)
+++ qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java
Wed Dec  9 23:58:25 2009
@@ -105,6 +105,11 @@
         return null;  //To change body of implemented methods use File | Settings | File
Templates.
     }
 
+    public boolean isTransient()
+    {
+        return false;
+    }
+
     public AMQQueue getQueue()
     {
         return queue;

Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionStartMethodHandler.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionStartMethodHandler.java?rev=889022&r1=889021&r2=889022&view=diff
==============================================================================
--- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionStartMethodHandler.java
(original)
+++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionStartMethodHandler.java
Wed Dec  9 23:58:25 2009
@@ -68,6 +68,14 @@
 
         ProtocolVersion pv = new ProtocolVersion((byte) body.getVersionMajor(), (byte) body.getVersionMinor());
 
+        // 0-9-1 is indistinguishable from 0-9 using only major and minor ... if we established
the connection as 0-9-1
+        // and now get back major = 0 , minor = 9 then we can assume it means 0-9-1
+
+        if(pv.equals(ProtocolVersion.v0_9) && session.getProtocolVersion().equals(ProtocolVersion.v0_91))
+        {
+            pv = ProtocolVersion.v0_91;
+        }
+
         // For the purposes of interop, we can make the client accept the broker's version
string.
         // If it does, it then internally records the version as being the latest one that
it understands.
         // It needs to do this since frame lookup is done by version.

Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java?rev=889022&r1=889021&r2=889022&view=diff
==============================================================================
--- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java
(original)
+++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java
Wed Dec  9 23:58:25 2009
@@ -403,9 +403,7 @@
         _protocolVersion = pv;
         _methodRegistry = MethodRegistry.getMethodRegistry(pv);
         _methodDispatcher = ClientMethodDispatcherImpl.newMethodDispatcher(pv, this);
-
-        //  _registry = MainRegistry.getVersionSpecificRegistry(versionMajor, versionMinor);
-    }
+  }
 
     public byte getProtocolMinorVersion()
     {
@@ -422,11 +420,6 @@
         return _protocolVersion;
     }
 
-//    public VersionSpecificRegistry getRegistry()
-//    {
-//        return _registry;
-//    }
-
     public MethodRegistry getMethodRegistry()
     {
         return _methodRegistry;

Modified: qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/client/MessageListenerMultiConsumerTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/client/MessageListenerMultiConsumerTest.java?rev=889022&r1=889021&r2=889022&view=diff
==============================================================================
--- qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/client/MessageListenerMultiConsumerTest.java
(original)
+++ qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/client/MessageListenerMultiConsumerTest.java
Wed Dec  9 23:58:25 2009
@@ -36,6 +36,7 @@
 
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
+import java.util.UUID;
 
 /**
  * QPID-293 Setting MessageListener after connection has started can cause messages to be
"lost" on a internal delivery
@@ -61,6 +62,7 @@
     private Session _clientSession1;
     private Queue _queue;
     private final CountDownLatch _allMessagesSent = new CountDownLatch(2); // all messages
Sent Lock
+    private static final String QUEUE_NAME = "queue" + UUID.randomUUID().toString();
 
     protected void setUp() throws Exception
     {
@@ -73,7 +75,7 @@
 
         _clientSession1 = _clientConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
 
-        _queue =_clientSession1.createQueue("queue");
+        _queue =_clientSession1.createQueue(QUEUE_NAME);
 
         _consumer1 = _clientSession1.createConsumer(_queue);
 

Modified: qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/DurableQueueLoggingTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/DurableQueueLoggingTest.java?rev=889022&r1=889021&r2=889022&view=diff
==============================================================================
--- qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/DurableQueueLoggingTest.java
(original)
+++ qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/DurableQueueLoggingTest.java
Wed Dec  9 23:58:25 2009
@@ -249,11 +249,15 @@
         final Map<String, Object> arguments = new HashMap<String, Object>();
         arguments.put("x-qpid-priorities", PRIORITIES);
         // Need to create a queue that does not exist so use test name
-        ((AMQSession) _session).createQueue(new AMQShortString(getTestQueueName()), false,
_durable, false, arguments);
+        final String queueName = getTestQueueName();
+        ((AMQSession) _session).createQueue(new AMQShortString(queueName), false, _durable,
false, arguments);
+
+        Queue queue = (Queue) _session.createQueue("direct://amq.direct/"+queueName+"/"+queueName+"?durable='"+_durable+"'&autodelete='false'");
+
 
         //Need to create a Consumer to ensure that the log has had time to write
         // as the above Create is Asynchronous
-        _session.createConsumer(_session.createQueue(getTestQueueName()));
+        _session.createConsumer(queue);
 
         // Validation
         List<String> results = _monitor.findMatches(QUEUE_PREFIX);
@@ -310,11 +314,15 @@
         final Map<String, Object> arguments = new HashMap<String, Object>();
         arguments.put("x-qpid-priorities", PRIORITIES);
         // Need to create a queue that does not exist so use test name
-        ((AMQSession) _session).createQueue(new AMQShortString(getTestQueueName()), true,
_durable, false, arguments);
+        final String queueName = getTestQueueName() + "-autoDeletePriority";
+        ((AMQSession) _session).createQueue(new AMQShortString(queueName), true, _durable,
false, arguments);
+
+        Queue queue = (Queue) _session.createQueue("direct://amq.direct/"+queueName+"/"+queueName+"?durable='"+_durable+"'&autodelete='true'");
+
 
         //Need to create a Consumer to ensure that the log has had time to write
         // as the above Create is Asynchronous
-        _session.createConsumer(_session.createQueue(getTestQueueName()));
+        _session.createConsumer(queue);
 
         // Validation
         List<String> results = _monitor.findMatches(QUEUE_PREFIX);

Modified: qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/PriorityTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/PriorityTest.java?rev=889022&r1=889021&r2=889022&view=diff
==============================================================================
--- qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/PriorityTest.java
(original)
+++ qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/PriorityTest.java
Wed Dec  9 23:58:25 2009
@@ -90,7 +90,8 @@
         final Map<String,Object> arguments = new HashMap<String, Object>();
         arguments.put("x-qpid-priorities",10);
         ((AMQSession) producerSession).createQueue(new AMQShortString(QUEUE), true, false,
false, arguments);
-        queue = new AMQQueue("amq.direct",QUEUE);
+        queue = (Queue) producerSession.createQueue("direct://amq.direct/"+QUEUE+"/"+QUEUE+"?durable='false'&autodelete='true'");
+
         ((AMQSession) producerSession).declareAndBind((AMQDestination)queue);
         producer = producerSession.createProducer(queue);
 
@@ -130,7 +131,8 @@
         final Map<String,Object> arguments = new HashMap<String, Object>();
         arguments.put("x-qpid-priorities",3);
         ((AMQSession) producerSession).createQueue(new AMQShortString(QUEUE), true, false,
false, arguments);
-        queue = new AMQQueue("amq.direct",QUEUE);
+        queue = producerSession.createQueue("direct://amq.direct/"+QUEUE+"/"+QUEUE+"?durable='false'&autodelete='true'");
+        
         ((AMQSession) producerSession).declareAndBind((AMQDestination)queue);
         producer = producerSession.createProducer(queue);
         

Modified: qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/ProducerFlowControlTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/ProducerFlowControlTest.java?rev=889022&r1=889021&r2=889022&view=diff
==============================================================================
--- qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/ProducerFlowControlTest.java
(original)
+++ qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/ProducerFlowControlTest.java
Wed Dec  9 23:58:25 2009
@@ -107,7 +107,7 @@
         arguments.put("x-qpid-capacity",1000);
         arguments.put("x-qpid-flow-resume-capacity",800);
         ((AMQSession) producerSession).createQueue(new AMQShortString(queueName), true, false,
false, arguments);
-        queue = new AMQQueue("amq.direct",queueName);
+        queue = producerSession.createQueue("direct://amq.direct/"+queueName+"/"+queueName+"?durable='false'&autodelete='true'");
         ((AMQSession) producerSession).declareAndBind((AMQDestination)queue);
         producer = producerSession.createProducer(queue);
 
@@ -149,7 +149,7 @@
         arguments.put("x-qpid-capacity",1000);
         arguments.put("x-qpid-flow-resume-capacity",800);
         ((AMQSession) producerSession).createQueue(new AMQShortString(queueName), true, false,
false, arguments);
-        queue = new AMQQueue("amq.direct",queueName);
+        queue = producerSession.createQueue("direct://amq.direct/"+queueName+"/"+queueName+"?durable='false'&autodelete='true'");
         ((AMQSession) producerSession).declareAndBind((AMQDestination)queue);
         producer = producerSession.createProducer(queue);
 
@@ -194,7 +194,7 @@
         arguments.put("x-qpid-capacity",1000);
         arguments.put("x-qpid-flow-resume-capacity",800);
         ((AMQSession) session).createQueue(new AMQShortString(queueName), true, false, false,
arguments);
-        queue = new AMQQueue("amq.direct",queueName);
+        queue = producerSession.createQueue("direct://amq.direct/"+queueName+"/"+queueName+"?durable='false'&autodelete='true'");
         ((AMQSession) session).declareAndBind((AMQDestination)queue);
         producer = session.createProducer(queue);
 
@@ -224,7 +224,7 @@
         arguments.put("x-qpid-capacity",1000);
         arguments.put("x-qpid-flow-resume-capacity",1000);
         ((AMQSession) producerSession).createQueue(new AMQShortString(queueName), true, false,
false, arguments);
-        queue = new AMQQueue("amq.direct",queueName);
+        queue = producerSession.createQueue("direct://amq.direct/"+queueName+"/"+queueName+"?durable='false'&autodelete='true'");
         ((AMQSession) producerSession).declareAndBind((AMQDestination)queue);
         producer = producerSession.createProducer(queue);
 
@@ -266,7 +266,7 @@
 
         ((AMQSession) consumerSession).createQueue(new AMQShortString(queueName), false,
false, false, arguments);
 
-        queue = new AMQQueue("amq.direct",queueName);
+        queue = producerSession.createQueue("direct://amq.direct/"+queueName+"/"+queueName+"?durable='false'&autodelete='false'");
         ((AMQSession) consumerSession).declareAndBind((AMQDestination)queue);
         consumerConnection.start();
 
@@ -322,7 +322,7 @@
         arguments.put("x-qpid-capacity",1000);
         arguments.put("x-qpid-flow-resume-capacity",800);
         ((AMQSession) session).createQueue(new AMQShortString(queueName), true, false, false,
arguments);
-        queue = new AMQQueue("amq.direct",queueName);
+        queue = producerSession.createQueue("direct://amq.direct/"+queueName+"/"+queueName+"?durable='false'&autodelete='true'");
         ((AMQSession) session).declareAndBind((AMQDestination)queue);
         producer = session.createProducer(queue);
 
@@ -354,7 +354,9 @@
         arguments.put("x-qpid-capacity",0);
         arguments.put("x-qpid-flow-resume-capacity",0);
         ((AMQSession) producerSession).createQueue(new AMQShortString(queueName), true, false,
false, arguments);
-        queue = new AMQQueue("amq.direct",queueName);
+
+        queue = producerSession.createQueue("direct://amq.direct/"+queueName+"/"+queueName+"?durable='false'&autodelete='true'");
+
         ((AMQSession) producerSession).declareAndBind((AMQDestination)queue);
         producer = producerSession.createProducer(queue);
         

Modified: qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java?rev=889022&r1=889021&r2=889022&view=diff
==============================================================================
--- qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java
(original)
+++ qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java
Wed Dec  9 23:58:25 2009
@@ -188,6 +188,11 @@
         return null;  //To change body of implemented methods use File | Settings | File
Templates.
     }
 
+    public boolean isTransient()
+    {
+        return false;
+    }
+
     public AMQQueue getQueue()
     {
         return null;

Modified: qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/client/message/MessageToStringTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/client/message/MessageToStringTest.java?rev=889022&r1=889021&r2=889022&view=diff
==============================================================================
--- qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/client/message/MessageToStringTest.java
(original)
+++ qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/client/message/MessageToStringTest.java
Wed Dec  9 23:58:25 2009
@@ -21,7 +21,9 @@
 package org.apache.qpid.test.client.message;
 
 import org.apache.qpid.client.AMQQueue;
+import org.apache.qpid.client.AMQSession;
 import org.apache.qpid.test.utils.QpidTestCase;
+import org.apache.qpid.framing.AMQShortString;
 
 import javax.jms.BytesMessage;
 import javax.jms.Connection;
@@ -57,12 +59,16 @@
         //Create Producer put some messages on the queue
         _connection = getConnection();
 
-        //Create Queue
-        _queue = new AMQQueue("amq.direct", "queue");
-
         //Create Consumer
         _session = _connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
 
+        String queueName = getTestQueueName();
+
+        //Create Queue
+        ((AMQSession) _session).createQueue(new AMQShortString(queueName), true, false, false);
+        _queue = _session.createQueue("direct://amq.direct/"+queueName+"/"+queueName+"?durable='false'&autodelete='true'");
+
+
         _consumer = _session.createConsumer(_queue);
 
         _connection.start();

Modified: qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/client/message/ObjectMessageTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/client/message/ObjectMessageTest.java?rev=889022&r1=889021&r2=889022&view=diff
==============================================================================
--- qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/client/message/ObjectMessageTest.java
(original)
+++ qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/client/message/ObjectMessageTest.java
Wed Dec  9 23:58:25 2009
@@ -21,7 +21,9 @@
 package org.apache.qpid.test.client.message;
 
 import org.apache.qpid.client.AMQQueue;
+import org.apache.qpid.client.AMQSession;
 import org.apache.qpid.test.utils.QpidTestCase;
+import org.apache.qpid.framing.AMQShortString;
 
 import javax.jms.Connection;
 import javax.jms.JMSException;
@@ -46,12 +48,15 @@
         //Create Connection
         _connection = getConnection();
 
-        //Create Queue
-        Queue queue = new AMQQueue("amq.direct", "queue");
 
         //Create Session
         _session = _connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
 
+        //Create Queue
+        String queueName = getTestQueueName();
+        ((AMQSession) _session).createQueue(new AMQShortString(queueName), true, false, false);
+        Queue queue = _session.createQueue("direct://amq.direct/"+queueName+"/"+queueName+"?durable='false'&autodelete='true'");
+
         //Create Consumer
         _consumer = _session.createConsumer(queue);
 



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org


Mime
View raw message