Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id B92AC200C3A for ; Fri, 3 Mar 2017 01:55:25 +0100 (CET) Received: by cust-asf.ponee.io (Postfix) id B7C68160B85; Fri, 3 Mar 2017 00:55:25 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id A3FBC160B7D for ; Fri, 3 Mar 2017 01:55:23 +0100 (CET) Received: (qmail 49913 invoked by uid 500); 3 Mar 2017 00:55:22 -0000 Mailing-List: contact commits-help@activemq.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@activemq.apache.org Delivered-To: mailing list commits@activemq.apache.org Received: (qmail 49794 invoked by uid 99); 3 Mar 2017 00:55:22 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 03 Mar 2017 00:55:22 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id AC810DFF5D; Fri, 3 Mar 2017 00:55:22 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: clebertsuconic@apache.org To: commits@activemq.apache.org Date: Fri, 03 Mar 2017 00:55:24 -0000 Message-Id: <0dec7a8b1d25457dbee87048e5006bb0@git.apache.org> In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [3/5] activemq-artemis git commit: Fixing converters part I archived-at: Fri, 03 Mar 2017 00:55:25 -0000 http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/b557f2df/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/InboundTransformer.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/InboundTransformer.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/InboundTransformer.java deleted file mode 100644 index 1e83959..0000000 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/InboundTransformer.java +++ /dev/null @@ -1,240 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.artemis.protocol.amqp.converter.message; - -import static org.apache.activemq.artemis.api.core.Message.HDR_SCHEDULED_DELIVERY_TIME; -import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.JMS_AMQP_CONTENT_ENCODING; -import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.JMS_AMQP_CONTENT_TYPE; -import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.JMS_AMQP_FIRST_ACQUIRER; -import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.JMS_AMQP_FOOTER_PREFIX; -import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.JMS_AMQP_HEADER; -import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.JMS_AMQP_HEADER_DURABLE; -import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.JMS_AMQP_HEADER_PRIORITY; -import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.JMS_AMQP_MESSAGE_ANNOTATION_PREFIX; -import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.JMS_AMQP_REPLYTO_GROUP_ID; - -import java.nio.charset.StandardCharsets; -import java.util.Map; -import java.util.Set; - -import javax.jms.DeliveryMode; -import javax.jms.JMSException; -import javax.jms.Message; - -import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerDestination; -import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSMessage; -import org.apache.activemq.artemis.protocol.amqp.broker.AMQPMessage; -import org.apache.activemq.artemis.utils.IDGenerator; -import org.apache.qpid.proton.amqp.Binary; -import org.apache.qpid.proton.amqp.Decimal128; -import org.apache.qpid.proton.amqp.Decimal32; -import org.apache.qpid.proton.amqp.Decimal64; -import org.apache.qpid.proton.amqp.Symbol; -import org.apache.qpid.proton.amqp.UnsignedByte; -import org.apache.qpid.proton.amqp.UnsignedInteger; -import org.apache.qpid.proton.amqp.UnsignedLong; -import org.apache.qpid.proton.amqp.UnsignedShort; -import org.apache.qpid.proton.amqp.messaging.ApplicationProperties; -import org.apache.qpid.proton.amqp.messaging.Footer; -import org.apache.qpid.proton.amqp.messaging.Header; -import org.apache.qpid.proton.amqp.messaging.MessageAnnotations; -import org.apache.qpid.proton.amqp.messaging.Properties; - -public abstract class InboundTransformer { - - protected IDGenerator idGenerator; - - public static final String TRANSFORMER_JMS = "jms"; - - public InboundTransformer(IDGenerator idGenerator) { - this.idGenerator = idGenerator; - } - - public abstract ServerJMSMessage transform(AMQPMessage amqpMessage) throws Exception; - - public abstract String getTransformerName(); - - @SuppressWarnings("unchecked") - protected ServerJMSMessage populateMessage(ServerJMSMessage jms, org.apache.qpid.proton.message.Message amqp) throws Exception { - Header header = amqp.getHeader(); - if (header != null) { - jms.setBooleanProperty(JMS_AMQP_HEADER, true); - - if (header.getDurable() != null) { - jms.setBooleanProperty(JMS_AMQP_HEADER_DURABLE, true); - jms.setJMSDeliveryMode(header.getDurable().booleanValue() ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT); - } else { - jms.setJMSDeliveryMode(DeliveryMode.NON_PERSISTENT); - } - - if (header.getPriority() != null) { - jms.setBooleanProperty(JMS_AMQP_HEADER_PRIORITY, true); - jms.setJMSPriority(header.getPriority().intValue()); - } else { - jms.setJMSPriority(Message.DEFAULT_PRIORITY); - } - - if (header.getFirstAcquirer() != null) { - jms.setBooleanProperty(JMS_AMQP_FIRST_ACQUIRER, header.getFirstAcquirer()); - } - - if (header.getDeliveryCount() != null) { - // AMQP Delivery Count counts only failed delivers where JMS - // Delivery Count should include the original delivery in the count. - jms.setLongProperty("JMSXDeliveryCount", header.getDeliveryCount().longValue() + 1); - } - } else { - jms.setJMSPriority((byte) Message.DEFAULT_PRIORITY); - jms.setJMSDeliveryMode(DeliveryMode.NON_PERSISTENT); - } - - final MessageAnnotations ma = amqp.getMessageAnnotations(); - if (ma != null) { - for (Map.Entry entry : ma.getValue().entrySet()) { - String key = entry.getKey().toString(); - if ("x-opt-delivery-time".equals(key) && entry.getValue() != null) { - long deliveryTime = ((Number) entry.getValue()).longValue(); - jms.setLongProperty(HDR_SCHEDULED_DELIVERY_TIME.toString(), deliveryTime); - } else if ("x-opt-delivery-delay".equals(key) && entry.getValue() != null) { - long delay = ((Number) entry.getValue()).longValue(); - if (delay > 0) { - jms.setLongProperty(HDR_SCHEDULED_DELIVERY_TIME.toString(), System.currentTimeMillis() + delay); - } - } - - setProperty(jms, JMS_AMQP_MESSAGE_ANNOTATION_PREFIX + key, entry.getValue()); - } - } - - final ApplicationProperties ap = amqp.getApplicationProperties(); - if (ap != null) { - for (Map.Entry entry : (Set>) ap.getValue().entrySet()) { - setProperty(jms, entry.getKey().toString(), entry.getValue()); - } - } - - final Properties properties = amqp.getProperties(); - if (properties != null) { - if (properties.getMessageId() != null) { - jms.setJMSMessageID(AMQPMessageIdHelper.INSTANCE.toBaseMessageIdString(properties.getMessageId())); - } - Binary userId = properties.getUserId(); - if (userId != null) { - // TODO - Better Way to set this? - jms.setStringProperty("JMSXUserID", new String(userId.getArray(), userId.getArrayOffset(), userId.getLength(), StandardCharsets.UTF_8)); - } - if (properties.getTo() != null) { - jms.setJMSDestination(new ServerDestination(properties.getTo())); - } - if (properties.getSubject() != null) { - jms.setJMSType(properties.getSubject()); - } - if (properties.getReplyTo() != null) { - jms.setJMSReplyTo(new ServerDestination(properties.getReplyTo())); - } - if (properties.getCorrelationId() != null) { - jms.setJMSCorrelationID(AMQPMessageIdHelper.INSTANCE.toBaseMessageIdString(properties.getCorrelationId())); - } - if (properties.getContentType() != null) { - jms.setStringProperty(JMS_AMQP_CONTENT_TYPE, properties.getContentType().toString()); - } - if (properties.getContentEncoding() != null) { - jms.setStringProperty(JMS_AMQP_CONTENT_ENCODING, properties.getContentEncoding().toString()); - } - if (properties.getCreationTime() != null) { - jms.setJMSTimestamp(properties.getCreationTime().getTime()); - } - if (properties.getGroupId() != null) { - jms.setStringProperty("_AMQ_GROUP_ID", properties.getGroupId()); - } - if (properties.getGroupSequence() != null) { - jms.setIntProperty("JMSXGroupSeq", properties.getGroupSequence().intValue()); - } - if (properties.getReplyToGroupId() != null) { - jms.setStringProperty(JMS_AMQP_REPLYTO_GROUP_ID, properties.getReplyToGroupId()); - } - if (properties.getAbsoluteExpiryTime() != null) { - jms.setJMSExpiration(properties.getAbsoluteExpiryTime().getTime()); - } - } - - // If the jms expiration has not yet been set... - if (header != null && jms.getJMSExpiration() == 0) { - // Then lets try to set it based on the message ttl. - long ttl = Message.DEFAULT_TIME_TO_LIVE; - if (header.getTtl() != null) { - ttl = header.getTtl().longValue(); - } - - if (ttl == 0) { - jms.setJMSExpiration(0); - } else { - jms.setJMSExpiration(System.currentTimeMillis() + ttl); - } - } - - final Footer fp = amqp.getFooter(); - if (fp != null) { - for (Map.Entry entry : (Set>) fp.getValue().entrySet()) { - String key = entry.getKey().toString(); - setProperty(jms, JMS_AMQP_FOOTER_PREFIX + key, entry.getValue()); - } - } - - return jms; - } - - private void setProperty(Message msg, String key, Object value) throws JMSException { - if (value instanceof UnsignedLong) { - long v = ((UnsignedLong) value).longValue(); - msg.setLongProperty(key, v); - } else if (value instanceof UnsignedInteger) { - long v = ((UnsignedInteger) value).longValue(); - if (Integer.MIN_VALUE <= v && v <= Integer.MAX_VALUE) { - msg.setIntProperty(key, (int) v); - } else { - msg.setLongProperty(key, v); - } - } else if (value instanceof UnsignedShort) { - int v = ((UnsignedShort) value).intValue(); - if (Short.MIN_VALUE <= v && v <= Short.MAX_VALUE) { - msg.setShortProperty(key, (short) v); - } else { - msg.setIntProperty(key, v); - } - } else if (value instanceof UnsignedByte) { - short v = ((UnsignedByte) value).shortValue(); - if (Byte.MIN_VALUE <= v && v <= Byte.MAX_VALUE) { - msg.setByteProperty(key, (byte) v); - } else { - msg.setShortProperty(key, v); - } - } else if (value instanceof Symbol) { - msg.setStringProperty(key, value.toString()); - } else if (value instanceof Decimal128) { - msg.setDoubleProperty(key, ((Decimal128) value).doubleValue()); - } else if (value instanceof Decimal64) { - msg.setDoubleProperty(key, ((Decimal64) value).doubleValue()); - } else if (value instanceof Decimal32) { - msg.setFloatProperty(key, ((Decimal32) value).floatValue()); - } else if (value instanceof Binary) { - msg.setStringProperty(key, value.toString()); - } else { - msg.setObjectProperty(key, value); - } - } -} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/b557f2df/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/JMSMappingInboundTransformer.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/JMSMappingInboundTransformer.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/JMSMappingInboundTransformer.java deleted file mode 100644 index 4c7426e..0000000 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/JMSMappingInboundTransformer.java +++ /dev/null @@ -1,182 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.artemis.protocol.amqp.converter.message; - -import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.AMQP_DATA; -import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.AMQP_NULL; -import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.AMQP_SEQUENCE; -import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.AMQP_VALUE_BINARY; -import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.AMQP_VALUE_LIST; -import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.AMQP_VALUE_MAP; -import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.AMQP_VALUE_NULL; -import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.AMQP_VALUE_STRING; -import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.JMS_AMQP_MESSAGE_FORMAT; -import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.JMS_AMQP_ORIGINAL_ENCODING; -import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.OCTET_STREAM_CONTENT_TYPE; -import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.SERIALIZED_JAVA_OBJECT_CONTENT_TYPE; -import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.createBytesMessage; -import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.createMapMessage; -import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.createMessage; -import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.createObjectMessage; -import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.createStreamMessage; -import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.createTextMessage; -import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.getCharsetForTextualContent; -import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.isContentType; - -import java.nio.ByteBuffer; -import java.nio.CharBuffer; -import java.nio.charset.CharacterCodingException; -import java.nio.charset.Charset; -import java.nio.charset.StandardCharsets; -import java.util.List; -import java.util.Map; - -import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSMessage; -import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSStreamMessage; -import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPInternalErrorException; -import org.apache.activemq.artemis.protocol.amqp.broker.AMQPMessage; -import org.apache.activemq.artemis.utils.IDGenerator; -import org.apache.qpid.proton.amqp.Binary; -import org.apache.qpid.proton.amqp.messaging.AmqpSequence; -import org.apache.qpid.proton.amqp.messaging.AmqpValue; -import org.apache.qpid.proton.amqp.messaging.Data; -import org.apache.qpid.proton.amqp.messaging.Section; -import org.apache.qpid.proton.message.Message; - -public class JMSMappingInboundTransformer extends InboundTransformer { - - public JMSMappingInboundTransformer(IDGenerator idGenerator) { - super(idGenerator); - } - - @Override - public String getTransformerName() { - return TRANSFORMER_JMS; - } - - - public ServerJMSMessage transform(EncodedMessage message) throws Exception { - AMQPMessage messageEncode = new AMQPMessage(message.getMessageFormat(), message.getArray(), null); - return transform(messageEncode); - } - - @Override - public ServerJMSMessage transform(AMQPMessage amqpMessage) throws Exception { - ServerJMSMessage transformedMessage = createServerMessage(amqpMessage.getProtonMessage()); - populateMessage(transformedMessage, amqpMessage.getProtonMessage()); - - // Regardless of the transformer that finally decoded the message we need to ensure that - // the AMQP Message Format value is preserved for application on retransmit. - if (transformedMessage != null && amqpMessage.getMessageFormat() != 0) { - transformedMessage.setLongProperty(JMS_AMQP_MESSAGE_FORMAT, amqpMessage.getMessageFormat()); - } - - return transformedMessage; - } - - @SuppressWarnings("unchecked") - private ServerJMSMessage createServerMessage(Message message) throws Exception { - - Section body = message.getBody(); - ServerJMSMessage result; - - if (body == null) { - if (isContentType(SERIALIZED_JAVA_OBJECT_CONTENT_TYPE, message)) { - result = createObjectMessage(idGenerator); - } else if (isContentType(OCTET_STREAM_CONTENT_TYPE, message) || isContentType(null, message)) { - result = createBytesMessage(idGenerator); - } else { - Charset charset = getCharsetForTextualContent(message.getContentType()); - if (charset != null) { - result = createTextMessage(idGenerator); - } else { - result = createMessage(idGenerator); - } - } - - result.setShortProperty(JMS_AMQP_ORIGINAL_ENCODING, AMQP_NULL); - } else if (body instanceof Data) { - Binary payload = ((Data) body).getValue(); - - if (isContentType(SERIALIZED_JAVA_OBJECT_CONTENT_TYPE, message)) { - result = createObjectMessage(idGenerator, payload.getArray(), payload.getArrayOffset(), payload.getLength()); - } else if (isContentType(OCTET_STREAM_CONTENT_TYPE, message)) { - result = createBytesMessage(idGenerator, payload.getArray(), payload.getArrayOffset(), payload.getLength()); - } else { - Charset charset = getCharsetForTextualContent(message.getContentType()); - if (StandardCharsets.UTF_8.equals(charset)) { - ByteBuffer buf = ByteBuffer.wrap(payload.getArray(), payload.getArrayOffset(), payload.getLength()); - - try { - CharBuffer chars = charset.newDecoder().decode(buf); - result = createTextMessage(idGenerator, String.valueOf(chars)); - } catch (CharacterCodingException e) { - result = createBytesMessage(idGenerator, payload.getArray(), payload.getArrayOffset(), payload.getLength()); - } - } else { - result = createBytesMessage(idGenerator, payload.getArray(), payload.getArrayOffset(), payload.getLength()); - } - } - - result.setShortProperty(JMS_AMQP_ORIGINAL_ENCODING, AMQP_DATA); - } else if (body instanceof AmqpSequence) { - AmqpSequence sequence = (AmqpSequence) body; - ServerJMSStreamMessage m = createStreamMessage(idGenerator); - for (Object item : sequence.getValue()) { - m.writeObject(item); - } - - result = m; - result.setShortProperty(JMS_AMQP_ORIGINAL_ENCODING, AMQP_SEQUENCE); - } else if (body instanceof AmqpValue) { - Object value = ((AmqpValue) body).getValue(); - if (value == null || value instanceof String) { - result = createTextMessage(idGenerator, (String) value); - - result.setShortProperty(JMS_AMQP_ORIGINAL_ENCODING, value == null ? AMQP_VALUE_NULL : AMQP_VALUE_STRING); - } else if (value instanceof Binary) { - Binary payload = (Binary) value; - - if (isContentType(SERIALIZED_JAVA_OBJECT_CONTENT_TYPE, message)) { - result = createObjectMessage(idGenerator, payload); - } else { - result = createBytesMessage(idGenerator, payload.getArray(), payload.getArrayOffset(), payload.getLength()); - } - - result.setShortProperty(JMS_AMQP_ORIGINAL_ENCODING, AMQP_VALUE_BINARY); - } else if (value instanceof List) { - ServerJMSStreamMessage m = createStreamMessage(idGenerator); - for (Object item : (List) value) { - m.writeObject(item); - } - result = m; - result.setShortProperty(JMS_AMQP_ORIGINAL_ENCODING, AMQP_VALUE_LIST); - } else if (value instanceof Map) { - result = createMapMessage(idGenerator, (Map) value); - result.setShortProperty(JMS_AMQP_ORIGINAL_ENCODING, AMQP_VALUE_MAP); - } else { - // Trigger fall-back to native encoder which generates BytesMessage with the - // original message stored in the message body. - throw new ActiveMQAMQPInternalErrorException("Unable to encode to ActiveMQ JMS Message"); - } - } else { - throw new RuntimeException("Unexpected body type: " + body.getClass()); - } - - return result; - } -} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/b557f2df/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/JMSMappingOutboundTransformer.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/JMSMappingOutboundTransformer.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/JMSMappingOutboundTransformer.java deleted file mode 100644 index 23bcaf1..0000000 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/JMSMappingOutboundTransformer.java +++ /dev/null @@ -1,574 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.artemis.protocol.amqp.converter.message; - -import static org.apache.activemq.artemis.api.core.FilterConstants.NATIVE_MESSAGE_ID; -import static org.apache.activemq.artemis.api.core.Message.HDR_SCHEDULED_DELIVERY_TIME; -import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.AMQP_DATA; -import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.AMQP_NULL; -import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.AMQP_SEQUENCE; -import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.AMQP_UNKNOWN; -import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.AMQP_VALUE_BINARY; -import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.AMQP_VALUE_LIST; -import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.AMQP_VALUE_STRING; -import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.EMPTY_BINARY; -import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.JMS_AMQP_CONTENT_ENCODING; -import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.JMS_AMQP_CONTENT_TYPE; -import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.JMS_AMQP_DELIVERY_ANNOTATION_PREFIX; -import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.JMS_AMQP_FIRST_ACQUIRER; -import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.JMS_AMQP_FOOTER_PREFIX; -import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.JMS_AMQP_HEADER; -import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.JMS_AMQP_HEADER_DURABLE; -import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.JMS_AMQP_HEADER_PRIORITY; -import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.JMS_AMQP_MESSAGE_ANNOTATION_PREFIX; -import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.JMS_AMQP_MESSAGE_FORMAT; -import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.JMS_AMQP_NATIVE; -import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.JMS_AMQP_ORIGINAL_ENCODING; -import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.JMS_AMQP_PREFIX; -import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.JMS_AMQP_PROPERTIES; -import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.JMS_AMQP_REPLYTO_GROUP_ID; -import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.SERIALIZED_JAVA_OBJECT_CONTENT_TYPE; -import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.toAddress; - -import java.io.UnsupportedEncodingException; -import java.nio.charset.StandardCharsets; -import java.util.ArrayList; -import java.util.Date; -import java.util.Enumeration; -import java.util.HashMap; -import java.util.LinkedHashMap; -import java.util.Map; -import java.util.Set; - -import javax.jms.Destination; -import javax.jms.JMSException; -import javax.jms.Message; -import javax.jms.MessageEOFException; -import javax.jms.Queue; -import javax.jms.TemporaryQueue; -import javax.jms.TemporaryTopic; -import javax.jms.TextMessage; -import javax.jms.Topic; - -import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSBytesMessage; -import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSMapMessage; -import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSMessage; -import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSObjectMessage; -import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSStreamMessage; -import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSTextMessage; -import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPIllegalStateException; -import org.apache.activemq.artemis.protocol.amqp.util.TLSEncode; -import org.apache.activemq.artemis.reader.MessageUtil; -import org.apache.activemq.artemis.utils.IDGenerator; -import org.apache.qpid.proton.amqp.Binary; -import org.apache.qpid.proton.amqp.Symbol; -import org.apache.qpid.proton.amqp.UnsignedByte; -import org.apache.qpid.proton.amqp.UnsignedInteger; -import org.apache.qpid.proton.amqp.messaging.AmqpSequence; -import org.apache.qpid.proton.amqp.messaging.AmqpValue; -import org.apache.qpid.proton.amqp.messaging.ApplicationProperties; -import org.apache.qpid.proton.amqp.messaging.Data; -import org.apache.qpid.proton.amqp.messaging.DeliveryAnnotations; -import org.apache.qpid.proton.amqp.messaging.Footer; -import org.apache.qpid.proton.amqp.messaging.Header; -import org.apache.qpid.proton.amqp.messaging.MessageAnnotations; -import org.apache.qpid.proton.amqp.messaging.Properties; -import org.apache.qpid.proton.amqp.messaging.Section; -import org.apache.qpid.proton.codec.EncoderImpl; -import org.apache.qpid.proton.codec.WritableBuffer; -import org.jboss.logging.Logger; - -public class JMSMappingOutboundTransformer extends OutboundTransformer { - - private static final Logger logger = Logger.getLogger(JMSMappingOutboundTransformer.class); - - public static final Symbol JMS_DEST_TYPE_MSG_ANNOTATION = Symbol.valueOf("x-opt-jms-dest"); - public static final Symbol JMS_REPLY_TO_TYPE_MSG_ANNOTATION = Symbol.valueOf("x-opt-jms-reply-to"); - - public static final byte QUEUE_TYPE = 0x00; - public static final byte TOPIC_TYPE = 0x01; - public static final byte TEMP_QUEUE_TYPE = 0x02; - public static final byte TEMP_TOPIC_TYPE = 0x03; - - public JMSMappingOutboundTransformer(IDGenerator idGenerator) { - super(idGenerator); - } - - @Override - public long transform(ServerJMSMessage message, WritableBuffer buffer) throws JMSException, UnsupportedEncodingException { - if (message == null) { - return 0; - } - - long messageFormat = 0; - Header header = null; - Properties properties = null; - Map daMap = null; - Map maMap = null; - Map apMap = null; - Map footerMap = null; - - Section body = convertBody(message); - - if (message.getInnerMessage().isDurable()) { - if (header == null) { - header = new Header(); - } - header.setDurable(true); - } - byte priority = (byte) message.getJMSPriority(); - if (priority != Message.DEFAULT_PRIORITY) { - if (header == null) { - header = new Header(); - } - header.setPriority(UnsignedByte.valueOf(priority)); - } - String type = message.getJMSType(); - if (type != null) { - if (properties == null) { - properties = new Properties(); - } - properties.setSubject(type); - } - String messageId = message.getJMSMessageID(); - if (messageId != null) { - if (properties == null) { - properties = new Properties(); - } - try { - properties.setMessageId(AMQPMessageIdHelper.INSTANCE.toIdObject(messageId)); - } catch (ActiveMQAMQPIllegalStateException e) { - properties.setMessageId(messageId); - } - } - Destination destination = message.getJMSDestination(); - if (destination != null) { - if (properties == null) { - properties = new Properties(); - } - properties.setTo(toAddress(destination)); - if (maMap == null) { - maMap = new HashMap<>(); - } - maMap.put(JMS_DEST_TYPE_MSG_ANNOTATION, destinationType(destination)); - } - Destination replyTo = message.getJMSReplyTo(); - if (replyTo != null) { - if (properties == null) { - properties = new Properties(); - } - properties.setReplyTo(toAddress(replyTo)); - if (maMap == null) { - maMap = new HashMap<>(); - } - maMap.put(JMS_REPLY_TO_TYPE_MSG_ANNOTATION, destinationType(replyTo)); - } - String correlationId = message.getJMSCorrelationID(); - if (correlationId != null) { - if (properties == null) { - properties = new Properties(); - } - try { - properties.setCorrelationId(AMQPMessageIdHelper.INSTANCE.toIdObject(correlationId)); - } catch (ActiveMQAMQPIllegalStateException e) { - properties.setCorrelationId(correlationId); - } - } - long expiration = message.getJMSExpiration(); - if (expiration != 0) { - long ttl = expiration - System.currentTimeMillis(); - if (ttl < 0) { - ttl = 1; - } - - if (header == null) { - header = new Header(); - } - header.setTtl(new UnsignedInteger((int) ttl)); - - if (properties == null) { - properties = new Properties(); - } - properties.setAbsoluteExpiryTime(new Date(expiration)); - } - long timeStamp = message.getJMSTimestamp(); - if (timeStamp != 0) { - if (properties == null) { - properties = new Properties(); - } - properties.setCreationTime(new Date(timeStamp)); - } - - final Set keySet = MessageUtil.getPropertyNames(message.getInnerMessage()); - for (String key : keySet) { - if (key.startsWith("JMSX")) { - if (key.equals("JMSXDeliveryCount")) { - // The AMQP delivery-count field only includes prior failed delivery attempts, - // whereas JMSXDeliveryCount includes the first/current delivery attempt. - int amqpDeliveryCount = message.getDeliveryCount() - 1; - if (amqpDeliveryCount > 0) { - if (header == null) { - header = new Header(); - } - header.setDeliveryCount(new UnsignedInteger(amqpDeliveryCount)); - } - continue; - } else if (key.equals("JMSXUserID")) { - String value = message.getStringProperty(key); - if (properties == null) { - properties = new Properties(); - } - properties.setUserId(new Binary(value.getBytes(StandardCharsets.UTF_8))); - continue; - } else if (key.equals("JMSXGroupID")) { - String value = message.getStringProperty(key); - if (properties == null) { - properties = new Properties(); - } - properties.setGroupId(value); - continue; - } else if (key.equals("JMSXGroupSeq")) { - UnsignedInteger value = new UnsignedInteger(message.getIntProperty(key)); - if (properties == null) { - properties = new Properties(); - } - properties.setGroupSequence(value); - continue; - } - } else if (key.startsWith(JMS_AMQP_PREFIX)) { - // AMQP Message Information stored from a conversion to the Core Message - if (key.equals(JMS_AMQP_MESSAGE_FORMAT)) { - messageFormat = message.getLongProperty(JMS_AMQP_MESSAGE_FORMAT); - continue; - } else if (key.equals(JMS_AMQP_NATIVE)) { - // skip..internal use only - continue; - } else if (key.equals(JMS_AMQP_ORIGINAL_ENCODING)) { - // skip..internal use only - continue; - } else if (key.equals(JMS_AMQP_FIRST_ACQUIRER)) { - if (header == null) { - header = new Header(); - } - header.setFirstAcquirer(message.getBooleanProperty(key)); - continue; - } else if (key.equals(JMS_AMQP_HEADER)) { - if (header == null) { - header = new Header(); - } - continue; - } else if (key.equals(JMS_AMQP_HEADER_DURABLE)) { - if (header == null) { - header = new Header(); - } - header.setDurable(message.getInnerMessage().isDurable()); - continue; - } else if (key.equals(JMS_AMQP_HEADER_PRIORITY)) { - if (header == null) { - header = new Header(); - } - header.setPriority(UnsignedByte.valueOf(priority)); - continue; - } else if (key.startsWith(JMS_AMQP_PROPERTIES)) { - if (properties == null) { - properties = new Properties(); - } - continue; - } else if (key.startsWith(JMS_AMQP_DELIVERY_ANNOTATION_PREFIX)) { - if (daMap == null) { - daMap = new HashMap<>(); - } - String name = key.substring(JMS_AMQP_DELIVERY_ANNOTATION_PREFIX.length()); - daMap.put(Symbol.valueOf(name), message.getObjectProperty(key)); - continue; - } else if (key.startsWith(JMS_AMQP_MESSAGE_ANNOTATION_PREFIX)) { - if (maMap == null) { - maMap = new HashMap<>(); - } - String name = key.substring(JMS_AMQP_MESSAGE_ANNOTATION_PREFIX.length()); - maMap.put(Symbol.valueOf(name), message.getObjectProperty(key)); - continue; - } else if (key.equals(JMS_AMQP_CONTENT_TYPE)) { - if (properties == null) { - properties = new Properties(); - } - properties.setContentType(Symbol.getSymbol(message.getStringProperty(key))); - continue; - } else if (key.equals(JMS_AMQP_CONTENT_ENCODING)) { - if (properties == null) { - properties = new Properties(); - } - properties.setContentEncoding(Symbol.getSymbol(message.getStringProperty(key))); - continue; - } else if (key.equals(JMS_AMQP_REPLYTO_GROUP_ID)) { - if (properties == null) { - properties = new Properties(); - } - properties.setReplyToGroupId(message.getStringProperty(key)); - continue; - } else if (key.startsWith(JMS_AMQP_FOOTER_PREFIX)) { - if (footerMap == null) { - footerMap = new HashMap<>(); - } - String name = key.substring(JMS_AMQP_FOOTER_PREFIX.length()); - footerMap.put(name, message.getObjectProperty(key)); - continue; - } - } else if (key.equals("_AMQ_GROUP_ID")) { - String value = message.getStringProperty(key); - if (properties == null) { - properties = new Properties(); - } - properties.setGroupId(value); - continue; - } else if (key.equals(NATIVE_MESSAGE_ID)) { - // skip..internal use only - continue; - } else if (key.endsWith(HDR_SCHEDULED_DELIVERY_TIME.toString())) { - // skip..remove annotation from previous inbound transformation - continue; - } else if (key.equals(AMQPMessageTypes.AMQP_TYPE_KEY)) { - // skip..internal use only - TODO - Remove this deprecated value in future release. - continue; - } - - if (apMap == null) { - apMap = new HashMap<>(); - } - - Object objectProperty = message.getObjectProperty(key); - if (objectProperty instanceof byte[]) { - objectProperty = new Binary((byte[]) objectProperty); - } - - apMap.put(key, objectProperty); - } - - EncoderImpl encoder = TLSEncode.getEncoder(); - encoder.setByteBuffer(buffer); - - if (header != null) { - encoder.writeObject(header); - } - if (daMap != null) { - encoder.writeObject(new DeliveryAnnotations(daMap)); - } - if (maMap != null) { - encoder.writeObject(new MessageAnnotations(maMap)); - } - if (properties != null) { - encoder.writeObject(properties); - } - if (apMap != null) { - encoder.writeObject(new ApplicationProperties(apMap)); - } - if (body != null) { - encoder.writeObject(body); - } - if (footerMap != null) { - encoder.writeObject(new Footer(footerMap)); - } - - return messageFormat; - } - - private Section convertBody(ServerJMSMessage message) throws JMSException { - - Section body = null; - short orignalEncoding = AMQP_UNKNOWN; - - try { - orignalEncoding = message.getShortProperty(JMS_AMQP_ORIGINAL_ENCODING); - } catch (Exception ex) { - // Ignore and stick with UNKNOWN - } - - if (message instanceof ServerJMSBytesMessage) { - Binary payload = getBinaryFromMessageBody((ServerJMSBytesMessage) message); - - if (payload == null) { - payload = EMPTY_BINARY; - } - - switch (orignalEncoding) { - case AMQP_NULL: - break; - case AMQP_VALUE_BINARY: - body = new AmqpValue(payload); - break; - case AMQP_DATA: - case AMQP_UNKNOWN: - default: - body = new Data(payload); - break; - } - } else if (message instanceof ServerJMSTextMessage) { - switch (orignalEncoding) { - case AMQP_NULL: - break; - case AMQP_DATA: - body = new Data(getBinaryFromMessageBody((ServerJMSTextMessage) message)); - break; - case AMQP_VALUE_STRING: - case AMQP_UNKNOWN: - default: - body = new AmqpValue(((TextMessage) message).getText()); - break; - } - } else if (message instanceof ServerJMSMapMessage) { - body = new AmqpValue(getMapFromMessageBody((ServerJMSMapMessage) message)); - } else if (message instanceof ServerJMSStreamMessage) { - ArrayList list = new ArrayList<>(); - final ServerJMSStreamMessage m = (ServerJMSStreamMessage) message; - try { - while (true) { - list.add(m.readObject()); - } - } catch (MessageEOFException e) { - } - - // Deprecated encoding markers - TODO - Remove on future release - if (orignalEncoding == AMQP_UNKNOWN) { - String amqpType = message.getStringProperty(AMQPMessageTypes.AMQP_TYPE_KEY); - if (amqpType != null) { - if (amqpType.equals(AMQPMessageTypes.AMQP_LIST)) { - orignalEncoding = AMQP_VALUE_LIST; - } else { - orignalEncoding = AMQP_SEQUENCE; - } - } - } - - switch (orignalEncoding) { - case AMQP_SEQUENCE: - body = new AmqpSequence(list); - break; - case AMQP_VALUE_LIST: - case AMQP_UNKNOWN: - default: - body = new AmqpValue(list); - break; - } - } else if (message instanceof ServerJMSObjectMessage) { - Binary payload = getBinaryFromMessageBody((ServerJMSObjectMessage) message); - - if (payload == null) { - payload = EMPTY_BINARY; - } - - switch (orignalEncoding) { - case AMQP_VALUE_BINARY: - body = new AmqpValue(payload); - break; - case AMQP_DATA: - case AMQP_UNKNOWN: - default: - body = new Data(payload); - break; - } - - // For a non-AMQP message we tag the outbound content type as containing - // a serialized Java object so that an AMQP client has a hint as to what - // we are sending it. - if (!message.propertyExists(JMS_AMQP_CONTENT_TYPE)) { - message.setStringProperty(JMS_AMQP_CONTENT_TYPE, SERIALIZED_JAVA_OBJECT_CONTENT_TYPE); - } - } else if (message instanceof ServerJMSMessage) { - // If this is not an AMQP message that was converted then the original encoding - // will be unknown so we check for special cases of messages with special data - // encoded into the server message body. - if (orignalEncoding == AMQP_UNKNOWN) { - org.apache.activemq.artemis.api.core.Message internalMessage = message.getInnerMessage(); - int readerIndex = internalMessage.getBodyBuffer().readerIndex(); - try { - Object s = internalMessage.getBodyBuffer().readNullableSimpleString(); - if (s != null) { - body = new AmqpValue(s.toString()); - } - } catch (Throwable ignored) { - logger.debug("Exception ignored during conversion, should be ok!", ignored.getMessage(), ignored); - } finally { - internalMessage.getBodyBuffer().readerIndex(readerIndex); - } - } - } - - return body; - } - - private Binary getBinaryFromMessageBody(ServerJMSBytesMessage message) throws JMSException { - byte[] data = new byte[(int) message.getBodyLength()]; - message.readBytes(data); - message.reset(); // Need to reset after readBytes or future readBytes - - return new Binary(data); - } - - private Binary getBinaryFromMessageBody(ServerJMSTextMessage message) throws JMSException { - Binary result = null; - String text = message.getText(); - if (text != null) { - result = new Binary(text.getBytes(StandardCharsets.UTF_8)); - } - - return result; - } - - private Binary getBinaryFromMessageBody(ServerJMSObjectMessage message) throws JMSException { - message.getInnerMessage().getBodyBuffer().resetReaderIndex(); - int size = message.getInnerMessage().getBodyBuffer().readInt(); - byte[] bytes = new byte[size]; - message.getInnerMessage().getBodyBuffer().readBytes(bytes); - - return new Binary(bytes); - } - - private Map getMapFromMessageBody(ServerJMSMapMessage message) throws JMSException { - final HashMap map = new LinkedHashMap<>(); - - @SuppressWarnings("unchecked") - final Enumeration names = message.getMapNames(); - while (names.hasMoreElements()) { - String key = names.nextElement(); - Object value = message.getObject(key); - if (value instanceof byte[]) { - value = new Binary((byte[]) value); - } - map.put(key, value); - } - - return map; - } - - private static byte destinationType(Destination destination) { - if (destination instanceof Queue) { - if (destination instanceof TemporaryQueue) { - return TEMP_QUEUE_TYPE; - } else { - return QUEUE_TYPE; - } - } else if (destination instanceof Topic) { - if (destination instanceof TemporaryTopic) { - return TEMP_TOPIC_TYPE; - } else { - return TOPIC_TYPE; - } - } - - throw new IllegalArgumentException("Unknown Destination Type passed to JMS Transformer."); - } -} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/b557f2df/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/OutboundTransformer.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/OutboundTransformer.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/OutboundTransformer.java deleted file mode 100644 index 5113513..0000000 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/OutboundTransformer.java +++ /dev/null @@ -1,53 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.artemis.protocol.amqp.converter.message; - -import java.io.UnsupportedEncodingException; - -import javax.jms.JMSException; - -import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSMessage; -import org.apache.activemq.artemis.utils.IDGenerator; -import org.apache.qpid.proton.codec.WritableBuffer; - -public abstract class OutboundTransformer { - - protected IDGenerator idGenerator; - - public OutboundTransformer(IDGenerator idGenerator) { - this.idGenerator = idGenerator; - } - - /** - * Given an JMS Message perform a conversion to an AMQP Message and encode into a form that - * is ready for transmission. - * - * @param message - * the message to transform - * @param buffer - * the buffer where encoding should write to - * - * @return the message format key of the encoded message. - * - * @throws JMSException - * if an error occurs during message transformation - * @throws UnsupportedEncodingException - * if an error occurs during message encoding - */ - public abstract long transform(ServerJMSMessage message, WritableBuffer buffer) throws JMSException, UnsupportedEncodingException; - -} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/b557f2df/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java index 15611c3..1bb15b3 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java @@ -21,16 +21,19 @@ import java.util.Map; import java.util.Objects; import java.util.Set; -import org.apache.activemq.artemis.api.core.ActiveMQQueueExistsException; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.PooledByteBufAllocator; import org.apache.activemq.artemis.api.core.ActiveMQExceptionType; +import org.apache.activemq.artemis.api.core.ActiveMQQueueExistsException; import org.apache.activemq.artemis.api.core.Message; +import org.apache.activemq.artemis.api.core.RoutingType; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.core.postoffice.impl.CompositeAddress; import org.apache.activemq.artemis.core.server.AddressQueryResult; import org.apache.activemq.artemis.core.server.QueueQueryResult; -import org.apache.activemq.artemis.api.core.RoutingType; import org.apache.activemq.artemis.core.transaction.Transaction; import org.apache.activemq.artemis.jms.client.ActiveMQConnection; +import org.apache.activemq.artemis.protocol.amqp.broker.AMQPMessage; import org.apache.activemq.artemis.protocol.amqp.broker.AMQPSessionCallback; import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPException; import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPIllegalStateException; @@ -38,9 +41,7 @@ import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPInternal import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPNotFoundException; import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPResourceLimitExceededException; import org.apache.activemq.artemis.protocol.amqp.logger.ActiveMQAMQPProtocolMessageBundle; -import org.apache.activemq.artemis.protocol.amqp.broker.AMQPMessage; import org.apache.activemq.artemis.protocol.amqp.util.CreditsSemaphore; -import org.apache.activemq.artemis.protocol.amqp.util.NettyWritable; import org.apache.activemq.artemis.selector.filter.FilterException; import org.apache.activemq.artemis.selector.impl.SelectorParser; import org.apache.qpid.proton.amqp.DescribedType; @@ -63,9 +64,6 @@ import org.apache.qpid.proton.engine.EndpointState; import org.apache.qpid.proton.engine.Sender; import org.jboss.logging.Logger; -import io.netty.buffer.ByteBuf; -import io.netty.buffer.PooledByteBufAllocator; - /** * TODO: Merge {@link ProtonServerSenderContext} and {@link org.apache.activemq.artemis.protocol.amqp.client.ProtonClientSenderContext} once we support 'global' link names. The split is a workaround for outgoing links */ @@ -568,7 +566,7 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr /** * handle an out going message from ActiveMQ Artemis, send via the Proton Sender */ - public int deliverMessage(Message message, int deliveryCount) throws Exception { + public int deliverMessage(AMQPMessage message, int deliveryCount) throws Exception { if (closed) { return 0; } @@ -592,21 +590,7 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr ByteBuf nettyBuffer = PooledByteBufAllocator.DEFAULT.heapBuffer(1024); try { - long messageFormat = 0; - - - if (message instanceof AMQPMessage) { - message.sendBuffer(nettyBuffer, deliveryCount); - } else { - // Encode the Server Message into the given Netty Buffer as an AMQP - // Message transformed from the internal message model. - try { - messageFormat = sessionSPI.encodeMessage(message, deliveryCount, new NettyWritable(nettyBuffer)); - } catch (Throwable e) { - log.warn(e.getMessage(), e); - throw new ActiveMQAMQPInternalErrorException(e.getMessage(), e); - } - } + message.sendBuffer(nettyBuffer, deliveryCount); int size = nettyBuffer.writerIndex(); @@ -616,7 +600,7 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr } final Delivery delivery; delivery = sender.delivery(tag, 0, tag.length); - delivery.setMessageFormat((int) messageFormat); + delivery.setMessageFormat((int) message.getMessageFormat()); delivery.setContext(message); // this will avoid a copy.. patch provided by Norman using buffer.array() http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/b557f2df/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/TestConversions.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/TestConversions.java b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/TestConversions.java index 9a333c7..f132a45 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/TestConversions.java +++ b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/TestConversions.java @@ -16,42 +16,28 @@ */ package org.apache.activemq.artemis.protocol.amqp.converter; -import java.io.IOException; -import java.nio.ByteBuffer; import java.util.HashMap; import java.util.LinkedList; import java.util.List; import java.util.Map; -import io.netty.buffer.ByteBuf; -import io.netty.buffer.PooledByteBufAllocator; -import org.apache.activemq.artemis.api.core.ActiveMQBuffer; -import org.apache.activemq.artemis.api.core.SimpleString; +import org.apache.activemq.artemis.api.core.ICoreMessage; import org.apache.activemq.artemis.protocol.amqp.broker.AMQPMessage; import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSBytesMessage; import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSMapMessage; import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSMessage; import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSStreamMessage; import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSTextMessage; -import org.apache.activemq.artemis.protocol.amqp.converter.message.EncodedMessage; -import org.apache.activemq.artemis.protocol.amqp.util.NettyWritable; -import org.apache.activemq.artemis.utils.SimpleIDGenerator; import org.apache.qpid.proton.amqp.Binary; import org.apache.qpid.proton.amqp.messaging.AmqpSequence; import org.apache.qpid.proton.amqp.messaging.AmqpValue; import org.apache.qpid.proton.amqp.messaging.ApplicationProperties; import org.apache.qpid.proton.amqp.messaging.Data; import org.apache.qpid.proton.message.Message; -import org.apache.qpid.proton.message.ProtonJMessage; import org.apache.qpid.proton.message.impl.MessageImpl; import org.junit.Assert; import org.junit.Test; -import static org.apache.activemq.artemis.api.core.Message.BYTES_TYPE; -import static org.apache.activemq.artemis.api.core.Message.MAP_TYPE; -import static org.apache.activemq.artemis.api.core.Message.STREAM_TYPE; -import static org.apache.activemq.artemis.api.core.Message.TEXT_TYPE; -import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.wrapMessage; public class TestConversions extends Assert { @@ -70,18 +56,12 @@ public class TestConversions extends Assert { message.setBody(new AmqpValue(new Boolean(true))); - AMQPMessage encodedMessage = new AMQPMessage(message, null); + AMQPMessage encodedMessage = new AMQPMessage(message); - ProtonMessageConverter converter = new ProtonMessageConverter(new SimpleIDGenerator(0)); - org.apache.activemq.artemis.api.core.Message serverMessage = converter.inbound(encodedMessage); + ICoreMessage serverMessage = encodedMessage.toCore(); - verifyProperties(new ServerJMSMessage(serverMessage, 0)); + verifyProperties(ServerJMSMessage.wrapCoreMessage(serverMessage)); - EncodedMessage encoded = (EncodedMessage) converter.outbound(serverMessage, 0); - Message amqpMessage = encoded.decode(); - - AmqpValue value = (AmqpValue) amqpMessage.getBody(); - assertEquals(value.getValue(), true); } @Test @@ -99,12 +79,11 @@ public class TestConversions extends Assert { message.setBody(new Data(new Binary(bodyBytes))); - AMQPMessage encodedMessage = new AMQPMessage(message, null); + AMQPMessage encodedMessage = new AMQPMessage(message); - ProtonMessageConverter converter = new ProtonMessageConverter(new SimpleIDGenerator(0)); - org.apache.activemq.artemis.api.core.Message serverMessage = converter.inbound(encodedMessage); + ICoreMessage serverMessage = encodedMessage.toCore(); - ServerJMSBytesMessage bytesMessage = (ServerJMSBytesMessage) wrapMessage(BYTES_TYPE, serverMessage, 0); + ServerJMSBytesMessage bytesMessage = (ServerJMSBytesMessage) ServerJMSMessage.wrapCoreMessage(serverMessage); verifyProperties(bytesMessage); @@ -116,9 +95,6 @@ public class TestConversions extends Assert { Assert.assertArrayEquals(bodyBytes, newBodyBytes); - Object obj = converter.outbound(serverMessage, 0); - - System.out.println("output = " + obj); } private void verifyProperties(javax.jms.Message message) throws Exception { @@ -149,28 +125,17 @@ public class TestConversions extends Assert { message.setBody(new AmqpValue(mapValues)); - AMQPMessage encodedMessage = new AMQPMessage(message, null); + AMQPMessage encodedMessage = new AMQPMessage(message); - ProtonMessageConverter converter = new ProtonMessageConverter(new SimpleIDGenerator(0)); - org.apache.activemq.artemis.api.core.Message serverMessage = converter.inbound(encodedMessage); + ICoreMessage serverMessage = encodedMessage.toCore(); - ServerJMSMapMessage mapMessage = (ServerJMSMapMessage) wrapMessage(MAP_TYPE, serverMessage, 0); + ServerJMSMapMessage mapMessage = (ServerJMSMapMessage) ServerJMSMessage.wrapCoreMessage(serverMessage); mapMessage.decode(); verifyProperties(mapMessage); Assert.assertEquals(1, mapMessage.getInt("someint")); Assert.assertEquals("value", mapMessage.getString("somestr")); - - EncodedMessage encoded = (EncodedMessage) converter.outbound(serverMessage, 0); - Message amqpMessage = encoded.decode(); - - AmqpValue value = (AmqpValue) amqpMessage.getBody(); - Map mapoutput = (Map) value.getValue(); - - assertEquals(Integer.valueOf(1), mapoutput.get("someint")); - - System.out.println("output = " + amqpMessage); } @Test @@ -186,12 +151,11 @@ public class TestConversions extends Assert { message.setBody(new AmqpSequence(objects)); - AMQPMessage encodedMessage = new AMQPMessage(message, null); + AMQPMessage encodedMessage = new AMQPMessage(message); - ProtonMessageConverter converter = new ProtonMessageConverter(new SimpleIDGenerator(0)); - org.apache.activemq.artemis.api.core.Message serverMessage = converter.inbound(encodedMessage); + ICoreMessage serverMessage = encodedMessage.toCore(); - ServerJMSStreamMessage streamMessage = (ServerJMSStreamMessage) wrapMessage(STREAM_TYPE, serverMessage, 0); + ServerJMSStreamMessage streamMessage = (ServerJMSStreamMessage) ServerJMSMessage.wrapCoreMessage(serverMessage); verifyProperties(streamMessage); @@ -199,13 +163,6 @@ public class TestConversions extends Assert { assertEquals(10, streamMessage.readInt()); assertEquals("10", streamMessage.readString()); - - EncodedMessage encoded = (EncodedMessage) converter.outbound(serverMessage, 0); - Message amqpMessage = encoded.decode(); - - List list = ((AmqpSequence) amqpMessage.getBody()).getValue(); - Assert.assertEquals(Integer.valueOf(10), list.get(0)); - Assert.assertEquals("10", list.get(1)); } @Test @@ -218,541 +175,17 @@ public class TestConversions extends Assert { String text = "someText"; message.setBody(new AmqpValue(text)); - AMQPMessage encodedMessage = new AMQPMessage(message, null); + AMQPMessage encodedMessage = new AMQPMessage(message); - ProtonMessageConverter converter = new ProtonMessageConverter(new SimpleIDGenerator(0)); - org.apache.activemq.artemis.api.core.Message serverMessage = converter.inbound(encodedMessage); + ICoreMessage serverMessage = encodedMessage.toCore(); - ServerJMSTextMessage textMessage = (ServerJMSTextMessage) wrapMessage(TEXT_TYPE, serverMessage, 0); + ServerJMSTextMessage textMessage = (ServerJMSTextMessage) ServerJMSMessage.wrapCoreMessage(serverMessage); textMessage.decode(); verifyProperties(textMessage); Assert.assertEquals(text, textMessage.getText()); - EncodedMessage encoded = (EncodedMessage) converter.outbound(serverMessage, 0); - Message amqpMessage = encoded.decode(); - - AmqpValue value = (AmqpValue) amqpMessage.getBody(); - String textValue = (String) value.getValue(); - - Assert.assertEquals(text, textValue); - - System.out.println("output = " + amqpMessage); - } - - private ProtonJMessage reEncodeMsg(Object obj) { - ProtonJMessage objOut = (ProtonJMessage) obj; - - ByteBuf nettyBuffer = PooledByteBufAllocator.DEFAULT.heapBuffer(1024); - - objOut.encode(new NettyWritable(nettyBuffer)); - return objOut; } - class EmptyBuffer implements ActiveMQBuffer { - - @Override - public ByteBuf byteBuf() { - return null; - } - - @Override - public int capacity() { - return 0; - } - - @Override - public int readerIndex() { - return 0; - } - - @Override - public void readerIndex(int readerIndex) { - - } - - @Override - public int writerIndex() { - return 0; - } - - @Override - public void writerIndex(int writerIndex) { - - } - - @Override - public void setIndex(int readerIndex, int writerIndex) { - - } - - @Override - public int readableBytes() { - return 0; - } - - @Override - public int writableBytes() { - return 0; - } - - @Override - public boolean readable() { - return false; - } - - @Override - public boolean writable() { - return false; - } - - @Override - public void clear() { - - } - - @Override - public void markReaderIndex() { - - } - - @Override - public void resetReaderIndex() { - - } - - @Override - public void markWriterIndex() { - - } - - @Override - public void resetWriterIndex() { - - } - - @Override - public void discardReadBytes() { - - } - - @Override - public byte getByte(int index) { - return 0; - } - - @Override - public short getUnsignedByte(int index) { - return 0; - } - - @Override - public short getShort(int index) { - return 0; - } - - @Override - public int getUnsignedShort(int index) { - return 0; - } - - @Override - public int getInt(int index) { - return 0; - } - - @Override - public long getUnsignedInt(int index) { - return 0; - } - - @Override - public long getLong(int index) { - return 0; - } - - @Override - public void getBytes(int index, ActiveMQBuffer dst) { - - } - - @Override - public void getBytes(int index, ActiveMQBuffer dst, int length) { - - } - - @Override - public void getBytes(int index, ActiveMQBuffer dst, int dstIndex, int length) { - - } - - @Override - public void getBytes(int index, byte[] dst) { - - } - - @Override - public void getBytes(int index, byte[] dst, int dstIndex, int length) { - - } - - @Override - public void getBytes(int index, ByteBuffer dst) { - - } - - @Override - public char getChar(int index) { - return 0; - } - - @Override - public float getFloat(int index) { - return 0; - } - - @Override - public double getDouble(int index) { - return 0; - } - - @Override - public void setByte(int index, byte value) { - - } - - @Override - public void setShort(int index, short value) { - - } - - @Override - public void setInt(int index, int value) { - - } - - @Override - public void setLong(int index, long value) { - - } - - @Override - public void setBytes(int index, ActiveMQBuffer src) { - - } - - @Override - public void setBytes(int index, ActiveMQBuffer src, int length) { - - } - - @Override - public void setBytes(int index, ActiveMQBuffer src, int srcIndex, int length) { - - } - - @Override - public void setBytes(int index, byte[] src) { - - } - - @Override - public void setBytes(int index, byte[] src, int srcIndex, int length) { - - } - - @Override - public void setBytes(int index, ByteBuffer src) { - - } - - @Override - public void setChar(int index, char value) { - - } - - @Override - public void setFloat(int index, float value) { - - } - - @Override - public void setDouble(int index, double value) { - - } - - @Override - public byte readByte() { - return 0; - } - - @Override - public int readUnsignedByte() { - return 0; - } - - @Override - public short readShort() { - return 0; - } - - @Override - public int readUnsignedShort() { - return 0; - } - - @Override - public int readInt() { - return 0; - } - - @Override - public long readUnsignedInt() { - return 0; - } - - @Override - public long readLong() { - return 0; - } - - @Override - public char readChar() { - return 0; - } - - @Override - public float readFloat() { - return 0; - } - - @Override - public double readDouble() { - return 0; - } - - @Override - public boolean readBoolean() { - return false; - } - - @Override - public SimpleString readNullableSimpleString() { - return null; - } - - @Override - public String readNullableString() { - return null; - } - - @Override - public SimpleString readSimpleString() { - return null; - } - - @Override - public String readString() { - return null; - } - - @Override - public String readUTF() { - return null; - } - - @Override - public ActiveMQBuffer readBytes(int length) { - return null; - } - - @Override - public ActiveMQBuffer readSlice(int length) { - return null; - } - - @Override - public void readBytes(ActiveMQBuffer dst) { - - } - - @Override - public void readBytes(ActiveMQBuffer dst, int length) { - - } - - @Override - public void readBytes(ActiveMQBuffer dst, int dstIndex, int length) { - - } - - @Override - public void readBytes(byte[] dst) { - - } - - @Override - public void readBytes(byte[] dst, int dstIndex, int length) { - - } - - @Override - public void readBytes(ByteBuffer dst) { - - } - - @Override - public int skipBytes(int length) { - return length; - } - - @Override - public void writeByte(byte value) { - - } - - @Override - public void writeShort(short value) { - - } - - @Override - public void writeInt(int value) { - - } - - @Override - public void writeLong(long value) { - - } - - @Override - public void writeChar(char chr) { - - } - - @Override - public void writeFloat(float value) { - - } - - @Override - public void writeDouble(double value) { - - } - - @Override - public void writeBoolean(boolean val) { - - } - - @Override - public void writeNullableSimpleString(SimpleString val) { - - } - - @Override - public void writeNullableString(String val) { - - } - - @Override - public void writeSimpleString(SimpleString val) { - - } - - @Override - public void writeString(String val) { - - } - - @Override - public void writeUTF(String utf) { - - } - - @Override - public void writeBytes(ActiveMQBuffer src, int length) { - - } - - @Override - public void writeBytes(ActiveMQBuffer src, int srcIndex, int length) { - - } - - @Override - public void writeBytes(byte[] src) { - - } - - @Override - public void writeBytes(byte[] src, int srcIndex, int length) { - - } - - @Override - public void writeBytes(ByteBuffer src) { - - } - - @Override - public void readFully(byte[] b) throws IOException { - } - - @Override - public void readFully(byte[] b, int off, int len) throws IOException { - } - - @Override - public String readLine() throws IOException { - return null; - } - - @Override - public ActiveMQBuffer copy() { - return null; - } - - @Override - public ActiveMQBuffer copy(int index, int length) { - return null; - } - - @Override - public ActiveMQBuffer slice() { - return null; - } - - @Override - public ActiveMQBuffer slice(int index, int length) { - return null; - } - - @Override - public ActiveMQBuffer duplicate() { - return null; - } - - @Override - public ByteBuffer toByteBuffer() { - return null; - } - - @Override - public ByteBuffer toByteBuffer(int index, int length) { - return null; - } - - @Override - public void release() { - //no-op - } - - @Override - public void writeBytes(ByteBuf src, int srcIndex, int length) { - - } - } }