activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tab...@apache.org
Subject [2/2] git commit: https://issues.apache.org/jira/browse/AMQ-5402
Date Mon, 20 Oct 2014 13:56:02 GMT
https://issues.apache.org/jira/browse/AMQ-5402

Add support for encoding the destination type in transformed messages as
a byte value to support future JMS->AMQP spec mappings.

Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/4881a848
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/4881a848
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/4881a848

Branch: refs/heads/trunk
Commit: 4881a848dc9c8170ab82267a6bdedd4d3adcc372
Parents: 5a6129b
Author: Timothy Bish <tabish121@gmail.com>
Authored: Mon Oct 20 09:49:36 2014 -0400
Committer: Timothy Bish <tabish121@gmail.com>
Committed: Mon Oct 20 09:49:36 2014 -0400

----------------------------------------------------------------------
 .../transport/amqp/AmqpProtocolConverter.java    | 19 +++++++++++++++++++
 .../activemq/transport/amqp/AmqpWireFormat.java  |  9 +++++++++
 2 files changed, 28 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/4881a848/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java
index 1f8bb36..80b47cc 100644
--- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java
+++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java
@@ -118,12 +118,14 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter {
     private static final Symbol JMS_SELECTOR = Symbol.valueOf("jms-selector");
     private static final Symbol NO_LOCAL = Symbol.valueOf("no-local");
     private static final Symbol ANONYMOUS_RELAY = Symbol.valueOf("x-opt-anonymous-relay");
+    private static final Symbol JMS_MAPPING_VERSION = Symbol.valueOf("x-opt-jms-mapping-version");
     private static final Symbol DURABLE_SUBSCRIPTION_ENDED = Symbol.getSymbol("DURABLE_SUBSCRIPTION_ENDED");
 
     protected int prefetch;
     protected Transport protonTransport = Proton.transport();
     protected Connection protonConnection = Proton.connection();
     protected Collector eventCollector = new CollectorImpl();
+    protected boolean useByteDestinationTypeAnnotation;
 
     public AmqpProtocolConverter(AmqpTransport transport) {
         this.amqpTransport = transport;
@@ -134,6 +136,8 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter {
             this.protonTransport.setMaxFrameSize(maxFrameSize);
         }
 
+        useByteDestinationTypeAnnotation = transport.getWireFormat().isUseByteDestinationTypeAnnotation();
+
         this.protonTransport.bind(this.protonConnection);
 
         // NOTE: QPid JMS client has a bug where the channel max is stored as a
@@ -456,6 +460,17 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter {
             connectionInfo.setClientId(clientId);
         }
 
+        Map<Symbol, Object> props = protonConnection.getRemoteProperties();
+        if (props != null) {
+            if (props.containsKey(JMS_MAPPING_VERSION)) {
+                useByteDestinationTypeAnnotation = true;
+            }
+        }
+
+        if (useByteDestinationTypeAnnotation) {
+            outboundTransformer.setUseByteDestinationTypeAnnotations(true);
+        }
+
         connectionInfo.setTransportContext(amqpTransport.getPeerCertificates());
 
         sendToActiveMQ(connectionInfo, new ResponseHandler() {
@@ -529,6 +544,10 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter {
                 LOG.warn("Unknown transformer type {} using native one instead", transformer);
                 inboundTransformer = new AMQPNativeInboundTransformer(ActiveMQJMSVendor.INSTANCE);
             }
+
+            if (useByteDestinationTypeAnnotation) {
+                inboundTransformer.setUseByteDestinationTypeAnnotations(true);
+            }
         }
         return inboundTransformer;
     }

http://git-wip-us.apache.org/repos/asf/activemq/blob/4881a848/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpWireFormat.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpWireFormat.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpWireFormat.java
index f6c2880..b58273d 100644
--- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpWireFormat.java
+++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpWireFormat.java
@@ -41,6 +41,7 @@ public class AmqpWireFormat implements WireFormat {
     private long maxFrameSize = DEFAULT_MAX_FRAME_SIZE;
     private int maxAmqpFrameSize = NO_AMQP_MAX_FRAME_SIZE;
     private String anonymousNodeName = "$relay";
+    private boolean useByteDestinationTypeAnnotation = false;
 
     @Override
     public ByteSequence marshal(Object command) throws IOException {
@@ -135,4 +136,12 @@ public class AmqpWireFormat implements WireFormat {
     public void setAnonymousNodeName(String anonymousNodeName) {
         this.anonymousNodeName = anonymousNodeName;
     }
+
+    public boolean isUseByteDestinationTypeAnnotation() {
+        return useByteDestinationTypeAnnotation;
+    }
+
+    public void setUseByteDestinationTypeAnnotation(boolean useByteDestinationTypeAnnotation) {
+        this.useByteDestinationTypeAnnotation = useByteDestinationTypeAnnotation;
+    }
 }


Mime
View raw message