qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rgodf...@apache.org
Subject svn commit: r1574235 [2/2] - in /qpid/trunk/qpid/java: broker-core/src/main/java/org/apache/qpid/server/binding/ broker-core/src/main/java/org/apache/qpid/server/exchange/ broker-core/src/main/java/org/apache/qpid/server/model/ broker-core/src/main/jav...
Date Tue, 04 Mar 2014 22:52:06 GMT
Modified: qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java?rev=1574235&r1=1574234&r2=1574235&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java Tue Mar  4 22:52:05 2014
@@ -27,6 +27,7 @@ import java.util.LinkedHashMap;
 import java.util.UUID;
 import org.apache.log4j.Logger;
 
+import org.apache.qpid.server.exchange.DirectExchange;
 import org.apache.qpid.server.exchange.ExchangeImpl;
 import org.apache.qpid.server.model.ExclusivityPolicy;
 import org.apache.qpid.server.model.LifetimePolicy;
@@ -683,83 +684,101 @@ public class ServerSessionDelegate exten
                 return;
             }
         }
-
-        if(method.getPassive())
+        if(method.getExchange() == null || method.getExchange().equals(""))
         {
-            ExchangeImpl exchange = getExchange(session, exchangeName);
-
-            if(exchange == null)
+            if(!DirectExchange.TYPE.getType().equals(method.getType()))
             {
-                exception(session, method, ExecutionErrorCode.NOT_FOUND, "not-found: exchange-name '" + exchangeName + "'");
+                exception(session, method, ExecutionErrorCode.NOT_ALLOWED,
+                          "Attempt to redeclare default exchange "
+                          + " of type " + DirectExchange.TYPE.getType()
+                          + " to " + method.getType() +".");
             }
-            else
+            if(method.hasAlternateExchange() && !"".equals(method.getAlternateExchange()))
             {
-                if (!exchange.getTypeName().equals(method.getType())
-                        && (method.getType() != null && method.getType().length() > 0))
-                {
-                    exception(session, method, ExecutionErrorCode.NOT_ALLOWED, "Attempt to redeclare exchange: "
-                            + exchangeName + " of type " + exchange.getTypeName() + " to " + method.getType() + ".");
-                }
+                exception(session, method, ExecutionErrorCode.NOT_ALLOWED,
+                          "Attempt to set alternate exchange of the default exchange "
+                          + " to " + method.getAlternateExchange() +".");
             }
         }
         else
         {
-
-            try
+            if(method.getPassive())
             {
-                Map<String,Object> attributes = new HashMap<String, Object>();
 
-                attributes.put(org.apache.qpid.server.model.Exchange.ID, null);
-                attributes.put(org.apache.qpid.server.model.Exchange.NAME, method.getExchange());
-                attributes.put(org.apache.qpid.server.model.Exchange.TYPE, method.getType());
-                attributes.put(org.apache.qpid.server.model.Exchange.DURABLE, method.getDurable());
-                attributes.put(org.apache.qpid.server.model.Exchange.LIFETIME_POLICY,
-                               method.getAutoDelete() ? LifetimePolicy.DELETE_ON_NO_LINKS : LifetimePolicy.PERMANENT);
-                attributes.put(org.apache.qpid.server.model.Exchange.ALTERNATE_EXCHANGE, method.getAlternateExchange());
-                virtualHost.createExchange(attributes);
-            }
-            catch(ReservedExchangeNameException e)
-            {
-                exception(session, method, ExecutionErrorCode.NOT_ALLOWED, "Attempt to declare exchange: "
-                                            + exchangeName + " which begins with reserved name or prefix.");
-            }
-            catch(UnknownExchangeException e)
-            {
-                exception(session, method, ExecutionErrorCode.NOT_FOUND,
-                                                            "Unknown alternate exchange " + e.getExchangeName());
-            }
-            catch(AMQUnknownExchangeType e)
-            {
-                exception(session, method, ExecutionErrorCode.NOT_FOUND, "Unknown Exchange Type: " + method.getType());
-            }
-            catch(ExchangeExistsException e)
-            {
-                ExchangeImpl exchange = e.getExistingExchange();
-                if(!exchange.getTypeName().equals(method.getType()))
+                ExchangeImpl exchange = getExchange(session, exchangeName);
+
+                if(exchange == null)
                 {
-                    exception(session, method, ExecutionErrorCode.NOT_ALLOWED,
-                            "Attempt to redeclare exchange: " + exchangeName
-                                    + " of type " + exchange.getTypeName()
-                                    + " to " + method.getType() +".");
+                    exception(session, method, ExecutionErrorCode.NOT_FOUND, "not-found: exchange-name '" + exchangeName + "'");
                 }
-                else if(method.hasAlternateExchange()
-                          && (exchange.getAlternateExchange() == null ||
-                              !method.getAlternateExchange().equals(exchange.getAlternateExchange().getName())))
+                else
                 {
-                    exception(session, method, ExecutionErrorCode.NOT_ALLOWED,
-                            "Attempt to change alternate exchange of: " + exchangeName
-                                    + " from " + exchange.getAlternateExchange()
-                                    + " to " + method.getAlternateExchange() +".");
+                    if (!exchange.getTypeName().equals(method.getType())
+                            && (method.getType() != null && method.getType().length() > 0))
+                    {
+                        exception(session, method, ExecutionErrorCode.NOT_ALLOWED, "Attempt to redeclare exchange: "
+                                + exchangeName + " of type " + exchange.getTypeName() + " to " + method.getType() + ".");
+                    }
                 }
             }
-            catch (AccessControlException e)
+            else
             {
-                exception(session, method, ExecutionErrorCode.UNAUTHORIZED_ACCESS, e.getMessage());
-            }
 
+                try
+                {
+                    Map<String,Object> attributes = new HashMap<String, Object>();
+
+                    attributes.put(org.apache.qpid.server.model.Exchange.ID, null);
+                    attributes.put(org.apache.qpid.server.model.Exchange.NAME, method.getExchange());
+                    attributes.put(org.apache.qpid.server.model.Exchange.TYPE, method.getType());
+                    attributes.put(org.apache.qpid.server.model.Exchange.DURABLE, method.getDurable());
+                    attributes.put(org.apache.qpid.server.model.Exchange.LIFETIME_POLICY,
+                                   method.getAutoDelete() ? LifetimePolicy.DELETE_ON_NO_LINKS : LifetimePolicy.PERMANENT);
+                    attributes.put(org.apache.qpid.server.model.Exchange.ALTERNATE_EXCHANGE, method.getAlternateExchange());
+                    virtualHost.createExchange(attributes);
+                }
+                catch(ReservedExchangeNameException e)
+                {
+                    exception(session, method, ExecutionErrorCode.NOT_ALLOWED, "Attempt to declare exchange: "
+                                                + exchangeName + " which begins with reserved name or prefix.");
+                }
+                catch(UnknownExchangeException e)
+                {
+                    exception(session, method, ExecutionErrorCode.NOT_FOUND,
+                                                                "Unknown alternate exchange " + e.getExchangeName());
+                }
+                catch(AMQUnknownExchangeType e)
+                {
+                    exception(session, method, ExecutionErrorCode.NOT_FOUND, "Unknown Exchange Type: " + method.getType());
+                }
+                catch(ExchangeExistsException e)
+                {
+                    ExchangeImpl exchange = e.getExistingExchange();
+                    if(!exchange.getTypeName().equals(method.getType()))
+                    {
+                        exception(session, method, ExecutionErrorCode.NOT_ALLOWED,
+                                "Attempt to redeclare exchange: " + exchangeName
+                                        + " of type " + exchange.getTypeName()
+                                        + " to " + method.getType() +".");
+                    }
+                    else if(method.hasAlternateExchange()
+                              && (exchange.getAlternateExchange() == null ||
+                                  !method.getAlternateExchange().equals(exchange.getAlternateExchange().getName())))
+                    {
+                        exception(session, method, ExecutionErrorCode.NOT_ALLOWED,
+                                "Attempt to change alternate exchange of: " + exchangeName
+                                        + " from " + exchange.getAlternateExchange()
+                                        + " to " + method.getAlternateExchange() +".");
+                    }
+                }
+                catch (AccessControlException e)
+                {
+                    exception(session, method, ExecutionErrorCode.UNAUTHORIZED_ACCESS, e.getMessage());
+                }
 
-        }
 
