Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 510F1200A01 for ; Tue, 3 May 2016 16:42:30 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 504A11609F4; Tue, 3 May 2016 16:42:30 +0200 (CEST) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 08C241609A9 for ; Tue, 3 May 2016 16:42:27 +0200 (CEST) Received: (qmail 48286 invoked by uid 500); 3 May 2016 14:42:27 -0000 Mailing-List: contact commits-help@activemq.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@activemq.apache.org Delivered-To: mailing list commits@activemq.apache.org Received: (qmail 48272 invoked by uid 99); 3 May 2016 14:42:27 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 03 May 2016 14:42:27 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 148B1DFBD6; Tue, 3 May 2016 14:42:27 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: clebertsuconic@apache.org To: commits@activemq.apache.org Date: Tue, 03 May 2016 14:42:27 -0000 Message-Id: <8c8bcc94d45443d69865040710e5bb14@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [1/2] activemq-artemis git commit: ARTEMIS-503 - replace proton-jms with proton-jms from ActiveMQ archived-at: Tue, 03 May 2016 14:42:30 -0000 Repository: activemq-artemis Updated Branches: refs/heads/master 7fb603f78 -> 2a415a80e ARTEMIS-503 - replace proton-jms with proton-jms from ActiveMQ Ive copied over the source itself https://issues.apache.org/jira/browse/ARTEMIS-503 Project: 
http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/c161ab46 Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/c161ab46 Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/c161ab46 Branch: refs/heads/master Commit: c161ab46a607b2bddc9c7d637c950ae7b2ad1cfc Parents: 7fb603f Author: Andy Taylor Authored: Tue May 3 13:51:15 2016 +0100 Committer: Clebert Suconic Committed: Tue May 3 10:40:56 2016 -0400 ---------------------------------------------------------------------- artemis-distribution/pom.xml | 4 - artemis-distribution/src/main/assembly/dep.xml | 1 - artemis-protocols/artemis-amqp-protocol/pom.xml | 4 - .../AMQPNativeOutboundTransformer.java | 56 ---- .../proton/converter/ActiveMQJMSVendor.java | 2 +- .../converter/JMSMappingInboundTransformer.java | 49 --- .../JMSMappingOutboundTransformer.java | 53 --- .../converter/ProtonMessageConverter.java | 7 +- .../proton/converter/jms/ServerJMSMessage.java | 7 + .../converter/jms/ServerJMSObjectMessage.java | 2 +- .../converter/message/AMQPMessageIdHelper.java | 257 +++++++++++++++ .../message/AMQPNativeInboundTransformer.java | 46 +++ .../message/AMQPNativeOutboundTransformer.java | 60 ++++ .../message/AMQPRawInboundTransformer.java | 60 ++++ .../converter/message/EncodedMessage.java | 67 ++++ .../converter/message/InboundTransformer.java | 317 ++++++++++++++++++ .../message/JMSMappingInboundTransformer.java | 126 +++++++ .../message/JMSMappingOutboundTransformer.java | 329 +++++++++++++++++++ .../proton/converter/message/JMSVendor.java | 53 +++ .../converter/message/OutboundTransformer.java | 69 ++++ .../plug/ProtonSessionIntegrationCallback.java | 2 +- .../core/protocol/proton/TestConversions.java | 2 +- pom.xml | 12 - 23 files changed, 1400 insertions(+), 185 deletions(-) ---------------------------------------------------------------------- 
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c161ab46/artemis-distribution/pom.xml ---------------------------------------------------------------------- diff --git a/artemis-distribution/pom.xml b/artemis-distribution/pom.xml index 6f462b4..54cf4a0 100644 --- a/artemis-distribution/pom.xml +++ b/artemis-distribution/pom.xml @@ -186,10 +186,6 @@ io.netty netty-codec-mqtt - - org.apache.activemq - activemq-amqp - http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c161ab46/artemis-distribution/src/main/assembly/dep.xml ---------------------------------------------------------------------- diff --git a/artemis-distribution/src/main/assembly/dep.xml b/artemis-distribution/src/main/assembly/dep.xml index d723bae..4a0f2da 100644 --- a/artemis-distribution/src/main/assembly/dep.xml +++ b/artemis-distribution/src/main/assembly/dep.xml @@ -80,7 +80,6 @@ org.jboss.logging:jboss-logging io.netty:netty-all org.apache.qpid:proton-j - org.apache.activemq:activemq-amqp org.apache.activemq:activemq-client org.slf4j:slf4j-api io.airlift:airline http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c161ab46/artemis-protocols/artemis-amqp-protocol/pom.xml ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-amqp-protocol/pom.xml b/artemis-protocols/artemis-amqp-protocol/pom.xml index 78e9c3b..3f130ee 100644 --- a/artemis-protocols/artemis-amqp-protocol/pom.xml +++ b/artemis-protocols/artemis-amqp-protocol/pom.xml @@ -42,10 +42,6 @@ ${project.version} - org.apache.activemq - activemq-amqp - - org.jboss.logging jboss-logging-processor provided http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c161ab46/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/converter/AMQPNativeOutboundTransformer.java ---------------------------------------------------------------------- diff --git 
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/converter/AMQPNativeOutboundTransformer.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/converter/AMQPNativeOutboundTransformer.java deleted file mode 100644 index c187ad0..0000000 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/converter/AMQPNativeOutboundTransformer.java +++ /dev/null @@ -1,56 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - *

- * http://www.apache.org/licenses/LICENSE-2.0 - *

- * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.artemis.core.protocol.proton.converter; - -import org.apache.activemq.transport.amqp.message.OutboundTransformer; -import org.apache.qpid.proton.amqp.UnsignedInteger; -import org.apache.qpid.proton.amqp.messaging.Header; -import org.apache.qpid.proton.message.ProtonJMessage; - -import javax.jms.BytesMessage; -import javax.jms.JMSException; - -public class AMQPNativeOutboundTransformer { - static ProtonJMessage transform(OutboundTransformer options, BytesMessage msg) throws JMSException { - byte[] data = new byte[(int) msg.getBodyLength()]; - msg.readBytes(data); - msg.reset(); - int count = msg.getIntProperty("JMSXDeliveryCount"); - - // decode... - ProtonJMessage amqp = (ProtonJMessage) org.apache.qpid.proton.message.Message.Factory.create(); - int offset = 0; - int len = data.length; - while (len > 0) { - final int decoded = amqp.decode(data, offset, len); - assert decoded > 0 : "Make progress decoding the message"; - offset += decoded; - len -= decoded; - } - - // Update the DeliveryCount header... - // The AMQP delivery-count field only includes prior failed delivery attempts, - // whereas JMSXDeliveryCount includes the first/current delivery attempt. Subtract 1. 
- if (amqp.getHeader() == null) { - amqp.setHeader(new Header()); - } - - amqp.getHeader().setDeliveryCount(new UnsignedInteger(count - 1)); - - return amqp; - } -} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c161ab46/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/converter/ActiveMQJMSVendor.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/converter/ActiveMQJMSVendor.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/converter/ActiveMQJMSVendor.java index 3af26dc..59e0edb 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/converter/ActiveMQJMSVendor.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/converter/ActiveMQJMSVendor.java @@ -26,6 +26,7 @@ import javax.jms.TextMessage; import org.apache.activemq.artemis.core.protocol.proton.converter.jms.ServerDestination; import org.apache.activemq.artemis.core.protocol.proton.converter.jms.ServerJMSObjectMessage; +import org.apache.activemq.artemis.core.protocol.proton.converter.message.JMSVendor; import org.apache.activemq.artemis.jms.client.ActiveMQDestination; import org.apache.activemq.artemis.core.buffers.impl.ResetLimitWrappedActiveMQBuffer; import org.apache.activemq.artemis.core.protocol.proton.converter.jms.ServerJMSBytesMessage; @@ -36,7 +37,6 @@ import org.apache.activemq.artemis.core.protocol.proton.converter.jms.ServerJMST import org.apache.activemq.artemis.core.server.ServerMessage; import org.apache.activemq.artemis.core.server.impl.ServerMessageImpl; import org.apache.activemq.artemis.utils.IDGenerator; -import org.apache.activemq.transport.amqp.message.JMSVendor; public class ActiveMQJMSVendor implements 
JMSVendor { http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c161ab46/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/converter/JMSMappingInboundTransformer.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/converter/JMSMappingInboundTransformer.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/converter/JMSMappingInboundTransformer.java deleted file mode 100644 index 03f9104..0000000 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/converter/JMSMappingInboundTransformer.java +++ /dev/null @@ -1,49 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - *

- * http://www.apache.org/licenses/LICENSE-2.0 - *

- * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.artemis.core.protocol.proton.converter; - -import org.apache.activemq.transport.amqp.message.JMSVendor; -import org.apache.qpid.proton.amqp.UnsignedLong; -import org.apache.qpid.proton.amqp.messaging.Properties; - -import javax.jms.Message; - -class JMSMappingInboundTransformer extends org.apache.activemq.transport.amqp.message.JMSMappingInboundTransformer { - - JMSMappingInboundTransformer(JMSVendor vendor) { - super(vendor); - } - - @Override - protected void populateMessage(Message jms, org.apache.qpid.proton.message.Message amqp) throws Exception { - super.populateMessage(jms, amqp); - final Properties properties = amqp.getProperties(); - if (properties != null) { - if (properties.getMessageId() != null) { - if (properties.getMessageId() instanceof Long) { - jms.setLongProperty(this.getPrefixVendor() + "NATIVE_LONG_MESSAGE_ID", (Long) properties.getMessageId()); - } - else if (properties.getMessageId() instanceof UnsignedLong) { - jms.setLongProperty(this.getPrefixVendor() + "NATIVE_UNSIGNED_LONG_MESSAGE_ID", ((UnsignedLong) properties.getMessageId()).longValue()); - } - else { - jms.setStringProperty(this.getPrefixVendor() + "NATIVE_STRING_MESSAGE_ID", properties.getMessageId().toString()); - } - } - } - } -} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c161ab46/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/converter/JMSMappingOutboundTransformer.java ---------------------------------------------------------------------- diff --git 
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/converter/JMSMappingOutboundTransformer.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/converter/JMSMappingOutboundTransformer.java deleted file mode 100644 index b643162..0000000 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/converter/JMSMappingOutboundTransformer.java +++ /dev/null @@ -1,53 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - *

- * http://www.apache.org/licenses/LICENSE-2.0 - *

- * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.artemis.core.protocol.proton.converter; - -import org.apache.activemq.transport.amqp.message.JMSVendor; -import org.apache.qpid.proton.amqp.UnsignedLong; -import org.apache.qpid.proton.message.ProtonJMessage; - -import javax.jms.JMSException; -import javax.jms.Message; -import java.io.UnsupportedEncodingException; -import java.util.Map; - -class JMSMappingOutboundTransformer extends org.apache.activemq.transport.amqp.message.JMSMappingOutboundTransformer { - JMSMappingOutboundTransformer(JMSVendor vendor) { - super(vendor); - } - - @Override - public ProtonJMessage convert(Message msg) throws JMSException, UnsupportedEncodingException { - ProtonJMessage protonJMessage = super.convert(msg); - - Map properties = protonJMessage.getApplicationProperties().getValue(); - - if (properties.containsKey(this.getPrefixVendor() + "NATIVE_LONG_MESSAGE_ID")) { - Long id = (Long) properties.remove(this.getPrefixVendor() + "NATIVE_LONG_MESSAGE_ID"); - protonJMessage.setMessageId(id); - } - else if (properties.containsKey(this.getPrefixVendor() + "NATIVE_UNSIGNED_LONG_MESSAGE_ID")) { - Long id = (Long) properties.remove(this.getPrefixVendor() + "NATIVE_UNSIGNED_LONG_MESSAGE_ID"); - protonJMessage.setMessageId(new UnsignedLong(id)); - } - else if (properties.containsKey(this.getPrefixVendor() + "NATIVE_STRING_MESSAGE_ID")) { - String id = (String) properties.remove(this.getPrefixVendor() + "NATIVE_STRING_MESSAGE_ID"); - protonJMessage.setMessageId(id); - } - return protonJMessage; - } -} 
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c161ab46/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/converter/ProtonMessageConverter.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/converter/ProtonMessageConverter.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/converter/ProtonMessageConverter.java index 47011c1..6b4e99b 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/converter/ProtonMessageConverter.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/converter/ProtonMessageConverter.java @@ -17,9 +17,12 @@ package org.apache.activemq.artemis.core.protocol.proton.converter; import org.apache.activemq.artemis.core.client.ActiveMQClientLogger; -import org.apache.activemq.transport.amqp.message.EncodedMessage; -import org.apache.activemq.transport.amqp.message.InboundTransformer; +import org.apache.activemq.artemis.core.protocol.proton.converter.message.AMQPNativeOutboundTransformer; +import org.apache.activemq.artemis.core.protocol.proton.converter.message.EncodedMessage; +import org.apache.activemq.artemis.core.protocol.proton.converter.message.InboundTransformer; +import org.apache.activemq.artemis.core.protocol.proton.converter.message.JMSMappingInboundTransformer; import org.apache.activemq.artemis.core.protocol.proton.converter.jms.ServerJMSMessage; +import org.apache.activemq.artemis.core.protocol.proton.converter.message.JMSMappingOutboundTransformer; import org.apache.activemq.artemis.core.server.ServerMessage; import org.apache.activemq.artemis.spi.core.protocol.MessageConverter; import org.apache.activemq.artemis.utils.IDGenerator; 
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c161ab46/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/converter/jms/ServerJMSMessage.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/converter/jms/ServerJMSMessage.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/converter/jms/ServerJMSMessage.java index 8f6ef9b..0d82236 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/converter/jms/ServerJMSMessage.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/converter/jms/ServerJMSMessage.java @@ -33,6 +33,7 @@ import org.apache.activemq.artemis.reader.MessageUtil; public class ServerJMSMessage implements Message { protected final MessageInternal message; + private final String NATIVE_MESSAGE_ID = "NATIVE_MESSAGE_ID"; protected int deliveryCount; @@ -65,11 +66,17 @@ public class ServerJMSMessage implements Message { @Override public final String getJMSMessageID() throws JMSException { + if (message.containsProperty(NATIVE_MESSAGE_ID)) { + return getStringProperty(NATIVE_MESSAGE_ID); + } return null; } @Override public final void setJMSMessageID(String id) throws JMSException { + if (id != null) { + message.putStringProperty(NATIVE_MESSAGE_ID, id); + } } @Override http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c161ab46/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/converter/jms/ServerJMSObjectMessage.java ---------------------------------------------------------------------- diff --git 
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/converter/jms/ServerJMSObjectMessage.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/converter/jms/ServerJMSObjectMessage.java index 938f459..d5dbbe8 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/converter/jms/ServerJMSObjectMessage.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/converter/jms/ServerJMSObjectMessage.java @@ -18,11 +18,11 @@ package org.apache.activemq.artemis.core.protocol.proton.converter.jms; import org.apache.activemq.artemis.api.core.Message; import org.apache.activemq.artemis.core.message.impl.MessageInternal; -import org.apache.activemq.util.ByteArrayOutputStream; import javax.jms.JMSException; import javax.jms.ObjectMessage; import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; import java.io.ObjectInputStream; import java.io.ObjectOutputStream; import java.io.Serializable; http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c161ab46/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/converter/message/AMQPMessageIdHelper.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/converter/message/AMQPMessageIdHelper.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/converter/message/AMQPMessageIdHelper.java new file mode 100644 index 0000000..479c1f7 --- /dev/null +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/converter/message/AMQPMessageIdHelper.java @@ -0,0 +1,257 @@ +/* + * + * Licensed to the Apache Software 
Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.activemq.artemis.core.protocol.proton.converter.message; + +import org.apache.qpid.proton.amqp.Binary; +import org.apache.qpid.proton.amqp.UnsignedLong; +import org.proton.plug.exceptions.ActiveMQAMQPIllegalStateException; + +import java.nio.ByteBuffer; +import java.util.UUID; + +/** + * Helper class for identifying and converting message-id and correlation-id values between + * the AMQP types and the Strings values used by JMS. + *

+ *

AMQP messages allow for 4 types of message-id/correlation-id: message-id-string, message-id-binary, + * message-id-uuid, or message-id-ulong. In order to accept or return a string representation of these + * for interoperability with other AMQP clients, the following encoding can be used after removing or + * before adding the "ID:" prefix used for a JMSMessageID value:
+ *

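+ * {@literal "AMQP_BINARY:<hex representation of binary content>"}
+ * {@literal "AMQP_UUID:<string representation of uuid>"}
+ * {@literal "AMQP_ULONG:<string representation of ulong>"}
+ * {@literal "AMQP_STRING:<string>"}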
+ *

+ *

The AMQP_STRING encoding exists only for escaping message-id-string values that happen to begin + * with one of the encoding prefixes (including AMQP_STRING itself). It MUST NOT be used otherwise. + *

+ *

When provided a string for conversion which attempts to identify itself as an encoded binary, uuid, or + * ulong but can't be converted into the indicated format, an exception will be thrown. + */ +public class AMQPMessageIdHelper { + + public static final AMQPMessageIdHelper INSTANCE = new AMQPMessageIdHelper(); + + public static final String AMQP_STRING_PREFIX = "AMQP_STRING:"; + public static final String AMQP_UUID_PREFIX = "AMQP_UUID:"; + public static final String AMQP_ULONG_PREFIX = "AMQP_ULONG:"; + public static final String AMQP_BINARY_PREFIX = "AMQP_BINARY:"; + + private static final int AMQP_UUID_PREFIX_LENGTH = AMQP_UUID_PREFIX.length(); + private static final int AMQP_ULONG_PREFIX_LENGTH = AMQP_ULONG_PREFIX.length(); + private static final int AMQP_STRING_PREFIX_LENGTH = AMQP_STRING_PREFIX.length(); + private static final int AMQP_BINARY_PREFIX_LENGTH = AMQP_BINARY_PREFIX.length(); + private static final char[] HEX_CHARS = "0123456789ABCDEF".toCharArray(); + + /** + * Takes the provided AMQP messageId style object, and convert it to a base string. + * Encodes type information as a prefix where necessary to convey or escape the type + * of the provided object. + * + * @param messageId the raw messageId object to process + * @return the base string to be used in creating the actual id. 
+ */ + public String toBaseMessageIdString(Object messageId) { + if (messageId == null) { + return null; + } + else if (messageId instanceof String) { + String stringId = (String) messageId; + + // If the given string has a type encoding prefix, + // we need to escape it as an encoded string (even if + // the existing encoding prefix was also for string) + if (hasTypeEncodingPrefix(stringId)) { + return AMQP_STRING_PREFIX + stringId; + } + else { + return stringId; + } + } + else if (messageId instanceof UUID) { + return AMQP_UUID_PREFIX + messageId.toString(); + } + else if (messageId instanceof UnsignedLong) { + return AMQP_ULONG_PREFIX + messageId.toString(); + } + else if (messageId instanceof Binary) { + ByteBuffer dup = ((Binary) messageId).asByteBuffer(); + + byte[] bytes = new byte[dup.remaining()]; + dup.get(bytes); + + String hex = convertBinaryToHexString(bytes); + + return AMQP_BINARY_PREFIX + hex; + } + else { + throw new IllegalArgumentException("Unsupported type provided: " + messageId.getClass()); + } + } + + /** + * Takes the provided base id string and return the appropriate amqp messageId style object. + * Converts the type based on any relevant encoding information found as a prefix. + * + * @param baseId the object to be converted to an AMQP MessageId value. + * @return the AMQP messageId style object + * @throws ActiveMQAMQPIllegalStateException if the provided baseId String indicates an encoded type but can't be converted to that type. 
+ */ + public Object toIdObject(String baseId) throws ActiveMQAMQPIllegalStateException { + if (baseId == null) { + return null; + } + + try { + if (hasAmqpUuidPrefix(baseId)) { + String uuidString = strip(baseId, AMQP_UUID_PREFIX_LENGTH); + return UUID.fromString(uuidString); + } + else if (hasAmqpUlongPrefix(baseId)) { + String longString = strip(baseId, AMQP_ULONG_PREFIX_LENGTH); + return UnsignedLong.valueOf(longString); + } + else if (hasAmqpStringPrefix(baseId)) { + return strip(baseId, AMQP_STRING_PREFIX_LENGTH); + } + else if (hasAmqpBinaryPrefix(baseId)) { + String hexString = strip(baseId, AMQP_BINARY_PREFIX_LENGTH); + byte[] bytes = convertHexStringToBinary(hexString); + return new Binary(bytes); + } + else { + // We have a string without any type prefix, transmit it as-is. + return baseId; + } + } + catch (IllegalArgumentException e) { + throw new ActiveMQAMQPIllegalStateException("Unable to convert ID value"); + } + } + + /** + * Convert the provided hex-string into a binary representation where each byte represents + * two characters of the hex string. + *

+ * The hex characters may be upper or lower case. + * + * @param hexString string to convert to a binary value. + * @return a byte array containing the binary representation + * @throws IllegalArgumentException if the provided String is a non-even length or contains + * non-hex characters + */ + public byte[] convertHexStringToBinary(String hexString) throws IllegalArgumentException { + int length = hexString.length(); + + // As each byte needs two characters in the hex encoding, the string must be an even length. + if (length % 2 != 0) { + throw new IllegalArgumentException("The provided hex String must be an even length, but was of length " + length + ": " + hexString); + } + + byte[] binary = new byte[length / 2]; + + for (int i = 0; i < length; i += 2) { + char highBitsChar = hexString.charAt(i); + char lowBitsChar = hexString.charAt(i + 1); + + int highBits = hexCharToInt(highBitsChar, hexString) << 4; + int lowBits = hexCharToInt(lowBitsChar, hexString); + + binary[i / 2] = (byte) (highBits + lowBits); + } + + return binary; + } + + /** + * Convert the provided binary into a hex-string representation where each character + * represents 4 bits of the provided binary, i.e each byte requires two characters. + *

+ * The returned hex characters are upper-case. + * + * @param bytes the binary value to convert to a hex String instance. + * @return a String containing a hex representation of the bytes + */ + public String convertBinaryToHexString(byte[] bytes) { + // Each byte is represented as 2 chars + StringBuilder builder = new StringBuilder(bytes.length * 2); + + for (byte b : bytes) { + // The byte will be expanded to int before shifting, replicating the + // sign bit, so mask everything beyond the first 4 bits afterwards + int highBitsInt = (b >> 4) & 0xF; + // We only want the first 4 bits + int lowBitsInt = b & 0xF; + + builder.append(HEX_CHARS[highBitsInt]); + builder.append(HEX_CHARS[lowBitsInt]); + } + + return builder.toString(); + } + + //----- Internal implementation ------------------------------------------// + + private boolean hasTypeEncodingPrefix(String stringId) { + return hasAmqpBinaryPrefix(stringId) || hasAmqpUuidPrefix(stringId) || + hasAmqpUlongPrefix(stringId) || hasAmqpStringPrefix(stringId); + } + + private boolean hasAmqpStringPrefix(String stringId) { + return stringId.startsWith(AMQP_STRING_PREFIX); + } + + private boolean hasAmqpUlongPrefix(String stringId) { + return stringId.startsWith(AMQP_ULONG_PREFIX); + } + + private boolean hasAmqpUuidPrefix(String stringId) { + return stringId.startsWith(AMQP_UUID_PREFIX); + } + + private boolean hasAmqpBinaryPrefix(String stringId) { + return stringId.startsWith(AMQP_BINARY_PREFIX); + } + + private String strip(String id, int numChars) { + return id.substring(numChars); + } + + private int hexCharToInt(char ch, String orig) throws IllegalArgumentException { + if (ch >= '0' && ch <= '9') { + // subtract '0' to get difference in position as an int + return ch - '0'; + } + else if (ch >= 'A' && ch <= 'F') { + // subtract 'A' to get difference in position as an int + // and then add 10 for the offset of 'A' + return ch - 'A' + 10; + } + else if (ch >= 'a' && ch <= 'f') { + // subtract 'a' to get 
difference in position as an int + // and then add 10 for the offset of 'a' + return ch - 'a' + 10; + } + + throw new IllegalArgumentException("The provided hex string contains non-hex character '" + ch + "': " + orig); + } +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c161ab46/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/converter/message/AMQPNativeInboundTransformer.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/converter/message/AMQPNativeInboundTransformer.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/converter/message/AMQPNativeInboundTransformer.java new file mode 100644 index 0000000..e194e6b --- /dev/null +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/converter/message/AMQPNativeInboundTransformer.java @@ -0,0 +1,46 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.core.protocol.proton.converter.message; + +import javax.jms.Message; + +public class AMQPNativeInboundTransformer extends AMQPRawInboundTransformer { + + public AMQPNativeInboundTransformer(JMSVendor vendor) { + super(vendor); + } + + @Override + public String getTransformerName() { + return TRANSFORMER_NATIVE; + } + + @Override + public InboundTransformer getFallbackTransformer() { + return new AMQPRawInboundTransformer(getVendor()); + } + + @Override + public Message transform(EncodedMessage amqpMessage) throws Exception { + org.apache.qpid.proton.message.Message amqp = amqpMessage.decode(); + + Message rc = super.transform(amqpMessage); + + populateMessage(rc, amqp); + return rc; + } +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c161ab46/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/converter/message/AMQPNativeOutboundTransformer.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/converter/message/AMQPNativeOutboundTransformer.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/converter/message/AMQPNativeOutboundTransformer.java new file mode 100644 index 0000000..fa7f206 --- /dev/null +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/converter/message/AMQPNativeOutboundTransformer.java @@ -0,0 +1,60 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or 
more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.core.protocol.proton.converter.message; + +import org.apache.qpid.proton.amqp.UnsignedInteger; +import org.apache.qpid.proton.amqp.messaging.Header; +import org.apache.qpid.proton.message.ProtonJMessage; + +import javax.jms.BytesMessage; +import javax.jms.JMSException; + +public class AMQPNativeOutboundTransformer extends OutboundTransformer { + + public AMQPNativeOutboundTransformer(JMSVendor vendor) { + super(vendor); + } + + public static ProtonJMessage transform(OutboundTransformer options, BytesMessage msg) throws JMSException { + byte[] data = new byte[(int) msg.getBodyLength()]; + msg.readBytes(data); + msg.reset(); + int count = msg.getIntProperty("JMSXDeliveryCount"); + + // decode... + ProtonJMessage amqp = (ProtonJMessage) org.apache.qpid.proton.message.Message.Factory.create(); + int offset = 0; + int len = data.length; + while (len > 0) { + final int decoded = amqp.decode(data, offset, len); + assert decoded > 0 : "Make progress decoding the message"; + offset += decoded; + len -= decoded; + } + + // Update the DeliveryCount header... + // The AMQP delivery-count field only includes prior failed delivery attempts, + // whereas JMSXDeliveryCount includes the first/current delivery attempt. Subtract 1. 
+ if (amqp.getHeader() == null) { + amqp.setHeader(new Header()); + } + + amqp.getHeader().setDeliveryCount(new UnsignedInteger(count - 1)); + + return amqp; + } +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c161ab46/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/converter/message/AMQPRawInboundTransformer.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/converter/message/AMQPRawInboundTransformer.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/converter/message/AMQPRawInboundTransformer.java new file mode 100644 index 0000000..fd9540d --- /dev/null +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/converter/message/AMQPRawInboundTransformer.java @@ -0,0 +1,60 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.core.protocol.proton.converter.message; + +import javax.jms.BytesMessage; +import javax.jms.DeliveryMode; +import javax.jms.Message; + +public class AMQPRawInboundTransformer extends InboundTransformer { + + public AMQPRawInboundTransformer(JMSVendor vendor) { + super(vendor); + } + + @Override + public String getTransformerName() { + return TRANSFORMER_RAW; + } + + @Override + public InboundTransformer getFallbackTransformer() { + return null; // No fallback from full raw transform + } + + @Override + public Message transform(EncodedMessage amqpMessage) throws Exception { + BytesMessage rc = vendor.createBytesMessage(); + rc.writeBytes(amqpMessage.getArray(), amqpMessage.getArrayOffset(), amqpMessage.getLength()); + + // We cannot decode the message headers to check so err on the side of caution + // and mark all messages as persistent. 
+ rc.setJMSDeliveryMode(DeliveryMode.PERSISTENT); + rc.setJMSPriority(defaultPriority); + + final long now = System.currentTimeMillis(); + rc.setJMSTimestamp(now); + if (defaultTtl > 0) { + rc.setJMSExpiration(now + defaultTtl); + } + + rc.setLongProperty(prefixVendor + "MESSAGE_FORMAT", amqpMessage.getMessageFormat()); + rc.setBooleanProperty(prefixVendor + "NATIVE", true); + + return rc; + } +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c161ab46/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/converter/message/EncodedMessage.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/converter/message/EncodedMessage.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/converter/message/EncodedMessage.java new file mode 100644 index 0000000..2857c17 --- /dev/null +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/converter/message/EncodedMessage.java @@ -0,0 +1,67 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.core.protocol.proton.converter.message; + +import org.apache.qpid.proton.amqp.Binary; +import org.apache.qpid.proton.message.Message; + +public class EncodedMessage { + + private final Binary data; + final long messageFormat; + + public EncodedMessage(long messageFormat, byte[] data, int offset, int length) { + this.data = new Binary(data, offset, length); + this.messageFormat = messageFormat; + } + + public long getMessageFormat() { + return messageFormat; + } + + public Message decode() throws Exception { + Message amqp = Message.Factory.create(); + + int offset = getArrayOffset(); + int len = getLength(); + while (len > 0) { + final int decoded = amqp.decode(getArray(), offset, len); + assert decoded > 0 : "Make progress decoding the message"; + offset += decoded; + len -= decoded; + } + + return amqp; + } + + public int getLength() { + return data.getLength(); + } + + public int getArrayOffset() { + return data.getArrayOffset(); + } + + public byte[] getArray() { + return data.getArray(); + } + + @Override + public String toString() { + return data.toString(); + } +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c161ab46/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/converter/message/InboundTransformer.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/converter/message/InboundTransformer.java 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/converter/message/InboundTransformer.java new file mode 100644 index 0000000..3a58038 --- /dev/null +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/converter/message/InboundTransformer.java @@ -0,0 +1,317 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.activemq.artemis.core.protocol.proton.converter.message; + +import org.apache.qpid.proton.amqp.Binary; +import org.apache.qpid.proton.amqp.Decimal128; +import org.apache.qpid.proton.amqp.Decimal32; +import org.apache.qpid.proton.amqp.Decimal64; +import org.apache.qpid.proton.amqp.Symbol; +import org.apache.qpid.proton.amqp.UnsignedByte; +import org.apache.qpid.proton.amqp.UnsignedInteger; +import org.apache.qpid.proton.amqp.UnsignedLong; +import org.apache.qpid.proton.amqp.UnsignedShort; +import org.apache.qpid.proton.amqp.messaging.ApplicationProperties; +import org.apache.qpid.proton.amqp.messaging.Footer; +import org.apache.qpid.proton.amqp.messaging.Header; +import org.apache.qpid.proton.amqp.messaging.MessageAnnotations; +import org.apache.qpid.proton.amqp.messaging.Properties; + +import javax.jms.DeliveryMode; +import javax.jms.JMSException; +import javax.jms.Message; +import java.util.Map; +import java.util.Set; + +import static org.apache.activemq.artemis.api.core.Message.HDR_SCHEDULED_DELIVERY_TIME; + +public abstract class InboundTransformer { + + JMSVendor vendor; + + public static final String TRANSFORMER_NATIVE = "native"; + public static final String TRANSFORMER_RAW = "raw"; + public static final String TRANSFORMER_JMS = "jms"; + + String prefixVendor = "JMS_AMQP_"; + String prefixDeliveryAnnotations = "DA_"; + String prefixMessageAnnotations = "MA_"; + String prefixFooter = "FT_"; + + int defaultDeliveryMode = DeliveryMode.NON_PERSISTENT; + int defaultPriority = Message.DEFAULT_PRIORITY; + long defaultTtl = Message.DEFAULT_TIME_TO_LIVE; + + public InboundTransformer(JMSVendor vendor) { + this.vendor = vendor; + } + + public abstract Message transform(EncodedMessage amqpMessage) throws Exception; + + public abstract String getTransformerName(); + + public abstract InboundTransformer getFallbackTransformer(); + + public int getDefaultDeliveryMode() { + return defaultDeliveryMode; + } + + public void setDefaultDeliveryMode(int 
defaultDeliveryMode) { + this.defaultDeliveryMode = defaultDeliveryMode; + } + + public int getDefaultPriority() { + return defaultPriority; + } + + public void setDefaultPriority(int defaultPriority) { + this.defaultPriority = defaultPriority; + } + + public long getDefaultTtl() { + return defaultTtl; + } + + public void setDefaultTtl(long defaultTtl) { + this.defaultTtl = defaultTtl; + } + + public String getPrefixVendor() { + return prefixVendor; + } + + public void setPrefixVendor(String prefixVendor) { + this.prefixVendor = prefixVendor; + } + + public JMSVendor getVendor() { + return vendor; + } + + public void setVendor(JMSVendor vendor) { + this.vendor = vendor; + } + + @SuppressWarnings("unchecked") + protected void populateMessage(Message jms, org.apache.qpid.proton.message.Message amqp) throws Exception { + Header header = amqp.getHeader(); + if (header == null) { + header = new Header(); + } + + if (header.getDurable() != null) { + jms.setJMSDeliveryMode(header.getDurable().booleanValue() ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT); + } + else { + jms.setJMSDeliveryMode(defaultDeliveryMode); + } + + if (header.getPriority() != null) { + jms.setJMSPriority(header.getPriority().intValue()); + } + else { + jms.setJMSPriority(defaultPriority); + } + + if (header.getFirstAcquirer() != null) { + jms.setBooleanProperty(prefixVendor + "FirstAcquirer", header.getFirstAcquirer()); + } + + if (header.getDeliveryCount() != null) { + vendor.setJMSXDeliveryCount(jms, header.getDeliveryCount().longValue()); + } + + final MessageAnnotations ma = amqp.getMessageAnnotations(); + if (ma != null) { + for (Map.Entry entry : ma.getValue().entrySet()) { + String key = entry.getKey().toString(); + if ("x-opt-jms-type".equals(key) && entry.getValue() != null) { + // Legacy annotation, JMSType value will be replaced by Subject further down if also present. 
+ jms.setJMSType(entry.getValue().toString()); + } + else if ("x-opt-delivery-time".equals(key) && entry.getValue() != null) { + long deliveryTime = ((Number) entry.getValue()).longValue(); + jms.setLongProperty(HDR_SCHEDULED_DELIVERY_TIME.toString(), deliveryTime); + } + else if ("x-opt-delivery-delay".equals(key) && entry.getValue() != null) { + long delay = ((Number) entry.getValue()).longValue(); + if (delay > 0) { + jms.setLongProperty(HDR_SCHEDULED_DELIVERY_TIME.toString(), System.currentTimeMillis() + delay); + } + } + //todo + /*else if ("x-opt-delivery-repeat".equals(key) && entry.getValue() != null) { + int repeat = ((Number) entry.getValue()).intValue(); + if (repeat > 0) { + jms.setIntProperty(ScheduledMessage.AMQ_SCHEDULED_REPEAT, repeat); + } + } else if ("x-opt-delivery-period".equals(key) && entry.getValue() != null) { + long period = ((Number) entry.getValue()).longValue(); + if (period > 0) { + jms.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_PERIOD, period); + } + } else if ("x-opt-delivery-cron".equals(key) && entry.getValue() != null) { + String cronEntry = (String) entry.getValue(); + if (cronEntry != null) { + jms.setStringProperty(ScheduledMessage.AMQ_SCHEDULED_CRON, cronEntry); + } + }*/ + + setProperty(jms, prefixVendor + prefixMessageAnnotations + key, entry.getValue()); + } + } + + final ApplicationProperties ap = amqp.getApplicationProperties(); + if (ap != null) { + for (Map.Entry entry : (Set>) ap.getValue().entrySet()) { + String key = entry.getKey().toString(); + if ("JMSXGroupID".equals(key)) { + vendor.setJMSXGroupID(jms, entry.getValue().toString()); + } + else if ("JMSXGroupSequence".equals(key)) { + vendor.setJMSXGroupSequence(jms, ((Number) entry.getValue()).intValue()); + } + else if ("JMSXUserID".equals(key)) { + vendor.setJMSXUserID(jms, entry.getValue().toString()); + } + else { + setProperty(jms, key, entry.getValue()); + } + } + } + + final Properties properties = amqp.getProperties(); + if (properties != null) { + if 
(properties.getMessageId() != null) { + jms.setJMSMessageID(AMQPMessageIdHelper.INSTANCE.toBaseMessageIdString(properties.getMessageId())); + } + Binary userId = properties.getUserId(); + if (userId != null) { + vendor.setJMSXUserID(jms, new String(userId.getArray(), userId.getArrayOffset(), userId.getLength(), "UTF-8")); + } + if (properties.getTo() != null) { + jms.setJMSDestination(vendor.createDestination(properties.getTo())); + } + if (properties.getSubject() != null) { + jms.setJMSType(properties.getSubject()); + } + if (properties.getReplyTo() != null) { + jms.setJMSReplyTo(vendor.createDestination(properties.getReplyTo())); + } + if (properties.getCorrelationId() != null) { + jms.setJMSCorrelationID(properties.getCorrelationId().toString()); + } + if (properties.getContentType() != null) { + jms.setStringProperty(prefixVendor + "ContentType", properties.getContentType().toString()); + } + if (properties.getContentEncoding() != null) { + jms.setStringProperty(prefixVendor + "ContentEncoding", properties.getContentEncoding().toString()); + } + if (properties.getCreationTime() != null) { + jms.setJMSTimestamp(properties.getCreationTime().getTime()); + } + if (properties.getGroupId() != null) { + vendor.setJMSXGroupID(jms, properties.getGroupId()); + } + if (properties.getGroupSequence() != null) { + vendor.setJMSXGroupSequence(jms, properties.getGroupSequence().intValue()); + } + if (properties.getReplyToGroupId() != null) { + jms.setStringProperty(prefixVendor + "ReplyToGroupID", properties.getReplyToGroupId()); + } + if (properties.getAbsoluteExpiryTime() != null) { + jms.setJMSExpiration(properties.getAbsoluteExpiryTime().getTime()); + } + } + + // If the jms expiration has not yet been set... + if (jms.getJMSExpiration() == 0) { + // Then lets try to set it based on the message ttl. 
+ long ttl = defaultTtl; + if (header.getTtl() != null) { + ttl = header.getTtl().longValue(); + } + + if (ttl == 0) { + jms.setJMSExpiration(0); + } + else { + jms.setJMSExpiration(System.currentTimeMillis() + ttl); + } + } + + final Footer fp = amqp.getFooter(); + if (fp != null) { + for (Map.Entry entry : (Set>) fp.getValue().entrySet()) { + String key = entry.getKey().toString(); + setProperty(jms, prefixVendor + prefixFooter + key, entry.getValue()); + } + } + } + + private void setProperty(Message msg, String key, Object value) throws JMSException { + if (value instanceof UnsignedLong) { + long v = ((UnsignedLong) value).longValue(); + msg.setLongProperty(key, v); + } + else if (value instanceof UnsignedInteger) { + long v = ((UnsignedInteger) value).longValue(); + if (Integer.MIN_VALUE <= v && v <= Integer.MAX_VALUE) { + msg.setIntProperty(key, (int) v); + } + else { + msg.setLongProperty(key, v); + } + } + else if (value instanceof UnsignedShort) { + int v = ((UnsignedShort) value).intValue(); + if (Short.MIN_VALUE <= v && v <= Short.MAX_VALUE) { + msg.setShortProperty(key, (short) v); + } + else { + msg.setIntProperty(key, v); + } + } + else if (value instanceof UnsignedByte) { + short v = ((UnsignedByte) value).shortValue(); + if (Byte.MIN_VALUE <= v && v <= Byte.MAX_VALUE) { + msg.setByteProperty(key, (byte) v); + } + else { + msg.setShortProperty(key, v); + } + } + else if (value instanceof Symbol) { + msg.setStringProperty(key, value.toString()); + } + else if (value instanceof Decimal128) { + msg.setDoubleProperty(key, ((Decimal128) value).doubleValue()); + } + else if (value instanceof Decimal64) { + msg.setDoubleProperty(key, ((Decimal64) value).doubleValue()); + } + else if (value instanceof Decimal32) { + msg.setFloatProperty(key, ((Decimal32) value).floatValue()); + } + else if (value instanceof Binary) { + msg.setStringProperty(key, value.toString()); + } + else { + msg.setObjectProperty(key, value); + } + } +} 
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c161ab46/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/converter/message/JMSMappingInboundTransformer.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/converter/message/JMSMappingInboundTransformer.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/converter/message/JMSMappingInboundTransformer.java new file mode 100644 index 0000000..e804818 --- /dev/null +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/converter/message/JMSMappingInboundTransformer.java @@ -0,0 +1,126 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.core.protocol.proton.converter.message; + +import org.apache.qpid.proton.amqp.Binary; +import org.apache.qpid.proton.amqp.messaging.AmqpSequence; +import org.apache.qpid.proton.amqp.messaging.AmqpValue; +import org.apache.qpid.proton.amqp.messaging.Data; +import org.apache.qpid.proton.amqp.messaging.Section; + +import javax.jms.BytesMessage; +import javax.jms.MapMessage; +import javax.jms.Message; +import javax.jms.ObjectMessage; +import javax.jms.StreamMessage; +import javax.jms.TextMessage; +import java.io.Serializable; +import java.util.List; +import java.util.Map; +import java.util.Set; + +public class JMSMappingInboundTransformer extends InboundTransformer { + + public JMSMappingInboundTransformer(JMSVendor vendor) { + super(vendor); + } + + @Override + public String getTransformerName() { + return TRANSFORMER_JMS; + } + + @Override + public InboundTransformer getFallbackTransformer() { + return new AMQPNativeInboundTransformer(getVendor()); + } + + @SuppressWarnings({"unchecked"}) + @Override + public Message transform(EncodedMessage amqpMessage) throws Exception { + org.apache.qpid.proton.message.Message amqp = amqpMessage.decode(); + + Message rc; + final Section body = amqp.getBody(); + if (body == null) { + rc = vendor.createMessage(); + } + else if (body instanceof Data) { + Binary d = ((Data) body).getValue(); + BytesMessage m = vendor.createBytesMessage(); + m.writeBytes(d.getArray(), d.getArrayOffset(), d.getLength()); + rc = m; + } + else if (body instanceof AmqpSequence) { + AmqpSequence sequence = (AmqpSequence) body; + StreamMessage m = vendor.createStreamMessage(); + for (Object item : 
sequence.getValue()) { + m.writeObject(item); + } + rc = m; + } + else if (body instanceof AmqpValue) { + Object value = ((AmqpValue) body).getValue(); + if (value == null) { + rc = vendor.createObjectMessage(); + } + if (value instanceof String) { + TextMessage m = vendor.createTextMessage(); + m.setText((String) value); + rc = m; + } + else if (value instanceof Binary) { + Binary d = (Binary) value; + BytesMessage m = vendor.createBytesMessage(); + m.writeBytes(d.getArray(), d.getArrayOffset(), d.getLength()); + rc = m; + } + else if (value instanceof List) { + StreamMessage m = vendor.createStreamMessage(); + for (Object item : (List) value) { + m.writeObject(item); + } + rc = m; + } + else if (value instanceof Map) { + MapMessage m = vendor.createMapMessage(); + final Set> set = ((Map) value).entrySet(); + for (Map.Entry entry : set) { + m.setObject(entry.getKey(), entry.getValue()); + } + rc = m; + } + else { + ObjectMessage m = vendor.createObjectMessage(); + m.setObject((Serializable) value); + rc = m; + } + } + else { + throw new RuntimeException("Unexpected body type: " + body.getClass()); + } + rc.setJMSDeliveryMode(defaultDeliveryMode); + rc.setJMSPriority(defaultPriority); + rc.setJMSExpiration(defaultTtl); + + populateMessage(rc, amqp); + + rc.setLongProperty(prefixVendor + "MESSAGE_FORMAT", amqpMessage.getMessageFormat()); + rc.setBooleanProperty(prefixVendor + "NATIVE", false); + return rc; + } +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c161ab46/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/converter/message/JMSMappingOutboundTransformer.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/converter/message/JMSMappingOutboundTransformer.java 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/converter/message/JMSMappingOutboundTransformer.java new file mode 100644 index 0000000..7babcf3 --- /dev/null +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/converter/message/JMSMappingOutboundTransformer.java @@ -0,0 +1,329 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.core.protocol.proton.converter.message; + +import org.apache.qpid.proton.amqp.Binary; +import org.apache.qpid.proton.amqp.Symbol; +import org.apache.qpid.proton.amqp.UnsignedByte; +import org.apache.qpid.proton.amqp.UnsignedInteger; +import org.apache.qpid.proton.amqp.messaging.AmqpSequence; +import org.apache.qpid.proton.amqp.messaging.AmqpValue; +import org.apache.qpid.proton.amqp.messaging.ApplicationProperties; +import org.apache.qpid.proton.amqp.messaging.Data; +import org.apache.qpid.proton.amqp.messaging.DeliveryAnnotations; +import org.apache.qpid.proton.amqp.messaging.Footer; +import org.apache.qpid.proton.amqp.messaging.Header; +import org.apache.qpid.proton.amqp.messaging.MessageAnnotations; +import org.apache.qpid.proton.amqp.messaging.Properties; +import org.apache.qpid.proton.amqp.messaging.Section; +import org.apache.qpid.proton.message.ProtonJMessage; +import org.proton.plug.exceptions.ActiveMQAMQPIllegalStateException; + +import javax.jms.BytesMessage; +import javax.jms.DeliveryMode; +import javax.jms.Destination; +import javax.jms.JMSException; +import javax.jms.MapMessage; +import javax.jms.Message; +import javax.jms.MessageEOFException; +import javax.jms.ObjectMessage; +import javax.jms.Queue; +import javax.jms.StreamMessage; +import javax.jms.TemporaryQueue; +import javax.jms.TemporaryTopic; +import javax.jms.TextMessage; +import javax.jms.Topic; +import java.io.UnsupportedEncodingException; +import java.util.ArrayList; +import java.util.Date; +import java.util.Enumeration; +import java.util.HashMap; + +public class JMSMappingOutboundTransformer extends OutboundTransformer { + + 
public static final Symbol JMS_DEST_TYPE_MSG_ANNOTATION = Symbol.valueOf("x-opt-jms-dest"); + public static final Symbol JMS_REPLY_TO_TYPE_MSG_ANNOTATION = Symbol.valueOf("x-opt-jms-reply-to"); + + public static final byte QUEUE_TYPE = 0x00; + public static final byte TOPIC_TYPE = 0x01; + public static final byte TEMP_QUEUE_TYPE = 0x02; + public static final byte TEMP_TOPIC_TYPE = 0x03; + + // Deprecated legacy values used by old QPid AMQP 1.0 JMS client. + + public static final Symbol LEGACY_JMS_DEST_TYPE_MSG_ANNOTATION = Symbol.valueOf("x-opt-to-type"); + public static final Symbol LEGACY_JMS_REPLY_TO_TYPE_MSG_ANNOTATION = Symbol.valueOf("x-opt-reply-type"); + + public static final String LEGACY_QUEUE_TYPE = "queue"; + public static final String LEGACY_TOPIC_TYPE = "topic"; + public static final String LEGACY_TEMP_QUEUE_TYPE = "temporary,queue"; + public static final String LEGACY_TEMP_TOPIC_TYPE = "temporary,topic"; + + public JMSMappingOutboundTransformer(JMSVendor vendor) { + super(vendor); + } + + /** + * Perform the conversion between JMS Message and Proton Message without + * re-encoding it to array. 
This is needed because some frameworks may elect + * to do this on their own way (Netty for instance using Nettybuffers) + * + * @param msg + * @return + * @throws Exception + */ + public ProtonJMessage convert(Message msg) throws JMSException, UnsupportedEncodingException { + Header header = new Header(); + Properties props = new Properties(); + HashMap daMap = null; + HashMap maMap = null; + HashMap apMap = null; + Section body = null; + HashMap footerMap = null; + if (msg instanceof BytesMessage) { + BytesMessage m = (BytesMessage) msg; + byte[] data = new byte[(int) m.getBodyLength()]; + m.readBytes(data); + m.reset(); // Need to reset after readBytes or future readBytes + // calls (ex: redeliveries) will fail and return -1 + body = new Data(new Binary(data)); + } + if (msg instanceof TextMessage) { + body = new AmqpValue(((TextMessage) msg).getText()); + } + if (msg instanceof MapMessage) { + final HashMap map = new HashMap(); + final MapMessage m = (MapMessage) msg; + final Enumeration names = m.getMapNames(); + while (names.hasMoreElements()) { + String key = names.nextElement(); + map.put(key, m.getObject(key)); + } + body = new AmqpValue(map); + } + if (msg instanceof StreamMessage) { + ArrayList list = new ArrayList(); + final StreamMessage m = (StreamMessage) msg; + try { + while (true) { + list.add(m.readObject()); + } + } + catch (MessageEOFException e) { + } + body = new AmqpSequence(list); + } + if (msg instanceof ObjectMessage) { + body = new AmqpValue(((ObjectMessage) msg).getObject()); + } + + header.setDurable(msg.getJMSDeliveryMode() == DeliveryMode.PERSISTENT ? 
true : false); + header.setPriority(new UnsignedByte((byte) msg.getJMSPriority())); + if (msg.getJMSType() != null) { + props.setSubject(msg.getJMSType()); + } + if (msg.getJMSMessageID() != null) { + + String msgId = msg.getJMSMessageID(); + + if (msgId != null) { + try { + props.setMessageId(AMQPMessageIdHelper.INSTANCE.toIdObject(msgId)); + } + catch (ActiveMQAMQPIllegalStateException e) { + props.setMessageId(msgId); + } + } + else { + props.setMessageId(msgId.toString()); + } + } + if (msg.getJMSDestination() != null) { + props.setTo(vendor.toAddress(msg.getJMSDestination())); + if (maMap == null) { + maMap = new HashMap(); + } + maMap.put(JMS_DEST_TYPE_MSG_ANNOTATION, destinationType(msg.getJMSDestination())); + + // Deprecated: used by legacy QPid AMQP 1.0 JMS client + maMap.put(LEGACY_JMS_DEST_TYPE_MSG_ANNOTATION, destinationAttributes(msg.getJMSDestination())); + } + if (msg.getJMSReplyTo() != null) { + props.setReplyTo(vendor.toAddress(msg.getJMSReplyTo())); + if (maMap == null) { + maMap = new HashMap(); + } + maMap.put(JMS_REPLY_TO_TYPE_MSG_ANNOTATION, destinationType(msg.getJMSReplyTo())); + + // Deprecated: used by legacy QPid AMQP 1.0 JMS client + maMap.put(LEGACY_JMS_REPLY_TO_TYPE_MSG_ANNOTATION, destinationAttributes(msg.getJMSReplyTo())); + } + if (msg.getJMSCorrelationID() != null) { + props.setCorrelationId(msg.getJMSCorrelationID()); + } + if (msg.getJMSExpiration() != 0) { + long ttl = msg.getJMSExpiration() - System.currentTimeMillis(); + if (ttl < 0) { + ttl = 1; + } + header.setTtl(new UnsignedInteger((int) ttl)); + + props.setAbsoluteExpiryTime(new Date(msg.getJMSExpiration())); + } + if (msg.getJMSTimestamp() != 0) { + props.setCreationTime(new Date(msg.getJMSTimestamp())); + } + + final Enumeration keys = msg.getPropertyNames(); + while (keys.hasMoreElements()) { + String key = keys.nextElement(); + if (key.equals(messageFormatKey) || key.equals(nativeKey)) { + // skip.. 
+ } + else if (key.equals(firstAcquirerKey)) { + header.setFirstAcquirer(msg.getBooleanProperty(key)); + } + else if (key.startsWith("JMSXDeliveryCount")) { + // The AMQP delivery-count field only includes prior failed delivery attempts, + // whereas JMSXDeliveryCount includes the first/current delivery attempt. + int amqpDeliveryCount = msg.getIntProperty(key) - 1; + if (amqpDeliveryCount > 0) { + header.setDeliveryCount(new UnsignedInteger(amqpDeliveryCount)); + } + } + else if (key.startsWith("JMSXUserID")) { + String value = msg.getStringProperty(key); + props.setUserId(new Binary(value.getBytes("UTF-8"))); + } + else if (key.startsWith("JMSXGroupID")) { + String value = msg.getStringProperty(key); + props.setGroupId(value); + if (apMap == null) { + apMap = new HashMap(); + } + apMap.put(key, value); + } + else if (key.startsWith("JMSXGroupSeq")) { + UnsignedInteger value = new UnsignedInteger(msg.getIntProperty(key)); + props.setGroupSequence(value); + if (apMap == null) { + apMap = new HashMap(); + } + apMap.put(key, value); + } + else if (key.startsWith(prefixDeliveryAnnotationsKey)) { + if (daMap == null) { + daMap = new HashMap(); + } + String name = key.substring(prefixDeliveryAnnotationsKey.length()); + daMap.put(Symbol.valueOf(name), msg.getObjectProperty(key)); + } + else if (key.startsWith(prefixMessageAnnotationsKey)) { + if (maMap == null) { + maMap = new HashMap(); + } + String name = key.substring(prefixMessageAnnotationsKey.length()); + maMap.put(Symbol.valueOf(name), msg.getObjectProperty(key)); + } + else if (key.equals(contentTypeKey)) { + props.setContentType(Symbol.getSymbol(msg.getStringProperty(key))); + } + else if (key.equals(contentEncodingKey)) { + props.setContentEncoding(Symbol.getSymbol(msg.getStringProperty(key))); + } + else if (key.equals(replyToGroupIDKey)) { + props.setReplyToGroupId(msg.getStringProperty(key)); + } + else if (key.startsWith(prefixFooterKey)) { + if (footerMap == null) { + footerMap = new HashMap(); + } + 
String name = key.substring(prefixFooterKey.length()); + footerMap.put(name, msg.getObjectProperty(key)); + } + else { + if (apMap == null) { + apMap = new HashMap(); + } + apMap.put(key, msg.getObjectProperty(key)); + } + } + + MessageAnnotations ma = null; + if (maMap != null) { + ma = new MessageAnnotations(maMap); + } + DeliveryAnnotations da = null; + if (daMap != null) { + da = new DeliveryAnnotations(daMap); + } + ApplicationProperties ap = null; + if (apMap != null) { + ap = new ApplicationProperties(apMap); + } + Footer footer = null; + if (footerMap != null) { + footer = new Footer(footerMap); + } + + return (ProtonJMessage) org.apache.qpid.proton.message.Message.Factory.create(header, da, ma, props, ap, body, footer); + } + + private static byte destinationType(Destination destination) { + if (destination instanceof Queue) { + if (destination instanceof TemporaryQueue) { + return TEMP_QUEUE_TYPE; + } + else { + return QUEUE_TYPE; + } + } + else if (destination instanceof Topic) { + if (destination instanceof TemporaryTopic) { + return TEMP_TOPIC_TYPE; + } + else { + return TOPIC_TYPE; + } + } + + throw new IllegalArgumentException("Unknown Destination Type passed to JMS Transformer."); + } + + // Used by legacy QPid AMQP 1.0 JMS client. 
+ @Deprecated + private static String destinationAttributes(Destination destination) { + if (destination instanceof Queue) { + if (destination instanceof TemporaryQueue) { + return LEGACY_TEMP_QUEUE_TYPE; + } + else { + return LEGACY_QUEUE_TYPE; + } + } + else if (destination instanceof Topic) { + if (destination instanceof TemporaryTopic) { + return LEGACY_TEMP_TOPIC_TYPE; + } + else { + return LEGACY_TOPIC_TYPE; + } + } + + throw new IllegalArgumentException("Unknown Destination Type passed to JMS Transformer."); + } +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c161ab46/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/converter/message/JMSVendor.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/converter/message/JMSVendor.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/converter/message/JMSVendor.java new file mode 100644 index 0000000..7255295 --- /dev/null +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/converter/message/JMSVendor.java @@ -0,0 +1,53 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.core.protocol.proton.converter.message; + +import javax.jms.BytesMessage; +import javax.jms.Destination; +import javax.jms.MapMessage; +import javax.jms.Message; +import javax.jms.ObjectMessage; +import javax.jms.StreamMessage; +import javax.jms.TextMessage; + +public interface JMSVendor { + + BytesMessage createBytesMessage(); + + StreamMessage createStreamMessage(); + + Message createMessage(); + + TextMessage createTextMessage(); + + ObjectMessage createObjectMessage(); + + MapMessage createMapMessage(); + + void setJMSXUserID(Message message, String value); + + Destination createDestination(String name); + + void setJMSXGroupID(Message message, String groupId); + + void setJMSXGroupSequence(Message message, int value); + + void setJMSXDeliveryCount(Message message, long value); + + String toAddress(Destination destination); + +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c161ab46/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/converter/message/OutboundTransformer.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/converter/message/OutboundTransformer.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/converter/message/OutboundTransformer.java new file mode 100644 index 0000000..bd20eee --- /dev/null +++ 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/converter/message/OutboundTransformer.java @@ -0,0 +1,69 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.core.protocol.proton.converter.message; + +public abstract class OutboundTransformer { + + JMSVendor vendor; + String prefixVendor; + + String prefixDeliveryAnnotations = "DA_"; + String prefixMessageAnnotations = "MA_"; + String prefixFooter = "FT_"; + + String messageFormatKey; + String nativeKey; + String firstAcquirerKey; + String prefixDeliveryAnnotationsKey; + String prefixMessageAnnotationsKey; + String contentTypeKey; + String contentEncodingKey; + String replyToGroupIDKey; + String prefixFooterKey; + + public OutboundTransformer(JMSVendor vendor) { + this.vendor = vendor; + this.setPrefixVendor("JMS_AMQP_"); + } + + public String getPrefixVendor() { + return prefixVendor; + } + + public void setPrefixVendor(String prefixVendor) { + this.prefixVendor = prefixVendor; + + messageFormatKey = prefixVendor + "MESSAGE_FORMAT"; + nativeKey = prefixVendor + "NATIVE"; + firstAcquirerKey = prefixVendor + "FirstAcquirer"; + prefixDeliveryAnnotationsKey = prefixVendor + prefixDeliveryAnnotations; + prefixMessageAnnotationsKey = prefixVendor + prefixMessageAnnotations; + contentTypeKey = prefixVendor + "ContentType"; + contentEncodingKey = prefixVendor + "ContentEncoding"; + replyToGroupIDKey = prefixVendor + "ReplyToGroupID"; + prefixFooterKey = prefixVendor + prefixFooter; + + } + + public JMSVendor getVendor() { + return vendor; + } + + public void setVendor(JMSVendor vendor) { + this.vendor = vendor; + } +} 
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c161ab46/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/plug/ProtonSessionIntegrationCallback.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/plug/ProtonSessionIntegrationCallback.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/plug/ProtonSessionIntegrationCallback.java index 6a93df0..421a382 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/plug/ProtonSessionIntegrationCallback.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/plug/ProtonSessionIntegrationCallback.java @@ -20,11 +20,11 @@ import java.util.concurrent.Executor; import io.netty.buffer.ByteBuf; import org.apache.activemq.artemis.core.io.IOCallback; +import org.apache.activemq.artemis.core.protocol.proton.converter.message.EncodedMessage; import org.apache.activemq.artemis.core.server.MessageReference; import org.apache.activemq.artemis.core.transaction.Transaction; import org.apache.activemq.artemis.spi.core.remoting.Connection; import org.apache.activemq.artemis.spi.core.remoting.ReadyListener; -import org.apache.activemq.transport.amqp.message.EncodedMessage; import org.apache.qpid.proton.amqp.Binary; import org.apache.qpid.proton.amqp.transport.AmqpError; import org.apache.qpid.proton.amqp.transport.ErrorCondition; http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c161ab46/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/core/protocol/proton/TestConversions.java ---------------------------------------------------------------------- diff --git 
a/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/core/protocol/proton/TestConversions.java b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/core/protocol/proton/TestConversions.java index fc9fe2c..28f8b86 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/core/protocol/proton/TestConversions.java +++ b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/core/protocol/proton/TestConversions.java @@ -26,7 +26,7 @@ import java.util.Map; import io.netty.buffer.ByteBuf; import io.netty.buffer.PooledByteBufAllocator; import org.apache.activemq.artemis.api.core.ActiveMQBuffer; -import org.apache.activemq.transport.amqp.message.EncodedMessage; +import org.apache.activemq.artemis.core.protocol.proton.converter.message.EncodedMessage; import org.apache.qpid.proton.amqp.Binary; import org.apache.qpid.proton.amqp.messaging.AmqpSequence; import org.apache.qpid.proton.amqp.messaging.AmqpValue; http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c161ab46/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index 47b58e4..ed33038 100644 --- a/pom.xml +++ b/pom.xml @@ -414,18 +414,6 @@ - org.apache.activemq - activemq-amqp - ${activemq5-version} - - - org.apache.geronimo.specs - geronimo-jms_1.1_spec - - - - - org.slf4j slf4j-api ${slf4j.version}