qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rgodf...@apache.org
Subject svn commit: r1574582 - in /qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0: ExchangeDestination.java NodeReceivingDestination.java SendingLink_1_0.java Session_1_0.java
Date Wed, 05 Mar 2014 17:33:23 GMT
Author: rgodfrey
Date: Wed Mar  5 17:33:23 2014
New Revision: 1574582

URL: http://svn.apache.org/r1574582
Log:
QPID-5605 : [Java Broker] [AMQP 1.0] allow use of addresses of the form <exchange>/<routing-key>

Modified:
    qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ExchangeDestination.java
    qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/NodeReceivingDestination.java
    qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java
    qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java

Modified: qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ExchangeDestination.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ExchangeDestination.java?rev=1574582&r1=1574581&r2=1574582&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ExchangeDestination.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ExchangeDestination.java Wed Mar  5 17:33:23 2014
@@ -38,6 +38,7 @@ public class ExchangeDestination impleme
     private ExchangeImpl _exchange;
     private TerminusDurability _durability;
     private TerminusExpiryPolicy _expiryPolicy;
+    private String _initialRoutingAddress;
 
     public ExchangeDestination(ExchangeImpl exchange, TerminusDurability durable, TerminusExpiryPolicy expiryPolicy)
     {
@@ -76,7 +77,13 @@ public class ExchangeDestination impleme
                     return null;
                 }};
 
-        int enqueues = _exchange.send(message, message.getInitialRoutingAddress(), instanceProperties, txn, null);
+        int enqueues = _exchange.send(message,
+                                      _initialRoutingAddress == null
+                                              ? message.getInitialRoutingAddress()
+                                              : _initialRoutingAddress,
+                                      instanceProperties,
+                                      txn,
+                                      null);
 
 
         return enqueues == 0 ? REJECTED : ACCEPTED;
@@ -102,4 +109,14 @@ public class ExchangeDestination impleme
     {
         return _exchange;
     }
+
+    public void setInitialRoutingAddress(final String initialRoutingAddress)
+    {
+        _initialRoutingAddress = initialRoutingAddress;
+    }
+
+    public String getInitialRoutingAddress()
+    {
+        return _initialRoutingAddress;
+    }
 }

Modified: qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/NodeReceivingDestination.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/NodeReceivingDestination.java?rev=1574582&r1=1574581&r2=1574582&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/NodeReceivingDestination.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/NodeReceivingDestination.java Wed Mar  5 17:33:23 2014
@@ -35,13 +35,13 @@ public class NodeReceivingDestination im
     public static final Rejected REJECTED = new Rejected();
     private static final Outcome[] OUTCOMES = { ACCEPTED, REJECTED};
 
-    private MessageDestination _exchange;
+    private MessageDestination _destination;
     private TerminusDurability _durability;
     private TerminusExpiryPolicy _expiryPolicy;
 
-    public NodeReceivingDestination(MessageDestination exchange, TerminusDurability durable, TerminusExpiryPolicy expiryPolicy)
+    public NodeReceivingDestination(MessageDestination destination, TerminusDurability durable, TerminusExpiryPolicy expiryPolicy)
     {
-        _exchange = exchange;
+        _destination = destination;
         _durability = durable;
         _expiryPolicy = expiryPolicy;
     }
@@ -76,7 +76,7 @@ public class NodeReceivingDestination im
                     return null;
                 }};
 
-        int enqueues = _exchange.send(message, message.getInitialRoutingAddress(), instanceProperties, txn, null);
+        int enqueues = _destination.send(message, message.getInitialRoutingAddress(), instanceProperties, txn, null);
 
 
         return enqueues == 0 ? REJECTED : ACCEPTED;
@@ -100,6 +100,6 @@ public class NodeReceivingDestination im
 
     public MessageDestination getDestination()
     {
-        return _exchange;
+        return _destination;
     }
 }

Modified: qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java?rev=1574582&r1=1574581&r2=1574582&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java Wed Mar  5 17:33:23 2014
@@ -230,7 +230,7 @@ public class SendingLink_1_0 implements 
                 }
 
 
-                String binding = "";
+                String binding = null;
 
                 Map<Symbol,Filter> filters = source.getFilter();
                 Map<Symbol,Filter> actualFilters = new HashMap<Symbol,Filter>();
@@ -298,8 +298,14 @@ public class SendingLink_1_0 implements 
                 }
                 _queue = queue;
                 source.setFilter(actualFilters.isEmpty() ? null : actualFilters);