+            }
+        }
     }
 
     private void exception(Session session, Method method, ExecutionErrorCode errorCode, String description)
@@ -789,12 +808,12 @@ public class ServerSessionDelegate exten
             destination = virtualHost.getMessageDestination(xfr.getDestination());
             if(destination == null)
             {
-                destination = virtualHost.getDefaultExchange();
+                destination = virtualHost.getDefaultDestination();
             }
         }
         else
         {
-            destination = virtualHost.getDefaultExchange();
+            destination = virtualHost.getDefaultDestination();
         }
         return destination;
     }
@@ -878,19 +897,30 @@ public class ServerSessionDelegate exten
 
         ExchangeQueryResult result = new ExchangeQueryResult();
 
-        ExchangeImpl exchange = getExchange(session, method.getName());
 
-        if(exchange != null)
+        final String exchangeName = method.getName();
+
+        if(exchangeName == null || exchangeName.equals(""))
         {
-            result.setDurable(exchange.isDurable());
-            result.setType(exchange.getTypeName());
+            result.setDurable(true);
+            result.setType(DirectExchange.TYPE.getType());
             result.setNotFound(false);
         }
         else
         {
-            result.setNotFound(true);
-        }
+            ExchangeImpl exchange = getExchange(session, exchangeName);
 
+            if(exchange != null)
+            {
+                result.setDurable(exchange.isDurable());
+                result.setType(exchange.getTypeName());
+                result.setNotFound(false);
+            }
+            else
+            {
+                result.setNotFound(true);
+            }
+        }
         session.executionResult((int) method.getId(), result);
     }
 
@@ -904,52 +934,56 @@ public class ServerSessionDelegate exten
         {
             exception(session, method, ExecutionErrorCode.ILLEGAL_ARGUMENT, "queue not set");
         }
