activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chir...@apache.org
Subject svn commit: r1438540 - in /activemq/trunk: activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/ activemq-client/src/main/java/org/apache/activemq/command/
Date Fri, 25 Jan 2013 14:35:18 GMT
Author: chirino
Date: Fri Jan 25 14:35:18 2013
New Revision: 1438540

URL: http://svn.apache.org/viewvc?rev=1438540&view=rev
Log:
When the AMQP source or target address is not set, close the sender/receiver and report the error.


Modified:
    activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java
    activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolException.java
    activemq/trunk/activemq-client/src/main/java/org/apache/activemq/command/ActiveMQDestination.java

Modified: activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java?rev=1438540&r1=1438539&r2=1438540&view=diff
==============================================================================
--- activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java (original)
+++ activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java Fri Jan 25 14:35:18 2013
@@ -581,58 +581,68 @@ class AmqpProtocolConverter {
     void onReceiverOpen(final Receiver receiver, AmqpSessionContext sessionContext) {
         // Client is producing to this receiver object
         org.apache.qpid.proton.amqp.transport.Target remoteTarget = receiver.getRemoteTarget();
-        if( remoteTarget instanceof Coordinator ) {
-            pumpProtonToSocket();
-            receiver.setContext(coordinatorContext);
-            receiver.flow(prefetch);
-            receiver.open();
-            pumpProtonToSocket();
-        } else {
-            org.apache.qpid.proton.amqp.messaging.Target target = (Target) remoteTarget;
-            ProducerId producerId = new ProducerId(sessionContext.sessionId, sessionContext.nextProducerId++);
-            ActiveMQDestination dest;
-            if( target.getDynamic() ) {
-                dest = createTempQueue();
-                org.apache.qpid.proton.amqp.messaging.Target actualTarget = new org.apache.qpid.proton.amqp.messaging.Target();
-                actualTarget.setAddress(dest.getQualifiedName());
-                actualTarget.setDynamic(true);
-                receiver.setTarget(actualTarget);
+        try {
+            if( remoteTarget instanceof Coordinator ) {
+                pumpProtonToSocket();
+                receiver.setContext(coordinatorContext);
+                receiver.flow(prefetch);
+                receiver.open();
+                pumpProtonToSocket();
             } else {
-                dest = createDestination(remoteTarget);
-            }
+                Target target = (Target) remoteTarget;
+                ProducerId producerId = new ProducerId(sessionContext.sessionId, sessionContext.nextProducerId++);
+                ActiveMQDestination dest;
+                if( target.getDynamic() ) {
+                    dest = createTempQueue();
+                    Target actualTarget = new Target();
+                    actualTarget.setAddress(dest.getQualifiedName());
+                    actualTarget.setDynamic(true);
+                    receiver.setTarget(actualTarget);
+                } else {
+                    dest = createDestination(remoteTarget);
+                }
 
-            ProducerContext producerContext = new ProducerContext(producerId, dest);
+                ProducerContext producerContext = new ProducerContext(producerId, dest);
 
-            receiver.setContext(producerContext);
-            receiver.flow(prefetch);
-            ProducerInfo producerInfo = new ProducerInfo(producerId);
-            producerInfo.setDestination(dest);
-            sendToActiveMQ(producerInfo, new ResponseHandler() {
-                public void onResponse(AmqpProtocolConverter converter, Response response) throws IOException {
-                    if (response.isException()) {
-                        receiver.setTarget(null);
-                        Throwable exception = ((ExceptionResponse) response).getException();
-                        ((LinkImpl)receiver).setLocalError(new EndpointError(exception.getClass().getName(), exception.getMessage()));
-                        receiver.close();
-                    } else {
-                        receiver.open();
+                receiver.setContext(producerContext);
+                receiver.flow(prefetch);
+                ProducerInfo producerInfo = new ProducerInfo(producerId);
+                producerInfo.setDestination(dest);
+                sendToActiveMQ(producerInfo, new ResponseHandler() {
+                    public void onResponse(AmqpProtocolConverter converter, Response response) throws IOException {
+                        if (response.isException()) {
+                            receiver.setTarget(null);
+                            Throwable exception = ((ExceptionResponse) response).getException();
+                            ((LinkImpl)receiver).setLocalError(new EndpointError(exception.getClass().getName(), exception.getMessage()));
+                            receiver.close();
+                        } else {
+                            receiver.open();
+                        }
+                        pumpProtonToSocket();
                     }
-                    pumpProtonToSocket();
-                }
-            });
+                });
+            }
+        } catch (AmqpProtocolException exception) {
+            receiver.setTarget(null);
+            ((LinkImpl)receiver).setLocalError(new EndpointError(exception.getSymbolicName(), exception.getMessage()));
+            receiver.close();
         }
-
-
     }
 
-    private ActiveMQDestination createDestination(Object terminus) {
+    private ActiveMQDestination createDestination(Object terminus) throws AmqpProtocolException {
         if( terminus == null ) {
             return null;
         } else if( terminus instanceof org.apache.qpid.proton.amqp.messaging.Source) {
             org.apache.qpid.proton.amqp.messaging.Source source = (org.apache.qpid.proton.amqp.messaging.Source)terminus;
+            if( source.getAddress() == null ) {
+                throw new AmqpProtocolException("amqp:invalid-field", "source address not set");
+            }
             return ActiveMQDestination.createDestination(source.getAddress(), ActiveMQDestination.QUEUE_TYPE);
         } else if( terminus instanceof org.apache.qpid.proton.amqp.messaging.Target) {
             org.apache.qpid.proton.amqp.messaging.Target target = (org.apache.qpid.proton.amqp.messaging.Target)terminus;
+            if( target.getAddress() == null ) {
+                throw new AmqpProtocolException("amqp:invalid-field", "target address not set");
+            }
             return ActiveMQDestination.createDestination(target.getAddress(), ActiveMQDestination.QUEUE_TYPE);
         } else if( terminus instanceof Coordinator ) {
             Coordinator target = (Coordinator)terminus;
@@ -854,116 +864,121 @@ class AmqpProtocolConverter {
     void onSenderOpen(final Sender sender, AmqpSessionContext sessionContext) {
         org.apache.qpid.proton.amqp.messaging.Source source = (org.apache.qpid.proton.amqp.messaging.Source)sender.getRemoteSource();
 
-        final ConsumerId id = new ConsumerId(sessionContext.sessionId, sessionContext.nextConsumerId++);
-        ConsumerContext consumerContext = new ConsumerContext(id, sender);
-        sender.setContext(consumerContext);
+        try {
+            final ConsumerId id = new ConsumerId(sessionContext.sessionId, sessionContext.nextConsumerId++);
+            ConsumerContext consumerContext = new ConsumerContext(id, sender);
+            sender.setContext(consumerContext);
+
+            String selector = null;
+            if( source!=null ) {
+                Map filter = source.getFilter();
+                if (filter != null) {
+                    DescribedType value = (DescribedType)filter.get(JMS_SELECTOR);
+                    if( value!=null ) {
+                        selector = value.getDescribed().toString();
+                        // Validate the Selector.
+                        try {
+                            SelectorParser.parse(selector);
+                        } catch (InvalidSelectorException e) {
+                            sender.setSource(null);
+                            ((LinkImpl)sender).setLocalError(new EndpointError("amqp:invalid-field", e.getMessage()));
+                            sender.close();
+                            consumerContext.closed = true;
+                            return;
+                        }
+                    }
+                }
+            }
+
+            ActiveMQDestination dest;
+            if( source == null ) {
+
+                source = new org.apache.qpid.proton.amqp.messaging.Source();
+                source.setAddress("");
+                source.setCapabilities(DURABLE_SUBSCRIPTION_ENDED);
+                sender.setSource(source);
+
+                // Looks like durable sub removal.
+                RemoveSubscriptionInfo rsi = new RemoveSubscriptionInfo();
+                rsi.setConnectionId(connectionId);
+                rsi.setSubscriptionName(sender.getName());
+                rsi.setClientId(connectionInfo.getClientId());
+
+                consumerContext.closed=true;
+                sendToActiveMQ(rsi, new ResponseHandler() {
+                    public void onResponse(AmqpProtocolConverter converter, Response response) throws IOException {
+                        if (response.isException()) {
+                            sender.setSource(null);
+                            Throwable exception = ((ExceptionResponse) response).getException();
+                            String name = exception.getClass().getName();
+                            ((LinkImpl)sender).setLocalError(new EndpointError(name, exception.getMessage()));
+                        }
+                        sender.open();
+                        pumpProtonToSocket();
+                    }
+                });
+                return;
+            } else if( contains(source.getCapabilities(), DURABLE_SUBSCRIPTION_ENDED) ) {
+                consumerContext.closed=true;
+                sender.close();
+                pumpProtonToSocket();
+                return;
+            } else if( source.getDynamic() ) {
+                // lets create a temp dest.
+                dest = createTempQueue();
+                source = new org.apache.qpid.proton.amqp.messaging.Source();
+                source.setAddress(dest.getQualifiedName());
+                source.setDynamic(true);
+                sender.setSource(source);
+            } else {
+                dest = createDestination(source);
+            }
+
+            subscriptionsByConsumerId.put(id, consumerContext);
+            ConsumerInfo consumerInfo = new ConsumerInfo(id);
+            consumerInfo.setSelector(selector);
+            consumerInfo.setNoRangeAcks(true);
+            consumerInfo.setDestination(dest);
+            consumerInfo.setPrefetchSize(100);
+            consumerInfo.setDispatchAsync(true);
+            if( source.getDistributionMode() == COPY && dest.isQueue() ) {
+                consumerInfo.setBrowser(true);
+            }
+            if( DURABLE.equals(source.getDurable()) && dest.isTopic() ) {
+                consumerInfo.setSubscriptionName(sender.getName());
+            }
 
-        String selector = null;
-        if( source!=null ) {
             Map filter = source.getFilter();
             if (filter != null) {
-                DescribedType value = (DescribedType)filter.get(JMS_SELECTOR);
+                DescribedType value = (DescribedType)filter.get(NO_LOCAL);
                 if( value!=null ) {
-                    selector = value.getDescribed().toString();
-                    // Validate the Selector.
-                    try {
-                        SelectorParser.parse(selector);
-                    } catch (InvalidSelectorException e) {
-                        sender.setSource(null);
-                        ((LinkImpl)sender).setLocalError(new EndpointError("amqp:invalid-field", e.getMessage()));
-                        sender.close();
-                        consumerContext.closed = true;
-                        return;
-                    }
+                    consumerInfo.setNoLocal(true);
                 }
             }
-        }
-
-        ActiveMQDestination dest;
-        if( source == null ) {
-
-            source = new org.apache.qpid.proton.amqp.messaging.Source();
-            source.setAddress("");
-            source.setCapabilities(DURABLE_SUBSCRIPTION_ENDED);
-            sender.setSource(source);
-
-            // Looks like durable sub removal.
-            RemoveSubscriptionInfo rsi = new RemoveSubscriptionInfo();
-            rsi.setConnectionId(connectionId);
-            rsi.setSubscriptionName(sender.getName());
-            rsi.setClientId(connectionInfo.getClientId());
 
-            consumerContext.closed=true;
-            sendToActiveMQ(rsi, new ResponseHandler() {
+            sendToActiveMQ(consumerInfo, new ResponseHandler() {
                 public void onResponse(AmqpProtocolConverter converter, Response response) throws IOException {
                     if (response.isException()) {
                         sender.setSource(null);
                         Throwable exception = ((ExceptionResponse) response).getException();
                         String name = exception.getClass().getName();
+                        if( exception instanceof InvalidSelectorException ) {
+                            name = "amqp:invalid-field";
+                        }
                         ((LinkImpl)sender).setLocalError(new EndpointError(name, exception.getMessage()));
+                        subscriptionsByConsumerId.remove(id);
+                        sender.close();
+                    } else {
+                        sender.open();
                     }
-                    sender.open();
                     pumpProtonToSocket();
                 }
             });
-            return;
-        } else if( contains(source.getCapabilities(), DURABLE_SUBSCRIPTION_ENDED) ) {
-            consumerContext.closed=true;
+        } catch (AmqpProtocolException e) {
+            sender.setSource(null);
+            ((LinkImpl)sender).setLocalError(new EndpointError(e.getSymbolicName(), e.getMessage()));
             sender.close();
-            pumpProtonToSocket();
-            return;
-        } else if( source.getDynamic() ) {
-            // lets create a temp dest.
-            dest = createTempQueue();
-            source = new org.apache.qpid.proton.amqp.messaging.Source();
-            source.setAddress(dest.getQualifiedName());
-            source.setDynamic(true);
-            sender.setSource(source);
-        } else {
-            dest = createDestination(source);
         }
-
-        subscriptionsByConsumerId.put(id, consumerContext);
-        ConsumerInfo consumerInfo = new ConsumerInfo(id);
-        consumerInfo.setSelector(selector);
-        consumerInfo.setNoRangeAcks(true);
-        consumerInfo.setDestination(dest);
-        consumerInfo.setPrefetchSize(100);
-        consumerInfo.setDispatchAsync(true);
-        if( source.getDistributionMode() == COPY && dest.isQueue() ) {
-            consumerInfo.setBrowser(true);
-        }
-        if( DURABLE.equals(source.getDurable()) && dest.isTopic() ) {
-            consumerInfo.setSubscriptionName(sender.getName());
-        }
-
-        Map filter = source.getFilter();
-        if (filter != null) {
-            DescribedType value = (DescribedType)filter.get(NO_LOCAL);
-            if( value!=null ) {
-                consumerInfo.setNoLocal(true);
-            }
-        }
-
-        sendToActiveMQ(consumerInfo, new ResponseHandler() {
-            public void onResponse(AmqpProtocolConverter converter, Response response) throws IOException {
-                if (response.isException()) {
-                    sender.setSource(null);
-                    Throwable exception = ((ExceptionResponse) response).getException();
-                    String name = exception.getClass().getName();
-                    if( exception instanceof InvalidSelectorException ) {
-                        name = "amqp:invalid-field";
-                    }
-                    ((LinkImpl)sender).setLocalError(new EndpointError(name, exception.getMessage()));
-                    subscriptionsByConsumerId.remove(id);
-                    sender.close();
-                } else {
-                    sender.open();
-                }
-                pumpProtonToSocket();
-            }
-        });
-
     }
 
     static private boolean contains(Symbol[] haystack, Symbol needle) {

Modified: activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolException.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolException.java?rev=1438540&r1=1438539&r2=1438540&view=diff
==============================================================================
--- activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolException.java (original)
+++ activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolException.java Fri Jan 25 14:35:18 2013
@@ -23,6 +23,7 @@ public class AmqpProtocolException exten
 
     private static final long serialVersionUID = -2869735532997332242L;
 
+    private final String symbolicName;
     private final boolean fatal;
 
     public AmqpProtocolException() {
@@ -37,8 +38,17 @@ public class AmqpProtocolException exten
         this(s, fatal, null);
     }
 
+    public AmqpProtocolException(String s, String msg) {
+        this(s, msg, false, null);
+    }
+
     public AmqpProtocolException(String s, boolean fatal, Throwable cause) {
+        this("error", s, fatal, cause);
+    }
+
+    public AmqpProtocolException(String symbolicName, String s, boolean fatal, Throwable cause) {
         super(s);
+        this.symbolicName = symbolicName;
         this.fatal = fatal;
         initCause(cause);
     }
@@ -47,4 +57,7 @@ public class AmqpProtocolException exten
         return fatal;
     }
 
+    public String getSymbolicName() {
+        return symbolicName;
+    }
 }

Modified: activemq/trunk/activemq-client/src/main/java/org/apache/activemq/command/ActiveMQDestination.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-client/src/main/java/org/apache/activemq/command/ActiveMQDestination.java?rev=1438540&r1=1438539&r2=1438540&view=diff
==============================================================================
--- activemq/trunk/activemq-client/src/main/java/org/apache/activemq/command/ActiveMQDestination.java (original)
+++ activemq/trunk/activemq-client/src/main/java/org/apache/activemq/command/ActiveMQDestination.java Fri Jan 25 14:35:18 2013
@@ -89,7 +89,6 @@ public abstract class ActiveMQDestinatio
     // static helper methods for working with destinations
     // -------------------------------------------------------------------------
     public static ActiveMQDestination createDestination(String name, byte defaultType) {
-
         if (name.startsWith(QUEUE_QUALIFIED_PREFIX)) {
             return new ActiveMQQueue(name.substring(QUEUE_QUALIFIED_PREFIX.length()));
         } else if (name.startsWith(TOPIC_QUALIFIED_PREFIX)) {



Mime
View raw message