-
-                exchange.addBinding(binding, queue,null);
+                if(binding != null)
+                {
+                    exchange.addBinding(binding, queue,null);
+                }
+                if(exchangeDestination.getInitialRoutingAddress() != null)
+                {
+                    exchange.addBinding(exchangeDestination.getInitialRoutingAddress(),queue,null);
+                }
                 source.setDistributionMode(StdDistMode.COPY);
 
                 qd = new QueueDestination(queue);

Modified: qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java?rev=1574582&r1=1574581&r2=1574582&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java Wed Mar  5 17:33:23 2014
@@ -127,17 +127,17 @@ public class Session_1_0 implements Sess
                         source.setAddress(tempQueue.getName());
                     }
                     String addr = source.getAddress();
-                    MessageSource queue = getVirtualHost().getMessageSource(addr);
-                    if(queue != null)
+                    if(!addr.startsWith("/") && addr.contains("/"))
                     {
-                        destination = new MessageSourceDestination(queue);
-                    }
-                    else
-                    {
-                        ExchangeImpl exchg = getVirtualHost().getExchange(addr);
+                        String[] parts = addr.split("/",2);
+                        ExchangeImpl exchg = getVirtualHost().getExchange(parts[0]);
                         if(exchg != null)
                         {
-                            destination = new ExchangeDestination(exchg, source.getDurable(), source.getExpiryPolicy());
+                            ExchangeDestination exchangeDestination =
+                                    new ExchangeDestination(exchg, source.getDurable(), source.getExpiryPolicy());
+                            exchangeDestination.setInitialRoutingAddress(parts[1]);
+                            destination = exchangeDestination;
+
                         }
                         else
                         {
@@ -145,6 +145,27 @@ public class Session_1_0 implements Sess
                             destination = null;
                         }
                     }
+                    else
+                    {
+                        MessageSource queue = getVirtualHost().getMessageSource(addr);
+                        if(queue != null)
+                        {
+                            destination = new MessageSourceDestination(queue);
+                        }
+                        else
+                        {
+                            ExchangeImpl exchg = getVirtualHost().getExchange(addr);
+                            if(exchg != null)
+                            {
+                                destination = new ExchangeDestination(exchg, source.getDurable(), source.getExpiryPolicy());
+                            }
+                            else
+                            {
+                                endpoint.setSource(null);
+                                destination = null;
+                            }
+                        }
+                    }
 
                 }
                 else
@@ -265,28 +286,52 @@ public class Session_1_0 implements Sess
                         }
 
                         String addr = target.getAddress();
-                        MessageDestination messageDestination = getVirtualHost().getMessageDestination(addr);
-                        if(messageDestination != null)
-                        {
-                            destination = new NodeReceivingDestination(messageDestination, target.getDurable(),
-                                                                       target.getExpiryPolicy());
-                        }
-                        else
+                        if(!addr.startsWith("/") && addr.contains("/"))
                         {
-                            AMQQueue queue = getVirtualHost().getQueue(addr);
-                            if(queue != null)
+                            String[] parts = addr.split("/",2);
+                            ExchangeImpl exchange = getVirtualHost().getExchange(parts[0]);
+                            if(exchange != null)
                             {
+                                ExchangeDestination exchangeDestination =
+                                        new ExchangeDestination(exchange,
+                                                                target.getDurable(),
+                                                                target.getExpiryPolicy());
+
+                                exchangeDestination.setInitialRoutingAddress(parts[1]);
+
+                                destination = exchangeDestination;
 
-                                destination = new QueueDestination(queue);
                             }
                             else
                             {
                                 endpoint.setTarget(null);
                                 destination = null;
                             }
-
                         }
+                        else
+                        {
+                            MessageDestination messageDestination = getVirtualHost().getMessageDestination(addr);
+                            if(messageDestination != null)
+                            {
+                                destination = new NodeReceivingDestination(messageDestination, target.getDurable(),
+                                                                           target.getExpiryPolicy());
+                            }
+                            else
+                            {
+                                AMQQueue queue = getVirtualHost().getQueue(addr);
+                                if(queue != null)
+                                {
+
+                                    destination = new QueueDestination(queue);
+                                }
+                                else
+                                {
+                                    endpoint.setTarget(null);
+                                    destination = null;
+                                }
 
+                            }
+                        }
 
                     }
                     else



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


Mime
View raw message