-        else if (nameNullOrEmpty(method.getExchange()))
-        {
-            exception(session, method, ExecutionErrorCode.INVALID_ARGUMENT, "Bind not allowed for default exchange");
-        }
         else
         {
-            //TODO - here because of non-compliant python tests
-            // should raise exception ILLEGAL_ARGUMENT "binding-key not set"
-            if (!method.hasBindingKey())
-            {
-                method.setBindingKey(method.getQueue());
-            }
-            AMQQueue queue = virtualHost.getQueue(method.getQueue());
-            ExchangeImpl exchange = virtualHost.getExchange(method.getExchange());
-            if(queue == null)
+            final String exchangeName = method.getExchange();
+            if (nameNullOrEmpty(exchangeName))
             {
-                exception(session, method, ExecutionErrorCode.NOT_FOUND, "Queue: '" + method.getQueue() + "' not found");
-            }
-            else if(exchange == null)
-            {
-                exception(session, method, ExecutionErrorCode.NOT_FOUND, "Exchange: '" + method.getExchange() + "' not found");
-            }
-            else if(exchange.getExchangeType().equals(HeadersExchange.TYPE) && (!method.hasArguments() || method.getArguments() == null || !method.getArguments().containsKey("x-match")))
-            {
-                exception(session, method, ExecutionErrorCode.INTERNAL_ERROR, "Bindings to an exchange of type " + HeadersExchange.TYPE.getType() + " require an x-match header");
+                exception(session, method, ExecutionErrorCode.INVALID_ARGUMENT, "Bind not allowed for default exchange");
             }
             else
             {
-                if (!exchange.isBound(method.getBindingKey(), method.getArguments(), queue))
+                //TODO - here because of non-compliant python tests
+                // should raise exception ILLEGAL_ARGUMENT "binding-key not set"
+                if (!method.hasBindingKey())
                 {
-                    try
+                    method.setBindingKey(method.getQueue());
+                }
+                AMQQueue queue = virtualHost.getQueue(method.getQueue());
+                ExchangeImpl exchange = virtualHost.getExchange(exchangeName);
+                if(queue == null)
+                {
+                    exception(session, method, ExecutionErrorCode.NOT_FOUND, "Queue: '" + method.getQueue() + "' not found");
+                }
+                else if(exchange == null)
+                {
+                    exception(session, method, ExecutionErrorCode.NOT_FOUND, "Exchange: '" + exchangeName + "' not found");
+                }
+                else if(exchange.getExchangeType().equals(HeadersExchange.TYPE) && (!method.hasArguments() || method.getArguments() == null || !method.getArguments().containsKey("x-match")))
+                {
+                    exception(session, method, ExecutionErrorCode.INTERNAL_ERROR, "Bindings to an exchange of type " + HeadersExchange.TYPE.getType() + " require an x-match header");
+                }
+                else
+                {
+                    if (!exchange.isBound(method.getBindingKey(), method.getArguments(), queue))
                     {
-                        exchange.addBinding(method.getBindingKey(), queue, method.getArguments());
+                        try
+                        {
+                            exchange.addBinding(method.getBindingKey(), queue, method.getArguments());
+                        }
+                        catch (AccessControlException e)
+                        {
+                            exception(session, method, ExecutionErrorCode.UNAUTHORIZED_ACCESS, e.getMessage());
+                        }
                     }
-                    catch (AccessControlException e)
+                    else
                     {
-                        exception(session, method, ExecutionErrorCode.UNAUTHORIZED_ACCESS, e.getMessage());
+                        // todo
                     }
                 }
-                else
-                {
-                    // todo
-                }
-            }
 
 
+            }
         }
 
 
@@ -1010,8 +1044,10 @@ public class ServerSessionDelegate exten
         VirtualHost virtualHost = getVirtualHost(session);
         ExchangeImpl exchange;
         AMQQueue queue;
-        if(method.hasExchange())
+        boolean isDefaultExchange;
+        if(method.hasExchange() && !method.getExchange().equals(""))
         {
+            isDefaultExchange = false;
             exchange = virtualHost.getExchange(method.getExchange());
 
             if(exchange == null)
@@ -1021,11 +1057,47 @@ public class ServerSessionDelegate exten
         }
         else
         {
-            exchange = virtualHost.getDefaultExchange();
+            isDefaultExchange = true;
+            exchange = null;
         }
 
+        if(isDefaultExchange)
+        {
+            if(method.hasQueue())
+            {
+                queue = getQueue(session, method.getQueue());
 
-        if(method.hasQueue())
+                if(queue == null)
+                {
+                    result.setQueueNotFound(true);
+                }
+                else
+                {
+                    if(method.hasBindingKey())
+                    {
+                        if(!method.getBindingKey().equals(method.getQueue()))
+                        {
+                            result.setKeyNotMatched(true);
+                        }
+                    }
+                }
+            }
+            else if(method.hasBindingKey())
+            {
+                if(getQueue(session, method.getBindingKey()) == null)
+                {
+                    result.setKeyNotMatched(true);
+                }
+            }
+
+            if(method.hasArguments() && !method.getArguments().isEmpty())
+            {
+                result.setArgsNotMatched(true);
+            }
+
+
+        }
+        else if(method.hasQueue())
         {
 
             queue = getQueue(session, method.getQueue());

Modified: qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicPublishMethodHandler.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicPublishMethodHandler.java?rev=1574235&r1=1574234&r2=1574235&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicPublishMethodHandler.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicPublishMethodHandler.java Tue Mar  4 22:52:05 2014
@@ -23,7 +23,6 @@ package org.apache.qpid.server.protocol.
 import org.apache.log4j.Logger;
 
 import org.apache.qpid.AMQException;
-import org.apache.qpid.exchange.ExchangeDefaults;
 import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.framing.BasicPublishBody;
 import org.apache.qpid.framing.abstraction.MessagePublishInfo;
@@ -62,16 +61,23 @@ public class BasicPublishMethodHandler i
         }
 
         AMQShortString exchangeName = body.getExchange();
