activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chir...@apache.org
Subject svn commit: r1421487 - /activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java
Date Thu, 13 Dec 2012 20:20:53 GMT
Author: chirino
Date: Thu Dec 13 20:20:53 2012
New Revision: 1421487

URL: http://svn.apache.org/viewvc?rev=1421487&view=rev
Log:
Convert to new proton APIs.

Modified:
    activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java

Modified: activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java?rev=1421487&r1=1421486&r2=1421487&view=diff
==============================================================================
--- activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java
(original)
+++ activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java
Thu Dec 13 20:20:53 2012
@@ -18,25 +18,27 @@ package org.apache.activemq.transport.am
 
 import org.apache.activemq.broker.BrokerContext;
 import org.apache.activemq.command.*;
+import org.apache.activemq.command.ConnectionError;
 import org.apache.activemq.selector.SelectorParser;
 import org.apache.activemq.util.IOExceptionSupport;
 import org.apache.activemq.util.IdGenerator;
 import org.apache.activemq.util.LongSequenceGenerator;
+import org.apache.qpid.proton.amqp.*;
+import org.apache.qpid.proton.amqp.messaging.*;
+import org.apache.qpid.proton.amqp.messaging.Modified;
+import org.apache.qpid.proton.amqp.messaging.Rejected;
+import org.apache.qpid.proton.amqp.messaging.Released;
+import org.apache.qpid.proton.amqp.messaging.Target;
+import org.apache.qpid.proton.amqp.transaction.*;
+import org.apache.qpid.proton.amqp.transport.*;
 import org.apache.qpid.proton.engine.*;
-import org.apache.qpid.proton.engine.impl.*;
+import org.apache.qpid.proton.engine.impl.ConnectionImpl;
+import org.apache.qpid.proton.engine.impl.LinkImpl;
+import org.apache.qpid.proton.engine.impl.ProtocolTracer;
+import org.apache.qpid.proton.engine.impl.TransportImpl;
 import org.apache.qpid.proton.framing.TransportFrame;
 import org.apache.qpid.proton.jms.*;
-import org.apache.qpid.proton.type.Binary;
-import org.apache.qpid.proton.type.DescribedType;
-import org.apache.qpid.proton.type.Symbol;
-import org.apache.qpid.proton.type.UnsignedInteger;
-import org.apache.qpid.proton.type.messaging.*;
-import org.apache.qpid.proton.type.messaging.Modified;
-import org.apache.qpid.proton.type.messaging.Rejected;
-import org.apache.qpid.proton.type.messaging.Released;
-import org.apache.qpid.proton.type.transaction.*;
-import org.apache.qpid.proton.type.transport.DeliveryState;
-import org.apache.qpid.proton.type.transport.SenderSettleMode;
+import org.apache.qpid.proton.message.impl.MessageImpl;
 import org.fusesource.hawtbuf.Buffer;
 import org.fusesource.hawtbuf.ByteArrayOutputStream;
 import org.slf4j.Logger;
@@ -472,9 +474,10 @@ class AmqpProtocolConverter {
                         if( response.isException() ) {
                             ExceptionResponse er = (ExceptionResponse)response;
                             Rejected rejected = new Rejected();
-                            ArrayList errors = new ArrayList();
-                            errors.add(er.getException().getMessage());
-                            rejected.setError(errors);
+                            ErrorCondition condition = new ErrorCondition();
+                            condition.setCondition(Symbol.valueOf("failed"));
+                            condition.setDescription(er.getException().getMessage());
+                            rejected.setError(condition);
                             delivery.disposition(rejected);
                         }
                     }