+        VirtualHost vHost = session.getVirtualHost();
+
         // TODO: check the delivery tag field details - is it unique across the broker or per subscriber?
-        if (exchangeName == null)
+
+        MessageDestination destination;
+
+        if (exchangeName == null || AMQShortString.EMPTY_STRING.equals(exchangeName))
         {
-            exchangeName = AMQShortString.valueOf(ExchangeDefaults.DEFAULT_EXCHANGE_NAME);
+            destination = vHost.getDefaultDestination();
+        }
+        else
+        {
+            destination = vHost.getMessageDestination(exchangeName.toString());
         }
 
-        VirtualHost vHost = session.getVirtualHost();
-        MessageDestination exch = vHost.getMessageDestination(exchangeName.toString());
         // if the exchange does not exist we raise a channel exception
-        if (exch == null)
+        if (destination == null)
         {
             throw body.getChannelException(AMQConstant.NOT_FOUND, "Unknown exchange name");
         }
@@ -91,7 +97,7 @@ public class BasicPublishMethodHandler i
             info.setExchange(exchangeName);
             try
             {
-                channel.setPublishFrame(info, exch);
+                channel.setPublishFrame(info, destination);
             }
             catch (AccessControlException e)
             {

Modified: qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ExchangeBoundHandler.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ExchangeBoundHandler.java?rev=1574235&r1=1574234&r2=1574235&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ExchangeBoundHandler.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ExchangeBoundHandler.java Tue Mar  4 22:52:05 2014
@@ -79,107 +79,155 @@ public class ExchangeBoundHandler implem
         channel.sync();
 
 
-        AMQShortString exchangeName = body.getExchange() == null ? AMQShortString.EMPTY_STRING : body.getExchange();
+        AMQShortString exchangeName = body.getExchange();
         AMQShortString queueName = body.getQueue();
         AMQShortString routingKey = body.getRoutingKey();
-        ExchangeImpl exchange = virtualHost.getExchange(exchangeName.toString());
         ExchangeBoundOkBody response;
-        if (exchange == null)
+        if(exchangeName == null || exchangeName.equals(AMQShortString.EMPTY_STRING))
         {
-
-
-            response = methodRegistry.createExchangeBoundOkBody(EXCHANGE_NOT_FOUND,
-                                                                new AMQShortString("Exchange '" + exchangeName + "' not found"));
-        }
-        else if (routingKey == null)
-        {
-            if (queueName == null)
+            if(routingKey == null)
             {
-                if (exchange.hasBindings())
+                if(queueName == null)
                 {
-                    response = methodRegistry.createExchangeBoundOkBody(OK, null);
+                    response = methodRegistry.createExchangeBoundOkBody(virtualHost.getQueues().isEmpty() ? NO_BINDINGS : OK, null);
                 }
                 else
                 {
+                    AMQQueue queue = virtualHost.getQueue(queueName.toString());
+                    if (queue == null)
+                    {
 
-                    response = methodRegistry.createExchangeBoundOkBody(NO_BINDINGS,	// replyCode
-                        null);	// replyText
+                        response = methodRegistry.createExchangeBoundOkBody(QUEUE_NOT_FOUND,	// replyCode
+                                                                            new AMQShortString("Queue '" + queueName + "' not found"));	// replyText
+                    }
+                    else
+                    {
+                        response = methodRegistry.createExchangeBoundOkBody(OK, null);
+                    }
                 }
             }
             else
             {
-
-                AMQQueue queue = virtualHost.getQueue(queueName.toString());
-                if (queue == null)
+                if(queueName == null)
                 {
-
-                    response = methodRegistry.createExchangeBoundOkBody(QUEUE_NOT_FOUND,	// replyCode
-                        new AMQShortString("Queue '" + queueName + "' not found"));	// replyText
+                    response = methodRegistry.createExchangeBoundOkBody(virtualHost.getQueue(routingKey.toString()) == null ? NO_QUEUE_BOUND_WITH_RK : OK, null);
                 }
                 else
                 {
-                    if (exchange.isBound(queue))
+                    AMQQueue queue = virtualHost.getQueue(queueName.toString());
+                    if (queue == null)
                     {
 
-                        response = methodRegistry.createExchangeBoundOkBody(OK,	// replyCode
-                            null);	// replyText
+                        response = methodRegistry.createExchangeBoundOkBody(QUEUE_NOT_FOUND,	// replyCode
+                                                                            new AMQShortString("Queue '" + queueName + "' not found"));	// replyText
                     }
                     else
                     {
-
-                        response = methodRegistry.createExchangeBoundOkBody(QUEUE_NOT_BOUND,	// replyCode
-                            new AMQShortString("Queue '" + queueName + "' not bound to exchange '" + exchangeName + "'"));	// replyText
+                        response = methodRegistry.createExchangeBoundOkBody(queueName.equals(routingKey) ? OK : SPECIFIC_QUEUE_NOT_BOUND_WITH_RK, null);
                     }
                 }
             }
         }
-        else if (queueName != null)
+        else
         {
-            AMQQueue queue = virtualHost.getQueue(queueName.toString());
-            if (queue == null)
+            ExchangeImpl exchange = virtualHost.getExchange(exchangeName.toString());
+            if (exchange == null)
             {
 
-                response = methodRegistry.createExchangeBoundOkBody(QUEUE_NOT_FOUND,	// replyCode
-                    new AMQShortString("Queue '" + queueName + "' not found"));	// replyText
+
+                response = methodRegistry.createExchangeBoundOkBody(EXCHANGE_NOT_FOUND,
+                                                                    new AMQShortString("Exchange '" + exchangeName + "' not found"));
             }
-            else
+            else if (routingKey == null)
             {
-                String bindingKey = body.getRoutingKey() == null ? null : body.getRoutingKey().asString();
-                if (exchange.isBound(bindingKey, queue))
+                if (queueName == null)
                 {
+                    if (exchange.hasBindings())
+                    {
+                        response = methodRegistry.createExchangeBoundOkBody(OK, null);
+                    }
+                    else
+                    {
 
-                    response = methodRegistry.createExchangeBoundOkBody(OK,	// replyCode
-                        null);	// replyText
+                        response = methodRegistry.createExchangeBoundOkBody(NO_BINDINGS,	// replyCode
+                            null);	// replyText
+                    }
                 }
                 else
                 {
 
-                    String message = "Queue '" + queueName + "' not bound with routing key '" +
-                                        body.getRoutingKey() + "' to exchange '" + exchangeName + "'";
+                    AMQQueue queue = virtualHost.getQueue(queueName.toString());
+                    if (queue == null)
+                    {
 
-                    if(message.length()>255)
+                        response = methodRegistry.createExchangeBoundOkBody(QUEUE_NOT_FOUND,	// replyCode
+                            new AMQShortString("Queue '" + queueName + "' not found"));	// replyText
+                    }
+                    else
                     {
-                        message = message.substring(0,254);
+                        if (exchange.isBound(queue))
+                        {
+
+                            response = methodRegistry.createExchangeBoundOkBody(OK,	// replyCode
+                                null);	// replyText
+                        }
+                        else
+                        {
+
+                            response = methodRegistry.createExchangeBoundOkBody(QUEUE_NOT_BOUND,	// replyCode
+                                new AMQShortString("Queue '" + queueName + "' not bound to exchange '" + exchangeName + "'"));	// replyText
+                        }
                     }
-                    response = methodRegistry.createExchangeBoundOkBody(SPECIFIC_QUEUE_NOT_BOUND_WITH_RK,	// replyCode
-                        new AMQShortString(message));	// replyText
                 }
             }
-        }
-        else
-        {
-            if (exchange.isBound(body.getRoutingKey() == null ? "" : body.getRoutingKey().asString()))
+            else if (queueName != null)
             {
+                AMQQueue queue = virtualHost.getQueue(queueName.toString());
+                if (queue == null)
+                {
+
+                    response = methodRegistry.createExchangeBoundOkBody(QUEUE_NOT_FOUND,	// replyCode
+                        new AMQShortString("Queue '" + queueName + "' not found"));	// replyText
+                }
+                else
+                {
+                    String bindingKey = body.getRoutingKey() == null ? null : body.getRoutingKey().asString();
+                    if (exchange.isBound(bindingKey, queue))
+                    {
+
+                        response = methodRegistry.createExchangeBoundOkBody(OK,	// replyCode
+                            null);	// replyText
+                    }
+                    else
+                    {
+
+                        String message = "Queue '" + queueName + "' not bound with routing key '" +
+                                            body.getRoutingKey() + "' to exchange '" + exchangeName + "'";
 
-                response = methodRegistry.createExchangeBoundOkBody(OK,	// replyCode
-                    null);	// replyText
+                        if(message.length()>255)
+                        {
+                            message = message.substring(0,254);
+                        }
+                        response = methodRegistry.createExchangeBoundOkBody(SPECIFIC_QUEUE_NOT_BOUND_WITH_RK,	// replyCode
+                            new AMQShortString(message));	// replyText
+                    }
+                }
             }
             else
             {
+                if (exchange.isBound(body.getRoutingKey() == null ? "" : body.getRoutingKey().asString()))
+                {
+
+                    response = methodRegistry.createExchangeBoundOkBody(OK,	// replyCode
+                        null);	// replyText
+                }
+                else
+                {
 
-                response = methodRegistry.createExchangeBoundOkBody(NO_QUEUE_BOUND_WITH_RK,	// replyCode
-                    new AMQShortString("No queue bound with routing key '" + body.getRoutingKey() +
-                    "' to exchange '" + exchangeName + "'"));	// replyText
+                    response = methodRegistry.createExchangeBoundOkBody(NO_QUEUE_BOUND_WITH_RK,	// replyCode
+                        new AMQShortString("No queue bound with routing key '" + body.getRoutingKey() +
+                        "' to exchange '" + exchangeName + "'"));	// replyText
+                }
             }
         }
         session.writeFrame(response.generateFrame(channelId));

Modified: qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ExchangeDeclareHandler.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ExchangeDeclareHandler.java?rev=1574235&r1=1574234&r2=1574235&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ExchangeDeclareHandler.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ExchangeDeclareHandler.java Tue Mar  4 22:52:05 2014
@@ -30,6 +30,7 @@ import org.apache.qpid.framing.AMQShortS
 import org.apache.qpid.framing.ExchangeDeclareBody;
 import org.apache.qpid.framing.MethodRegistry;
 import org.apache.qpid.protocol.AMQConstant;
+import org.apache.qpid.server.exchange.DirectExchange;
 import org.apache.qpid.server.exchange.ExchangeImpl;
 import org.apache.qpid.server.model.LifetimePolicy;
 import org.apache.qpid.server.protocol.v0_8.AMQChannel;
@@ -78,76 +79,90 @@ public class ExchangeDeclareHandler impl
 
         ExchangeImpl exchange;
 
-        if (body.getPassive())
+        if(exchangeName == null || exchangeName.equals(AMQShortString.EMPTY_STRING))
         {
-            exchange = virtualHost.getExchange(exchangeName == null ? null : exchangeName.toString());
-            if(exchange == null)
+            if(!new AMQShortString(DirectExchange.TYPE.getType()).equals(body.getType()))
             {
-                throw body.getChannelException(AMQConstant.NOT_FOUND, "Unknown exchange: " + exchangeName);
+                throw new AMQConnectionException(AMQConstant.NOT_ALLOWED, "Attempt to redeclare default exchange: "
+                                                                          + " of type "
+                                                                          + DirectExchange.TYPE.getType()
+                                                                          + " to " + body.getType() +".",
+                                                 body.getClazz(), body.getMethod(),
+                                                 body.getMajor(), body.getMinor(),null);
             }
-            else if (!(body.getType() == null || body.getType().length() ==0) && !exchange.getTypeName().equals(body.getType().asString()))
-            {
-
-                throw new AMQConnectionException(AMQConstant.NOT_ALLOWED, "Attempt to redeclare exchange: " +
-                                  exchangeName + " of type " + exchange.getTypeName()
-                                  + " to " + body.getType() +".",body.getClazz(), body.getMethod(),body.getMajor(),body.getMinor(),null);
-            }
-
         }
         else
         {
-            try
+            if (body.getPassive())
             {
-                String name = exchangeName == null ? null : exchangeName.intern().toString();
-                String type = body.getType() == null ? null : body.getType().intern().toString();
-                Map<String,Object> attributes = new HashMap<String, Object>();
-
-                attributes.put(org.apache.qpid.server.model.Exchange.ID, null);
-                attributes.put(org.apache.qpid.server.model.Exchange.NAME,name);
-                attributes.put(org.apache.qpid.server.model.Exchange.TYPE,type);
-                attributes.put(org.apache.qpid.server.model.Exchange.DURABLE, body.getDurable());
-                attributes.put(org.apache.qpid.server.model.Exchange.LIFETIME_POLICY,
-                               body.getAutoDelete() ? LifetimePolicy.DELETE_ON_NO_LINKS : LifetimePolicy.PERMANENT);
-                attributes.put(org.apache.qpid.server.model.Exchange.ALTERNATE_EXCHANGE, null);
-                exchange = virtualHost.createExchange(attributes);
+                exchange = virtualHost.getExchange(exchangeName.toString());
+                if(exchange == null)
+                {
+                    throw body.getChannelException(AMQConstant.NOT_FOUND, "Unknown exchange: " + exchangeName);
+                }
+                else if (!(body.getType() == null || body.getType().length() ==0) && !exchange.getTypeName().equals(body.getType().asString()))
+                {
 
-            }
-            catch(ReservedExchangeNameException e)
-            {
-                throw body.getConnectionException(AMQConstant.NOT_ALLOWED,
-                                          "Attempt to declare exchange: " + exchangeName +
-                                          " which begins with reserved prefix.");
+                    throw new AMQConnectionException(AMQConstant.NOT_ALLOWED, "Attempt to redeclare exchange: " +
+                                      exchangeName + " of type " + exchange.getTypeName()
+                                      + " to " + body.getType() +".",body.getClazz(), body.getMethod(),body.getMajor(),body.getMinor(),null);
+                }
 
             }
-            catch(ExchangeExistsException e)
+            else
             {
-                exchange = e.getExistingExchange();
-                if(!new AMQShortString(exchange.getTypeName()).equals(body.getType()))
+                try
                 {
-                    throw new AMQConnectionException(AMQConstant.NOT_ALLOWED, "Attempt to redeclare exchange: "
-                                                                              + exchangeName + " of type "
-                                                                              + exchange.getTypeName()
-                                                                              + " to " + body.getType() +".",
-                                                     body.getClazz(), body.getMethod(),
-                                                     body.getMajor(), body.getMinor(),null);
+                    String name = exchangeName == null ? null : exchangeName.intern().toString();
+                    String type = body.getType() == null ? null : body.getType().intern().toString();
+                    Map<String,Object> attributes = new HashMap<String, Object>();
+
+                    attributes.put(org.apache.qpid.server.model.Exchange.ID, null);
+                    attributes.put(org.apache.qpid.server.model.Exchange.NAME,name);
+                    attributes.put(org.apache.qpid.server.model.Exchange.TYPE,type);
+                    attributes.put(org.apache.qpid.server.model.Exchange.DURABLE, body.getDurable());
+                    attributes.put(org.apache.qpid.server.model.Exchange.LIFETIME_POLICY,
+                                   body.getAutoDelete() ? LifetimePolicy.DELETE_ON_NO_LINKS : LifetimePolicy.PERMANENT);
+                    attributes.put(org.apache.qpid.server.model.Exchange.ALTERNATE_EXCHANGE, null);
+                    exchange = virtualHost.createExchange(attributes);
+
+                }
+                catch(ReservedExchangeNameException e)
+                {
+                    throw body.getConnectionException(AMQConstant.NOT_ALLOWED,
+                                              "Attempt to declare exchange: " + exchangeName +
+                                              " which begins with reserved prefix.");
+
+                }
+                catch(ExchangeExistsException e)
+                {
+                    exchange = e.getExistingExchange();
+                    if(!new AMQShortString(exchange.getTypeName()).equals(body.getType()))
+                    {
+                        throw new AMQConnectionException(AMQConstant.NOT_ALLOWED, "Attempt to redeclare exchange: "
+                                                                                  + exchangeName + " of type "
+                                                                                  + exchange.getTypeName()
+                                                                                  + " to " + body.getType() +".",
+                                                         body.getClazz(), body.getMethod(),
+                                                         body.getMajor(), body.getMinor(),null);
+                    }
+                }
+                catch(AMQUnknownExchangeType e)
+                {
+                    throw body.getConnectionException(AMQConstant.COMMAND_INVALID, "Unknown exchange: " + exchangeName,e);
+                }
+                catch (AccessControlException e)
+                {
+                    throw body.getConnectionException(AMQConstant.ACCESS_REFUSED, e.getMessage());
+                }
+                catch (UnknownExchangeException e)
+                {
+                    // note - since 0-8/9/9-1 can't set the alt. exchange this exception should never occur
+                    throw body.getConnectionException(AMQConstant.NOT_FOUND, "Unknown alternate exchange",e);
                 }
-            }
-            catch(AMQUnknownExchangeType e)
-            {
-                throw body.getConnectionException(AMQConstant.COMMAND_INVALID, "Unknown exchange: " + exchangeName,e);
-            }
-            catch (AccessControlException e)
-            {
-                throw body.getConnectionException(AMQConstant.ACCESS_REFUSED, e.getMessage());
-            }
-            catch (UnknownExchangeException e)
-            {
-                // note - since 0-8/9/9-1 can't set the alt. exchange this exception should never occur
-                throw body.getConnectionException(AMQConstant.NOT_FOUND, "Unknown alternate exchange",e);
             }
         }
 
-
         if(!body.getNowait())
         {
             MethodRegistry methodRegistry = session.getMethodRegistry();

Modified: qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ExchangeDeleteHandler.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ExchangeDeleteHandler.java?rev=1574235&r1=1574234&r2=1574235&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ExchangeDeleteHandler.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ExchangeDeleteHandler.java Tue Mar  4 22:52:05 2014
@@ -62,6 +62,11 @@ public class ExchangeDeleteHandler imple
         {
             final String exchangeName = body.getExchange() == null ? null : body.getExchange().toString();
 
+            if(exchangeName == null || "".equals(exchangeName))
+            {
+                throw body.getConnectionException(AMQConstant.NOT_ALLOWED, "Default Exchange cannot be deleted");
+            }
+
             final ExchangeImpl exchange = virtualHost.getExchange(exchangeName);
             if(exchange == null)
             {

Modified: qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueBindHandler.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueBindHandler.java?rev=1574235&r1=1574234&r2=1574235&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueBindHandler.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueBindHandler.java Tue Mar  4 22:52:05 2014
@@ -102,6 +102,12 @@ public class QueueBindHandler implements
             throw body.getChannelException(AMQConstant.NOT_FOUND, "Queue " + queueName + " does not exist.");
         }
         final String exchangeName = body.getExchange() == null ? null : body.getExchange().toString();
+
+        if(exchangeName == null || "".equals(exchangeName))
+        {
+            throw body.getConnectionException(AMQConstant.NOT_ALLOWED, "Cannot bind the queue " + queueName + " to the default exchange");
+        }
+
         final ExchangeImpl exch = virtualHost.getExchange(exchangeName);
         if (exch == null)
         {

Modified: qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueUnbindHandler.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueUnbindHandler.java?rev=1574235&r1=1574234&r2=1574235&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueUnbindHandler.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueUnbindHandler.java Tue Mar  4 22:52:05 2014
@@ -93,6 +93,12 @@ public class QueueUnbindHandler implemen
         {
             throw body.getChannelException(AMQConstant.NOT_FOUND, "Queue " + body.getQueue() + " does not exist.");
         }
+
+        if(body.getExchange() == null || body.getExchange().equals(AMQShortString.EMPTY_STRING))
+        {
+            throw body.getConnectionException(AMQConstant.NOT_ALLOWED, "Cannot unbind the queue " + queue.getName() + " from the default exchange");
+        }
+
         final ExchangeImpl exch = virtualHost.getExchange(body.getExchange() == null ? null : body.getExchange().toString());
         if (exch == null)
         {

Modified: qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/BrokerTestHelper_0_8.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/BrokerTestHelper_0_8.java?rev=1574235&r1=1574234&r2=1574235&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/BrokerTestHelper_0_8.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/BrokerTestHelper_0_8.java Tue Mar  4 22:52:05 2014
@@ -26,6 +26,7 @@ import org.apache.qpid.framing.BasicCont
 import org.apache.qpid.framing.ContentHeaderBody;
 import org.apache.qpid.framing.abstraction.MessagePublishInfo;
 import org.apache.qpid.server.exchange.ExchangeImpl;
+import org.apache.qpid.server.message.MessageDestination;
 import org.apache.qpid.server.util.BrokerTestHelper;
 import org.apache.qpid.server.virtualhost.VirtualHost;
 
@@ -73,10 +74,18 @@ public class BrokerTestHelper_0_8 extend
         when(info.getExchange()).thenReturn(exchangeNameAsShortString);
         when(info.getRoutingKey()).thenReturn(routingKey);
 
-        ExchangeImpl exchange = channel.getVirtualHost().getExchange(exchangeName);
+        MessageDestination destination;
+        if(exchangeName == null || "".equals(exchangeName))
+        {
+            destination = channel.getVirtualHost().getDefaultDestination();
+        }
+        else
+        {
+            destination = channel.getVirtualHost().getExchange(exchangeName);
+        }
         for (int count = 0; count < numberOfMessages; count++)
         {
-            channel.setPublishFrame(info, exchange);
+            channel.setPublishFrame(info, destination);
 
             // Set the body size
             ContentHeaderBody _headerBody = new ContentHeaderBody();

Modified: qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java?rev=1574235&r1=1574234&r2=1574235&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java Tue Mar  4 22:52:05 2014
@@ -130,11 +130,7 @@ public class Session_1_0 implements Sess
                     MessageSource queue = getVirtualHost().getMessageSource(addr);
                     if(queue != null)
                     {
-
                         destination = new MessageSourceDestination(queue);
-
-
-
                     }
                     else
                     {
@@ -145,7 +141,6 @@ public class Session_1_0 implements Sess
                         }
                         else
                         {
-
                             endpoint.setSource(null);
                             destination = null;
                         }

Modified: qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/store/MessageStoreTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/store/MessageStoreTest.java?rev=1574235&r1=1574234&r2=1574235&view=diff
==============================================================================
--- qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/store/MessageStoreTest.java (original)
+++ qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/store/MessageStoreTest.java Tue Mar  4 22:52:05 2014
@@ -34,7 +34,6 @@ import org.apache.qpid.framing.ContentHe
 import org.apache.qpid.framing.FieldTable;
 import org.apache.qpid.framing.abstraction.MessagePublishInfo;
 import org.apache.qpid.framing.amqp_8_0.BasicConsumeBodyImpl;
-import org.apache.qpid.server.binding.BindingImpl;
 import org.apache.qpid.server.configuration.VirtualHostConfiguration;
 import org.apache.qpid.server.exchange.DirectExchange;
 import org.apache.qpid.server.exchange.ExchangeImpl;
@@ -366,7 +365,7 @@ public class MessageStoreTest extends Qp
     {
         int origExchangeCount = getVirtualHost().getExchanges().size();
 
-        Map<String, ExchangeImpl> oldExchanges = createExchanges();
+        Map<String, ExchangeImpl<?>> oldExchanges = createExchanges();
 
         assertEquals("Incorrect number of exchanges registered before recovery",
                 origExchangeCount + 3, getVirtualHost().getExchanges().size());
@@ -421,7 +420,7 @@ public class MessageStoreTest extends Qp
         createAllQueues();
         createAllTopicQueues();
 
-        Map<String, ExchangeImpl> exchanges = createExchanges();
+        Map<String, ExchangeImpl<?>> exchanges = createExchanges();
 
         ExchangeImpl nonDurableExchange = exchanges.get(nonDurableExchangeName);
         ExchangeImpl directExchange = exchanges.get(directExchangeName);
@@ -479,11 +478,11 @@ public class MessageStoreTest extends Qp
      * and that the new exchanges are not the same objects as the provided list (i.e. that the
      * reload actually generated new exchange objects)
      */
-    private void validateExchanges(int originalNumExchanges, Map<String, ExchangeImpl> oldExchanges)
+    private void validateExchanges(int originalNumExchanges, Map<String, ExchangeImpl<?>> oldExchanges)
     {
-        Collection<ExchangeImpl> exchanges = getVirtualHost().getExchanges();
+        Collection<ExchangeImpl<?>> exchanges = getVirtualHost().getExchanges();
         Collection<String> exchangeNames = new ArrayList(exchanges.size());
-        for(ExchangeImpl exchange : exchanges)
+        for(ExchangeImpl<?> exchange : exchanges)
         {
             exchangeNames.add(exchange.getName());
         }
@@ -709,9 +708,9 @@ public class MessageStoreTest extends Qp
 
     }
 
-    private Map<String, ExchangeImpl> createExchanges() throws Exception
+    private Map<String, ExchangeImpl<?>> createExchanges() throws Exception
     {
-        Map<String, ExchangeImpl> exchanges = new HashMap<String, ExchangeImpl>();
+        Map<String, ExchangeImpl<?>> exchanges = new HashMap<String, ExchangeImpl<?>>();
 
         //Register non-durable DirectExchange
         exchanges.put(nonDurableExchangeName, createExchange(DirectExchange.TYPE, nonDurableExchangeName, false));
@@ -723,9 +722,9 @@ public class MessageStoreTest extends Qp
         return exchanges;
     }
 
-    private ExchangeImpl createExchange(ExchangeType<?> type, String name, boolean durable) throws Exception
+    private ExchangeImpl<?> createExchange(ExchangeType<?> type, String name, boolean durable) throws Exception
     {
-        ExchangeImpl exchange = null;
+        ExchangeImpl<?> exchange = null;
 
         Map<String,Object> attributes = new HashMap<String, Object>();
 



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


Mime
View raw message