@@ -509,8 +512,7 @@ class AmqpProtocolConverter {
         @Override
         protected void onMessage(Receiver receiver, final Delivery delivery, Buffer buffer)
throws Exception {
 
-            org.apache.qpid.proton.message.Message msg = new org.apache.qpid.proton.message.Message();
-
+            MessageImpl msg = new MessageImpl();
             int offset = buffer.offset;
             int len = buffer.length;
             while( len > 0 ) {
@@ -557,9 +559,7 @@ class AmqpProtocolConverter {
                         if( response.isException() ) {
                             ExceptionResponse er = (ExceptionResponse)response;
                             Rejected rejected = new Rejected();
-                            ArrayList errors = new ArrayList();
-                            errors.add(er.getException().getMessage());
-                            rejected.setError(errors);
+                            rejected.setError(createErrorCondition("failed", er.getException().getMessage()));
                             delivery.disposition(rejected);
                         }
                         delivery.settle();
@@ -577,7 +577,7 @@ class AmqpProtocolConverter {
 
     void onReceiverOpen(final Receiver receiver, AmqpSessionContext sessionContext) {
         // Client is producing to this receiver object
-        org.apache.qpid.proton.type.transport.Target remoteTarget = receiver.getRemoteTarget();
+        org.apache.qpid.proton.amqp.transport.Target remoteTarget = receiver.getRemoteTarget();
         if( remoteTarget instanceof Coordinator ) {
             pumpProtonToSocket();
             receiver.setContext(coordinatorContext);
@@ -585,12 +585,12 @@ class AmqpProtocolConverter {
             receiver.open();
             pumpProtonToSocket();
         } else {
-            org.apache.qpid.proton.type.messaging.Target target = (Target) remoteTarget;
+            org.apache.qpid.proton.amqp.messaging.Target target = (Target) remoteTarget;
             ProducerId producerId = new ProducerId(sessionContext.sessionId, sessionContext.nextProducerId++);
             ActiveMQDestination dest;
             if( target.getDynamic() ) {
                 dest = createTempQueue();
-                org.apache.qpid.proton.type.messaging.Target actualTarget = new org.apache.qpid.proton.type.messaging.Target();
+                org.apache.qpid.proton.amqp.messaging.Target actualTarget = new org.apache.qpid.proton.amqp.messaging.Target();
                 actualTarget.setAddress(dest.getQualifiedName());
                 actualTarget.setDynamic(true);
                 receiver.setTarget(actualTarget);
@@ -625,11 +625,11 @@ class AmqpProtocolConverter {
     private ActiveMQDestination createDestination(Object terminus) {
         if( terminus == null ) {
             return null;
-        } else if( terminus instanceof org.apache.qpid.proton.type.messaging.Source) {
-            org.apache.qpid.proton.type.messaging.Source source = (org.apache.qpid.proton.type.messaging.Source)terminus;
+        } else if( terminus instanceof org.apache.qpid.proton.amqp.messaging.Source) {
+            org.apache.qpid.proton.amqp.messaging.Source source = (org.apache.qpid.proton.amqp.messaging.Source)terminus;
             return ActiveMQDestination.createDestination(source.getAddress(), ActiveMQDestination.QUEUE_TYPE);
-        } else if( terminus instanceof org.apache.qpid.proton.type.messaging.Target) {
-            org.apache.qpid.proton.type.messaging.Target target = (org.apache.qpid.proton.type.messaging.Target)terminus;
+        } else if( terminus instanceof org.apache.qpid.proton.amqp.messaging.Target) {
+            org.apache.qpid.proton.amqp.messaging.Target target = (org.apache.qpid.proton.amqp.messaging.Target)terminus;
             return ActiveMQDestination.createDestination(target.getAddress(), ActiveMQDestination.QUEUE_TYPE);
         } else if( terminus instanceof Coordinator ) {
             Coordinator target = (Coordinator)terminus;
@@ -849,7 +849,7 @@ class AmqpProtocolConverter {
     private final ConcurrentHashMap<ConsumerId, ConsumerContext> subscriptionsByConsumerId
= new ConcurrentHashMap<ConsumerId, ConsumerContext>();
 
     void onSenderOpen(final Sender sender, AmqpSessionContext sessionContext) {
-        org.apache.qpid.proton.type.messaging.Source source = (org.apache.qpid.proton.type.messaging.Source)sender.getRemoteSource();
+        org.apache.qpid.proton.amqp.messaging.Source source = (org.apache.qpid.proton.amqp.messaging.Source)sender.getRemoteSource();
 
         final ConsumerId id = new ConsumerId(sessionContext.sessionId, sessionContext.nextConsumerId++);
         ConsumerContext consumerContext = new ConsumerContext(id, sender);
@@ -879,7 +879,7 @@ class AmqpProtocolConverter {
         ActiveMQDestination dest;
         if( source == null ) {
 
-            source = new org.apache.qpid.proton.type.messaging.Source();
+            source = new org.apache.qpid.proton.amqp.messaging.Source();
             source.setAddress("");
             source.setCapabilities(DURABLE_SUBSCRIPTION_ENDED);
             sender.setSource(source);
@@ -912,7 +912,7 @@ class AmqpProtocolConverter {
         } else if( source.getDynamic() ) {
             // lets create a temp dest.
             dest = createTempQueue();
-            source = new org.apache.qpid.proton.type.messaging.Source();
+            source = new org.apache.qpid.proton.amqp.messaging.Source();
             source.setAddress(dest.getQualifiedName());
             source.setDynamic(true);
             sender.setSource(source);
@@ -1023,4 +1023,15 @@ class AmqpProtocolConverter {
         }
     }
 
+    ErrorCondition createErrorCondition(String name) {
+        return createErrorCondition(name, "");
+    }
+
+    ErrorCondition createErrorCondition(String name, String description) {
+        ErrorCondition condition = new ErrorCondition();
+        condition.setCondition(Symbol.valueOf(name));
+        condition.setDescription(description);
+        return condition;
+    }
+
 }



Mime
View raw message