Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id B3A57200C2F for ; Mon, 6 Mar 2017 12:53:53 +0100 (CET) Received: by cust-asf.ponee.io (Postfix) id B24D6160B76; Mon, 6 Mar 2017 11:53:53 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id BBB2A160B91 for ; Mon, 6 Mar 2017 12:53:50 +0100 (CET) Received: (qmail 43410 invoked by uid 500); 6 Mar 2017 11:53:49 -0000 Mailing-List: contact commits-help@activemq.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@activemq.apache.org Delivered-To: mailing list commits@activemq.apache.org Received: (qmail 42449 invoked by uid 99); 6 Mar 2017 11:53:49 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 06 Mar 2017 11:53:49 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id DE680E008F; Mon, 6 Mar 2017 11:53:48 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: martyntaylor@apache.org To: commits@activemq.apache.org Date: Mon, 06 Mar 2017 11:54:03 -0000 Message-Id: In-Reply-To: <49415fef219940f3b0d08a6a8ade4334@git.apache.org> References: <49415fef219940f3b0d08a6a8ade4334@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [16/22] activemq-artemis git commit: ARTEMIS-1009 Pure Message Encoding. 
archived-at: Mon, 06 Mar 2017 11:53:53 -0000 http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fe0ca4d8/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/AmqpCoreConverter.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/AmqpCoreConverter.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/AmqpCoreConverter.java new file mode 100644 index 0000000..030a7a0 --- /dev/null +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/AmqpCoreConverter.java @@ -0,0 +1,351 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.activemq.artemis.protocol.amqp.converter; + +import javax.jms.DeliveryMode; +import javax.jms.JMSException; +import java.nio.ByteBuffer; +import java.nio.CharBuffer; +import java.nio.charset.CharacterCodingException; +import java.nio.charset.Charset; +import java.nio.charset.StandardCharsets; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.PooledByteBufAllocator; +import org.apache.activemq.artemis.api.core.ICoreMessage; +import org.apache.activemq.artemis.protocol.amqp.broker.AMQPMessage; +import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerDestination; +import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSMessage; +import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSStreamMessage; +import org.apache.activemq.artemis.protocol.amqp.util.NettyWritable; +import org.apache.activemq.artemis.protocol.amqp.util.TLSEncode; +import org.apache.qpid.proton.amqp.Binary; +import org.apache.qpid.proton.amqp.Decimal128; +import org.apache.qpid.proton.amqp.Decimal32; +import org.apache.qpid.proton.amqp.Decimal64; +import org.apache.qpid.proton.amqp.Symbol; +import org.apache.qpid.proton.amqp.UnsignedByte; +import org.apache.qpid.proton.amqp.UnsignedInteger; +import org.apache.qpid.proton.amqp.UnsignedLong; +import org.apache.qpid.proton.amqp.UnsignedShort; +import org.apache.qpid.proton.amqp.messaging.AmqpSequence; +import org.apache.qpid.proton.amqp.messaging.AmqpValue; +import org.apache.qpid.proton.amqp.messaging.ApplicationProperties; +import org.apache.qpid.proton.amqp.messaging.Data; +import org.apache.qpid.proton.amqp.messaging.Footer; +import org.apache.qpid.proton.amqp.messaging.Header; +import org.apache.qpid.proton.amqp.messaging.MessageAnnotations; +import org.apache.qpid.proton.amqp.messaging.Properties; +import org.apache.qpid.proton.amqp.messaging.Section; +import 
org.apache.qpid.proton.codec.WritableBuffer; + +import static org.apache.activemq.artemis.api.core.Message.HDR_SCHEDULED_DELIVERY_TIME; +import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.JMS_AMQP_CONTENT_ENCODING; +import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.JMS_AMQP_CONTENT_TYPE; +import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.JMS_AMQP_FIRST_ACQUIRER; +import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.JMS_AMQP_FOOTER_PREFIX; +import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.JMS_AMQP_HEADER; +import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.JMS_AMQP_HEADER_DURABLE; +import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.JMS_AMQP_HEADER_PRIORITY; +import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.JMS_AMQP_MESSAGE_ANNOTATION_PREFIX; +import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.JMS_AMQP_REPLYTO_GROUP_ID; +import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.OCTET_STREAM_CONTENT_TYPE; +import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.SERIALIZED_JAVA_OBJECT_CONTENT_TYPE; +import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.createBytesMessage; +import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.createMapMessage; +import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.createMessage; +import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.createObjectMessage; +import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.createStreamMessage; +import static 
org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.createTextMessage;
import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.getCharsetForTextualContent;
import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.isContentType;

/**
 * This class was created just to separate concerns on AMQPConverter.
 * For better organization of the code.
 * <p>
 * It maps an inbound {@link AMQPMessage} onto a Core message: {@link #toCore(AMQPMessage)} picks the
 * JMS-style wrapper that matches the AMQP body section, and
 * {@link #populateMessage(ServerJMSMessage, org.apache.qpid.proton.message.Message)} copies the header,
 * annotations, properties and footer across.
 */
public class AmqpCoreConverter {

   /**
    * Converts an AMQP message into a Core message.
    * <p>
    * The wrapper type is selected from the body section and, for an absent or {@link Data} body, from the
    * message content type. An {@link AmqpValue} whose value is not a String, Binary, List or Map is
    * re-encoded with the thread-local encoder and carried as a bytes message.
    *
    * @param message the AMQP message to convert
    * @return the Core message backing the converted wrapper
    * @throws RuntimeException if the body section is of a type this converter does not handle
    * @throws Exception        if any of the creation, population or encoding calls fails
    */
   public static ICoreMessage toCore(AMQPMessage message) throws Exception {

      Section body = message.getProtonMessage().getBody();
      ServerJMSMessage result;

      if (body == null) {
         if (isContentType(SERIALIZED_JAVA_OBJECT_CONTENT_TYPE.toString(), message.getProtonMessage())) {
            result = createObjectMessage(message.getMessageID());
         } else if (isContentType(OCTET_STREAM_CONTENT_TYPE, message.getProtonMessage()) || isContentType(null, message.getProtonMessage())) {
            result = createBytesMessage(message.getMessageID());
         } else {
            Charset charset = getCharsetForTextualContent(message.getProtonMessage().getContentType());
            if (charset != null) {
               result = createTextMessage(message.getMessageID());
            } else {
               result = createMessage(message.getMessageID());
            }
         }
      } else if (body instanceof Data) {
         Binary payload = ((Data) body).getValue();

         if (isContentType(SERIALIZED_JAVA_OBJECT_CONTENT_TYPE.toString(), message.getProtonMessage())) {
            result = createObjectMessage(message.getMessageID(), payload.getArray(), payload.getArrayOffset(), payload.getLength());
         } else if (isContentType(OCTET_STREAM_CONTENT_TYPE, message.getProtonMessage())) {
            result = createBytesMessage(message.getMessageID(), payload.getArray(), payload.getArrayOffset(), payload.getLength());
         } else {
            Charset charset = getCharsetForTextualContent(message.getProtonMessage().getContentType());
            if (StandardCharsets.UTF_8.equals(charset)) {
               ByteBuffer buf = ByteBuffer.wrap(payload.getArray(), payload.getArrayOffset(), payload.getLength());

               try {
                  CharBuffer chars = charset.newDecoder().decode(buf);
                  result = createTextMessage(message.getMessageID(), String.valueOf(chars));
               } catch (CharacterCodingException e) {
                  // Not valid UTF-8 after all: hand the payload over untouched as bytes.
                  result = createBytesMessage(message.getMessageID(), payload.getArray(), payload.getArrayOffset(), payload.getLength());
               }
            } else {
               result = createBytesMessage(message.getMessageID(), payload.getArray(), payload.getArrayOffset(), payload.getLength());
            }
         }

      } else if (body instanceof AmqpSequence) {
         AmqpSequence sequence = (AmqpSequence) body;
         ServerJMSStreamMessage m = createStreamMessage(message.getMessageID());
         for (Object item : sequence.getValue()) {
            m.writeObject(item);
         }

         result = m;
      } else if (body instanceof AmqpValue) {
         Object value = ((AmqpValue) body).getValue();
         if (value == null || value instanceof String) {
            result = createTextMessage(message.getMessageID(), (String) value);

         } else if (value instanceof Binary) {
            Binary payload = (Binary) value;

            if (isContentType(SERIALIZED_JAVA_OBJECT_CONTENT_TYPE.toString(), message.getProtonMessage())) {
               result = createObjectMessage(message.getMessageID(), payload);
            } else {
               result = createBytesMessage(message.getMessageID(), payload.getArray(), payload.getArrayOffset(), payload.getLength());
            }

         } else if (value instanceof List) {
            ServerJMSStreamMessage m = createStreamMessage(message.getMessageID());
            for (Object item : (List<?>) value) {
               m.writeObject(item);
            }
            result = m;
         } else if (value instanceof Map) {
            result = createMapMessage(message.getMessageID(), (Map) value);
         } else {
            ByteBuf buf = PooledByteBufAllocator.DEFAULT.heapBuffer(1024);
            try {
               TLSEncode.getEncoder().setByteBuffer(new NettyWritable(buf));
               TLSEncode.getEncoder().writeObject(body);
               // The backing array of a pooled heap buffer may be shared with other buffers, so the
               // encoded bytes start at arrayOffset() rather than at index 0 of the array.
               result = createBytesMessage(message.getMessageID(), buf.array(), buf.arrayOffset() + buf.readerIndex(), buf.readableBytes());
            } finally {
               buf.release();
               TLSEncode.getEncoder().setByteBuffer((WritableBuffer) null);
            }
         }
      } else {
         throw new RuntimeException("Unexpected body type: " + body.getClass());
      }

      populateMessage(result, message.getProtonMessage());
      result.getInnerMessage().setReplyTo(message.getReplyTo());

      result.encode();

      // Every branch above either assigns result or throws, so it cannot be null here.
      return result.getInnerMessage();
   }

   /**
    * Copies the header, message annotations, application properties, properties and footer of an AMQP
    * message onto the given JMS wrapper.
    *
    * @param jms  the wrapper to populate
    * @param amqp the decoded AMQP message to read from
    * @return the same {@code jms} instance
    * @throws Exception if setting a value on the wrapper fails
    */
   protected static ServerJMSMessage populateMessage(ServerJMSMessage jms, org.apache.qpid.proton.message.Message amqp) throws Exception {
      Header header = amqp.getHeader();
      if (header != null) {
         jms.setBooleanProperty(JMS_AMQP_HEADER, true);

         if (header.getDurable() != null) {
            jms.setBooleanProperty(JMS_AMQP_HEADER_DURABLE, true);
            jms.setJMSDeliveryMode(header.getDurable().booleanValue() ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);
         } else {
            jms.setJMSDeliveryMode(DeliveryMode.NON_PERSISTENT);
         }

         if (header.getPriority() != null) {
            jms.setBooleanProperty(JMS_AMQP_HEADER_PRIORITY, true);
            jms.setJMSPriority(header.getPriority().intValue());
         } else {
            jms.setJMSPriority(javax.jms.Message.DEFAULT_PRIORITY);
         }

         if (header.getFirstAcquirer() != null) {
            jms.setBooleanProperty(JMS_AMQP_FIRST_ACQUIRER, header.getFirstAcquirer());
         }

         if (header.getDeliveryCount() != null) {
            // AMQP Delivery Count counts only failed delivers where JMS
            // Delivery Count should include the original delivery in the count.
            jms.setLongProperty("JMSXDeliveryCount", header.getDeliveryCount().longValue() + 1);
         }
      } else {
         jms.setJMSPriority((byte) javax.jms.Message.DEFAULT_PRIORITY);
         jms.setJMSDeliveryMode(DeliveryMode.NON_PERSISTENT);
      }

      final MessageAnnotations ma = amqp.getMessageAnnotations();
      if (ma != null) {
         final Map<?, ?> annotations = ma.getValue();
         if (annotations != null) {
            for (Map.Entry<?, ?> entry : annotations.entrySet()) {
               String key = entry.getKey().toString();
               if ("x-opt-delivery-time".equals(key) && entry.getValue() != null) {
                  long deliveryTime = ((Number) entry.getValue()).longValue();
                  jms.setLongProperty(HDR_SCHEDULED_DELIVERY_TIME.toString(), deliveryTime);
               } else if ("x-opt-delivery-delay".equals(key) && entry.getValue() != null) {
                  long delay = ((Number) entry.getValue()).longValue();
                  if (delay > 0) {
                     jms.setLongProperty(HDR_SCHEDULED_DELIVERY_TIME.toString(), System.currentTimeMillis() + delay);
                  }
               }

               // The annotation itself is always kept, under the message-annotation prefix.
               setProperty(jms, JMS_AMQP_MESSAGE_ANNOTATION_PREFIX + key, entry.getValue());
            }
         }
      }

      final ApplicationProperties ap = amqp.getApplicationProperties();
      if (ap != null) {
         copyProperties(jms, "", ap.getValue());
      }

      final Properties properties = amqp.getProperties();
      if (properties != null) {
         if (properties.getMessageId() != null) {
            jms.setJMSMessageID(AMQPMessageIdHelper.INSTANCE.toBaseMessageIdString(properties.getMessageId()));
         }
         Binary userId = properties.getUserId();
         if (userId != null) {
            // TODO - Better Way to set this?
            jms.setStringProperty("JMSXUserID", new String(userId.getArray(), userId.getArrayOffset(), userId.getLength(), StandardCharsets.UTF_8));
         }
         if (properties.getTo() != null) {
            jms.setJMSDestination(new ServerDestination(properties.getTo()));
         }
         if (properties.getSubject() != null) {
            jms.setJMSType(properties.getSubject());
         }
         if (properties.getReplyTo() != null) {
            jms.setJMSReplyTo(new ServerDestination(properties.getReplyTo()));
         }
         if (properties.getCorrelationId() != null) {
            jms.setJMSCorrelationID(AMQPMessageIdHelper.INSTANCE.toBaseMessageIdString(properties.getCorrelationId()));
         }
         if (properties.getContentType() != null) {
            jms.setStringProperty(JMS_AMQP_CONTENT_TYPE, properties.getContentType().toString());
         }
         if (properties.getContentEncoding() != null) {
            jms.setStringProperty(JMS_AMQP_CONTENT_ENCODING, properties.getContentEncoding().toString());
         }
         if (properties.getCreationTime() != null) {
            jms.setJMSTimestamp(properties.getCreationTime().getTime());
         }
         if (properties.getGroupId() != null) {
            jms.setStringProperty("_AMQ_GROUP_ID", properties.getGroupId());
         }
         if (properties.getGroupSequence() != null) {
            jms.setIntProperty("JMSXGroupSeq", properties.getGroupSequence().intValue());
         }
         if (properties.getReplyToGroupId() != null) {
            jms.setStringProperty(JMS_AMQP_REPLYTO_GROUP_ID, properties.getReplyToGroupId());
         }
         if (properties.getAbsoluteExpiryTime() != null) {
            jms.setJMSExpiration(properties.getAbsoluteExpiryTime().getTime());
         }
      }

      // If the jms expiration has not yet been set...
      if (header != null && jms.getJMSExpiration() == 0) {
         // Then lets try to set it based on the message ttl.
         long ttl = javax.jms.Message.DEFAULT_TIME_TO_LIVE;
         if (header.getTtl() != null) {
            ttl = header.getTtl().longValue();
         }

         if (ttl == 0) {
            jms.setJMSExpiration(0);
         } else {
            jms.setJMSExpiration(System.currentTimeMillis() + ttl);
         }
      }

      final Footer fp = amqp.getFooter();
      if (fp != null) {
         copyProperties(jms, JMS_AMQP_FOOTER_PREFIX, fp.getValue());
      }

      return jms;
   }

   /**
    * Copies every entry of a section map onto the message as a property named {@code prefix + key}.
    * A {@code null} map is treated as empty.
    */
   private static void copyProperties(ServerJMSMessage jms, String prefix, Map<?, ?> source) throws JMSException {
      if (source == null) {
         return;
      }
      final Set<? extends Map.Entry<?, ?>> entries = source.entrySet();
      for (Map.Entry<?, ?> entry : entries) {
         setProperty(jms, prefix + entry.getKey().toString(), entry.getValue());
      }
   }

   /**
    * Sets a single property, first mapping AMQP-specific value types onto types a JMS property accepts.
    * <p>
    * Unsigned values use the narrowest signed setter that holds the value and widen otherwise; Symbol and
    * Binary are stored through {@code toString()}; decimals become double or float; any other value is
    * passed to {@code setObjectProperty} unchanged.
    *
    * @param msg   the message to set the property on
    * @param key   the property name
    * @param value the AMQP value to store
    * @throws JMSException if the underlying setter rejects the value
    */
   private static void setProperty(javax.jms.Message msg, String key, Object value) throws JMSException {
      if (value instanceof UnsignedLong) {
         long v = ((UnsignedLong) value).longValue();
         msg.setLongProperty(key, v);
      } else if (value instanceof UnsignedInteger) {
         long v = ((UnsignedInteger) value).longValue();
         if (Integer.MIN_VALUE <= v && v <= Integer.MAX_VALUE) {
            msg.setIntProperty(key, (int) v);
         } else {
            msg.setLongProperty(key, v);
         }
      } else if (value instanceof UnsignedShort) {
         int v = ((UnsignedShort) value).intValue();
         if (Short.MIN_VALUE <= v && v <= Short.MAX_VALUE) {
            msg.setShortProperty(key, (short) v);
         } else {
            msg.setIntProperty(key, v);
         }
      } else if (value instanceof UnsignedByte) {
         short v = ((UnsignedByte) value).shortValue();
         if (Byte.MIN_VALUE <= v && v <= Byte.MAX_VALUE) {
            msg.setByteProperty(key, (byte) v);
         } else {
            msg.setShortProperty(key, v);
         }
      } else if (value instanceof Symbol) {
         msg.setStringProperty(key, value.toString());
      } else if (value instanceof Decimal128) {
         msg.setDoubleProperty(key, ((Decimal128) value).doubleValue());
      } else if (value instanceof Decimal64) {
         msg.setDoubleProperty(key, ((Decimal64) value).doubleValue());
      } else if (value instanceof Decimal32) {
         msg.setFloatProperty(key, ((Decimal32) value).floatValue());
      } else if (value instanceof Binary) {
         msg.setStringProperty(key, value.toString());
      } else {
         msg.setObjectProperty(key, value);
      }
   }

}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fe0ca4d8/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/CoreAmqpConverter.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/CoreAmqpConverter.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/CoreAmqpConverter.java new file mode 100644 index 0000000..111de8c --- /dev/null +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/CoreAmqpConverter.java @@ -0,0 +1,461 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.activemq.artemis.protocol.amqp.converter; + +import javax.jms.Destination; +import javax.jms.JMSException; +import javax.jms.MessageEOFException; +import javax.jms.Queue; +import javax.jms.TemporaryQueue; +import javax.jms.TemporaryTopic; +import javax.jms.TextMessage; +import javax.jms.Topic; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.Date; +import java.util.Enumeration; +import java.util.HashMap; +import java.util.LinkedHashMap; +import java.util.Map; +import java.util.Set; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.PooledByteBufAllocator; +import org.apache.activemq.artemis.api.core.ICoreMessage; +import org.apache.activemq.artemis.api.core.Message; +import org.apache.activemq.artemis.protocol.amqp.broker.AMQPMessage; +import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSBytesMessage; +import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSMapMessage; +import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSMessage; +import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSObjectMessage; +import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSStreamMessage; +import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSTextMessage; +import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPIllegalStateException; +import org.apache.activemq.artemis.protocol.amqp.util.NettyWritable; +import org.apache.activemq.artemis.protocol.amqp.util.TLSEncode; +import org.apache.activemq.artemis.reader.MessageUtil; +import org.apache.qpid.proton.amqp.Binary; +import org.apache.qpid.proton.amqp.Symbol; +import org.apache.qpid.proton.amqp.UnsignedByte; +import org.apache.qpid.proton.amqp.UnsignedInteger; +import org.apache.qpid.proton.amqp.messaging.AmqpSequence; +import org.apache.qpid.proton.amqp.messaging.AmqpValue; +import 
org.apache.qpid.proton.amqp.messaging.ApplicationProperties; +import org.apache.qpid.proton.amqp.messaging.Data; +import org.apache.qpid.proton.amqp.messaging.DeliveryAnnotations; +import org.apache.qpid.proton.amqp.messaging.Footer; +import org.apache.qpid.proton.amqp.messaging.Header; +import org.apache.qpid.proton.amqp.messaging.MessageAnnotations; +import org.apache.qpid.proton.amqp.messaging.Properties; +import org.apache.qpid.proton.amqp.messaging.Section; +import org.apache.qpid.proton.codec.EncoderImpl; +import org.apache.qpid.proton.codec.WritableBuffer; +import org.jboss.logging.Logger; + +import static org.apache.activemq.artemis.api.core.FilterConstants.NATIVE_MESSAGE_ID; +import static org.apache.activemq.artemis.api.core.Message.HDR_SCHEDULED_DELIVERY_TIME; +import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.EMPTY_BINARY; +import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.JMS_AMQP_CONTENT_ENCODING; +import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.JMS_AMQP_CONTENT_TYPE; +import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.JMS_AMQP_DELIVERY_ANNOTATION_PREFIX; +import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.JMS_AMQP_FIRST_ACQUIRER; +import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.JMS_AMQP_FOOTER_PREFIX; +import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.JMS_AMQP_HEADER; +import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.JMS_AMQP_HEADER_DURABLE; +import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.JMS_AMQP_HEADER_PRIORITY; +import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.JMS_AMQP_MESSAGE_ANNOTATION_PREFIX; +import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.JMS_AMQP_NATIVE; 
+import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.JMS_AMQP_PREFIX; +import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.JMS_AMQP_PROPERTIES; +import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.JMS_AMQP_REPLYTO_GROUP_ID; +import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.JMS_DEST_TYPE_MSG_ANNOTATION; +import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.JMS_REPLY_TO_TYPE_MSG_ANNOTATION; +import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.QUEUE_TYPE; +import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.SERIALIZED_JAVA_OBJECT_CONTENT_TYPE; +import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.TEMP_QUEUE_TYPE; +import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.TEMP_TOPIC_TYPE; +import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.TOPIC_TYPE; +import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.toAddress; + +public class CoreAmqpConverter { + + private static Logger logger = Logger.getLogger(CoreAmqpConverter.class); + + public static AMQPMessage checkAMQP(Message message) throws Exception { + if (message instanceof AMQPMessage) { + return (AMQPMessage)message; + } else { + // It will first convert to Core, then to AMQP + return fromCore(message.toCore()); + } + } + + public static AMQPMessage fromCore(ICoreMessage coreMessage) throws Exception { + if (coreMessage == null) { + return null; + } + + ServerJMSMessage message = ServerJMSMessage.wrapCoreMessage(coreMessage); + message.decode(); + + long messageFormat = 0; + Header header = null; + final Properties properties = new Properties(); + Map daMap = null; + final Map maMap = new HashMap<>(); + Map apMap = null; + Map footerMap = null; + + Section body = 
convertBody(message, maMap, properties); + + if (message.getInnerMessage().isDurable()) { + if (header == null) { + header = new Header(); + } + header.setDurable(true); + } + byte priority = (byte) message.getJMSPriority(); + if (priority != javax.jms.Message.DEFAULT_PRIORITY) { + if (header == null) { + header = new Header(); + } + header.setPriority(UnsignedByte.valueOf(priority)); + } + String type = message.getJMSType(); + if (type != null) { + properties.setSubject(type); + } + String messageId = message.getJMSMessageID(); + if (messageId != null) { + try { + properties.setMessageId(AMQPMessageIdHelper.INSTANCE.toIdObject(messageId)); + } catch (ActiveMQAMQPIllegalStateException e) { + properties.setMessageId(messageId); + } + } + Destination destination = message.getJMSDestination(); + if (destination != null) { + properties.setTo(toAddress(destination)); + maMap.put(JMS_DEST_TYPE_MSG_ANNOTATION, destinationType(destination)); + } + Destination replyTo = message.getJMSReplyTo(); + if (replyTo != null) { + properties.setReplyTo(toAddress(replyTo)); + maMap.put(JMS_REPLY_TO_TYPE_MSG_ANNOTATION, destinationType(replyTo)); + } + String correlationId = message.getJMSCorrelationID(); + if (correlationId != null) { + try { + properties.setCorrelationId(AMQPMessageIdHelper.INSTANCE.toIdObject(correlationId)); + } catch (ActiveMQAMQPIllegalStateException e) { + properties.setCorrelationId(correlationId); + } + } + long expiration = message.getJMSExpiration(); + if (expiration != 0) { + long ttl = expiration - System.currentTimeMillis(); + if (ttl < 0) { + ttl = 1; + } + + if (header == null) { + header = new Header(); + } + header.setTtl(new UnsignedInteger((int) ttl)); + + properties.setAbsoluteExpiryTime(new Date(expiration)); + } + long timeStamp = message.getJMSTimestamp(); + if (timeStamp != 0) { + properties.setCreationTime(new Date(timeStamp)); + } + + final Set keySet = MessageUtil.getPropertyNames(message.getInnerMessage()); + for (String key : keySet) { + 
if (key.startsWith("JMSX")) { + if (key.equals("JMSXUserID")) { + String value = message.getStringProperty(key); + properties.setUserId(new Binary(value.getBytes(StandardCharsets.UTF_8))); + continue; + } else if (key.equals("JMSXGroupID")) { + String value = message.getStringProperty(key); + properties.setGroupId(value); + continue; + } else if (key.equals("JMSXGroupSeq")) { + UnsignedInteger value = new UnsignedInteger(message.getIntProperty(key)); + properties.setGroupSequence(value); + continue; + } + } else if (key.startsWith(JMS_AMQP_PREFIX)) { + // AMQP Message Information stored from a conversion to the Core Message + if (key.equals(JMS_AMQP_NATIVE)) { + // skip..internal use only + continue; + } else if (key.equals(JMS_AMQP_FIRST_ACQUIRER)) { + if (header == null) { + header = new Header(); + } + header.setFirstAcquirer(message.getBooleanProperty(key)); + continue; + } else if (key.equals(JMS_AMQP_HEADER)) { + if (header == null) { + header = new Header(); + } + continue; + } else if (key.equals(JMS_AMQP_HEADER_DURABLE)) { + if (header == null) { + header = new Header(); + } + header.setDurable(message.getInnerMessage().isDurable()); + continue; + } else if (key.equals(JMS_AMQP_HEADER_PRIORITY)) { + if (header == null) { + header = new Header(); + } + header.setPriority(UnsignedByte.valueOf(priority)); + continue; + } else if (key.startsWith(JMS_AMQP_PROPERTIES)) { + continue; + } else if (key.startsWith(JMS_AMQP_DELIVERY_ANNOTATION_PREFIX)) { + if (daMap == null) { + daMap = new HashMap<>(); + } + String name = key.substring(JMS_AMQP_DELIVERY_ANNOTATION_PREFIX.length()); + daMap.put(Symbol.valueOf(name), message.getObjectProperty(key)); + continue; + } else if (key.startsWith(JMS_AMQP_MESSAGE_ANNOTATION_PREFIX)) { + String name = key.substring(JMS_AMQP_MESSAGE_ANNOTATION_PREFIX.length()); + maMap.put(Symbol.valueOf(name), message.getObjectProperty(key)); + continue; + } else if (key.equals(JMS_AMQP_CONTENT_TYPE)) { + 
properties.setContentType(Symbol.getSymbol(message.getStringProperty(key))); + continue; + } else if (key.equals(JMS_AMQP_CONTENT_ENCODING)) { + properties.setContentEncoding(Symbol.getSymbol(message.getStringProperty(key))); + continue; + } else if (key.equals(JMS_AMQP_REPLYTO_GROUP_ID)) { + properties.setReplyToGroupId(message.getStringProperty(key)); + continue; + } else if (key.startsWith(JMS_AMQP_FOOTER_PREFIX)) { + if (footerMap == null) { + footerMap = new HashMap<>(); + } + String name = key.substring(JMS_AMQP_FOOTER_PREFIX.length()); + footerMap.put(name, message.getObjectProperty(key)); + continue; + } + } else if (key.equals("_AMQ_GROUP_ID")) { + String value = message.getStringProperty(key); + properties.setGroupId(value); + continue; + } else if (key.equals(NATIVE_MESSAGE_ID)) { + // skip..internal use only + continue; + } else if (key.endsWith(HDR_SCHEDULED_DELIVERY_TIME.toString())) { + // skip..remove annotation from previous inbound transformation + continue; + } + + if (apMap == null) { + apMap = new HashMap<>(); + } + + Object objectProperty = message.getObjectProperty(key); + if (objectProperty instanceof byte[]) { + objectProperty = new Binary((byte[]) objectProperty); + } + + apMap.put(key, objectProperty); + } + + ByteBuf buffer = PooledByteBufAllocator.DEFAULT.heapBuffer(1024); + + try { + EncoderImpl encoder = TLSEncode.getEncoder(); + encoder.setByteBuffer(new NettyWritable(buffer)); + + if (header != null) { + encoder.writeObject(header); + } + if (daMap != null) { + encoder.writeObject(new DeliveryAnnotations(daMap)); + } + if (maMap != null) { + encoder.writeObject(new MessageAnnotations(maMap)); + } + if (properties != null) { + encoder.writeObject(properties); + } + if (apMap != null) { + encoder.writeObject(new ApplicationProperties(apMap)); + } + if (body != null) { + encoder.writeObject(body); + } + if (footerMap != null) { + encoder.writeObject(new Footer(footerMap)); + } + + byte[] data = new byte[buffer.writerIndex()]; + 
buffer.readBytes(data); + + AMQPMessage amqpMessage = new AMQPMessage(messageFormat, data); + amqpMessage.setMessageID(message.getInnerMessage().getMessageID()); + amqpMessage.setReplyTo(coreMessage.getReplyTo()); + return amqpMessage; + + } finally { + TLSEncode.getEncoder().setByteBuffer((WritableBuffer) null); + buffer.release(); + } + } + + private static Section convertBody(ServerJMSMessage message, Map maMap, Properties properties) throws JMSException { + + Section body = null; + + if (message instanceof ServerJMSBytesMessage) { + Binary payload = getBinaryFromMessageBody((ServerJMSBytesMessage) message); + + maMap.put(AMQPMessageSupport.JMS_MSG_TYPE, AMQPMessageSupport.JMS_BYTES_MESSAGE); + if (payload == null) { + payload = EMPTY_BINARY; + } else { + body = new AmqpValue(payload); + } + } else if (message instanceof ServerJMSTextMessage) { + body = new AmqpValue(((TextMessage) message).getText()); + maMap.put(AMQPMessageSupport.JMS_MSG_TYPE, AMQPMessageSupport.JMS_TEXT_MESSAGE); + } else if (message instanceof ServerJMSMapMessage) { + body = new AmqpValue(getMapFromMessageBody((ServerJMSMapMessage) message)); + maMap.put(AMQPMessageSupport.JMS_MSG_TYPE, AMQPMessageSupport.JMS_MAP_MESSAGE); + } else if (message instanceof ServerJMSStreamMessage) { + maMap.put(AMQPMessageSupport.JMS_MSG_TYPE, AMQPMessageSupport.JMS_STREAM_MESSAGE); + ArrayList list = new ArrayList<>(); + final ServerJMSStreamMessage m = (ServerJMSStreamMessage) message; + try { + while (true) { + list.add(m.readObject()); + } + } catch (MessageEOFException e) { + } + + body = new AmqpSequence(list); + } else if (message instanceof ServerJMSObjectMessage) { + properties.setContentType(AMQPMessageSupport.SERIALIZED_JAVA_OBJECT_CONTENT_TYPE); + maMap.put(AMQPMessageSupport.JMS_MSG_TYPE, AMQPMessageSupport.JMS_OBJECT_MESSAGE); + Binary payload = getBinaryFromMessageBody((ServerJMSObjectMessage) message); + + if (payload == null) { + payload = EMPTY_BINARY; + } + + body = new Data(payload); + + // 
For a non-AMQP message we tag the outbound content type as containing + // a serialized Java object so that an AMQP client has a hint as to what + // we are sending it. + if (!message.propertyExists(JMS_AMQP_CONTENT_TYPE)) { + message.setStringProperty(JMS_AMQP_CONTENT_TYPE, SERIALIZED_JAVA_OBJECT_CONTENT_TYPE.toString()); + } + } else if (message instanceof ServerJMSMessage) { + maMap.put(AMQPMessageSupport.JMS_MSG_TYPE, AMQPMessageSupport.JMS_MESSAGE); + // If this is not an AMQP message that was converted then the original encoding + // will be unknown so we check for special cases of messages with special data + // encoded into the server message body. + ICoreMessage internalMessage = message.getInnerMessage(); + int readerIndex = internalMessage.getBodyBuffer().readerIndex(); + try { + Object s = internalMessage.getBodyBuffer().readNullableSimpleString(); + if (s != null) { + body = new AmqpValue(s.toString()); + } + } catch (Throwable ignored) { + logger.debug("Exception ignored during conversion", ignored.getMessage(), ignored); + body = new AmqpValue("Conversion to AMQP error!"); + } finally { + internalMessage.getBodyBuffer().readerIndex(readerIndex); + } + } + + return body; + } + + private static Binary getBinaryFromMessageBody(ServerJMSBytesMessage message) throws JMSException { + byte[] data = new byte[(int) message.getBodyLength()]; + message.readBytes(data); + message.reset(); // Need to reset after readBytes or future readBytes + + return new Binary(data); + } + + private static Binary getBinaryFromMessageBody(ServerJMSTextMessage message) throws JMSException { + Binary result = null; + String text = message.getText(); + if (text != null) { + result = new Binary(text.getBytes(StandardCharsets.UTF_8)); + } + + return result; + } + + private static Binary getBinaryFromMessageBody(ServerJMSObjectMessage message) throws JMSException { + message.getInnerMessage().getBodyBuffer().resetReaderIndex(); + int size = 
message.getInnerMessage().getBodyBuffer().readInt(); + byte[] bytes = new byte[size]; + message.getInnerMessage().getBodyBuffer().readBytes(bytes); + + return new Binary(bytes); + } + + private static Map getMapFromMessageBody(ServerJMSMapMessage message) throws JMSException { + final HashMap map = new LinkedHashMap<>(); + + @SuppressWarnings("unchecked") + final Enumeration names = message.getMapNames(); + while (names.hasMoreElements()) { + String key = names.nextElement(); + Object value = message.getObject(key); + if (value instanceof byte[]) { + value = new Binary((byte[]) value); + } + map.put(key, value); + } + + return map; + } + + private static byte destinationType(Destination destination) { + if (destination instanceof Queue) { + if (destination instanceof TemporaryQueue) { + return TEMP_QUEUE_TYPE; + } else { + return QUEUE_TYPE; + } + } else if (destination instanceof Topic) { + if (destination instanceof TemporaryTopic) { + return TEMP_TOPIC_TYPE; + } else { + return TOPIC_TYPE; + } + } + + throw new IllegalArgumentException("Unknown Destination Type passed to JMS Transformer."); + } + +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fe0ca4d8/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/ProtonMessageConverter.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/ProtonMessageConverter.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/ProtonMessageConverter.java deleted file mode 100644 index 6aa44a4..0000000 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/ProtonMessageConverter.java +++ /dev/null @@ -1,101 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license 
agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.artemis.protocol.amqp.converter; - -import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.JMS_AMQP_NATIVE; - -import java.io.IOException; - -import javax.jms.BytesMessage; - -import org.apache.activemq.artemis.core.client.ActiveMQClientLogger; -import org.apache.activemq.artemis.core.server.ServerMessage; -import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSBytesMessage; -import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSMessage; -import org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport; -import org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPNativeOutboundTransformer; -import org.apache.activemq.artemis.protocol.amqp.converter.message.EncodedMessage; -import org.apache.activemq.artemis.protocol.amqp.converter.message.InboundTransformer; -import org.apache.activemq.artemis.protocol.amqp.converter.message.JMSMappingInboundTransformer; -import org.apache.activemq.artemis.protocol.amqp.converter.message.JMSMappingOutboundTransformer; -import org.apache.activemq.artemis.protocol.amqp.converter.message.OutboundTransformer; -import org.apache.activemq.artemis.protocol.amqp.util.NettyWritable; -import 
org.apache.activemq.artemis.spi.core.protocol.MessageConverter; -import org.apache.activemq.artemis.utils.IDGenerator; -import org.apache.qpid.proton.codec.WritableBuffer; - -import io.netty.buffer.ByteBuf; -import io.netty.buffer.Unpooled; - -public class ProtonMessageConverter implements MessageConverter { - - public ProtonMessageConverter(IDGenerator idGenerator) { - inboundTransformer = new JMSMappingInboundTransformer(idGenerator); - outboundTransformer = new JMSMappingOutboundTransformer(idGenerator); - } - - private final InboundTransformer inboundTransformer; - private final OutboundTransformer outboundTransformer; - - @Override - public ServerMessage inbound(Object messageSource) throws Exception { - EncodedMessage encodedMessageSource = (EncodedMessage) messageSource; - ServerJMSMessage transformedMessage = null; - - try { - transformedMessage = inboundTransformer.transform(encodedMessageSource); - } catch (Exception e) { - ActiveMQClientLogger.LOGGER.debug("Transform of message using [{}] transformer, failed" + inboundTransformer.getTransformerName()); - ActiveMQClientLogger.LOGGER.trace("Transformation error:", e); - - throw new IOException("Failed to transform incoming delivery, skipping."); - } - - transformedMessage.encode(); - - return (ServerMessage) transformedMessage.getInnerMessage(); - } - - @Override - public Object outbound(ServerMessage messageOutbound, int deliveryCount) throws Exception { - // Useful for testing but not recommended for real life use. 
- ByteBuf nettyBuffer = Unpooled.buffer(1024); - NettyWritable buffer = new NettyWritable(nettyBuffer); - long messageFormat = (long) outbound(messageOutbound, deliveryCount, buffer); - - EncodedMessage encoded = new EncodedMessage(messageFormat, nettyBuffer.array(), nettyBuffer.arrayOffset() + nettyBuffer.readerIndex(), - nettyBuffer.readableBytes()); - - return encoded; - } - - public Object outbound(ServerMessage messageOutbound, int deliveryCount, WritableBuffer buffer) throws Exception { - ServerJMSMessage jmsMessage = AMQPMessageSupport.wrapMessage(messageOutbound.getType(), messageOutbound, deliveryCount); - - jmsMessage.decode(); - - if (jmsMessage.getBooleanProperty(JMS_AMQP_NATIVE)) { - if (jmsMessage instanceof BytesMessage) { - return AMQPNativeOutboundTransformer.transform(outboundTransformer, (ServerJMSBytesMessage) jmsMessage, buffer); - } else { - return 0; - } - } else { - return outboundTransformer.transform(jmsMessage, buffer); - } - } -} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fe0ca4d8/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/jms/ServerJMSBytesMessage.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/jms/ServerJMSBytesMessage.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/jms/ServerJMSBytesMessage.java index abdf808..8d473a7 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/jms/ServerJMSBytesMessage.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/jms/ServerJMSBytesMessage.java @@ -19,8 +19,8 @@ package org.apache.activemq.artemis.protocol.amqp.converter.jms; import javax.jms.BytesMessage; import javax.jms.JMSException; 
-import org.apache.activemq.artemis.core.message.impl.MessageImpl; -import org.apache.activemq.artemis.core.message.impl.MessageInternal; +import org.apache.activemq.artemis.api.core.ICoreMessage; +import org.apache.activemq.artemis.core.message.impl.CoreMessage; import static org.apache.activemq.artemis.reader.BytesMessageUtil.bytesMessageReset; import static org.apache.activemq.artemis.reader.BytesMessageUtil.bytesReadBoolean; @@ -49,13 +49,13 @@ import static org.apache.activemq.artemis.reader.BytesMessageUtil.bytesWriteUTF; public class ServerJMSBytesMessage extends ServerJMSMessage implements BytesMessage { - public ServerJMSBytesMessage(MessageInternal message, int deliveryCount) { - super(message, deliveryCount); + public ServerJMSBytesMessage(ICoreMessage message) { + super(message); } @Override public long getBodyLength() throws JMSException { - return message.getEndOfBodyPosition() - MessageImpl.BODY_OFFSET; + return message.getEndOfBodyPosition() - CoreMessage.BODY_OFFSET; } @Override http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fe0ca4d8/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/jms/ServerJMSMapMessage.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/jms/ServerJMSMapMessage.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/jms/ServerJMSMapMessage.java index 0268065..f72239e 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/jms/ServerJMSMapMessage.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/jms/ServerJMSMapMessage.java @@ -25,9 +25,9 @@ import java.util.HashSet; import java.util.Set; import 
org.apache.activemq.artemis.api.core.ActiveMQPropertyConversionException; +import org.apache.activemq.artemis.api.core.ICoreMessage; import org.apache.activemq.artemis.api.core.Message; import org.apache.activemq.artemis.api.core.SimpleString; -import org.apache.activemq.artemis.core.message.impl.MessageInternal; import org.apache.activemq.artemis.utils.TypedProperties; import static org.apache.activemq.artemis.reader.MapMessageUtil.readBodyMap; @@ -52,8 +52,8 @@ public final class ServerJMSMapMessage extends ServerJMSMessage implements MapMe /* * This constructor is used to construct messages prior to sending */ - public ServerJMSMapMessage(MessageInternal message, int deliveryCount) { - super(message, deliveryCount); + public ServerJMSMapMessage(ICoreMessage message) { + super(message); } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fe0ca4d8/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/jms/ServerJMSMessage.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/jms/ServerJMSMessage.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/jms/ServerJMSMessage.java index f9a94f5..2a52f7a 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/jms/ServerJMSMessage.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/jms/ServerJMSMessage.java @@ -16,43 +16,56 @@ */ package org.apache.activemq.artemis.protocol.amqp.converter.jms; -import java.util.Collections; -import java.util.Enumeration; - import javax.jms.DeliveryMode; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.Message; +import java.util.Collections; +import java.util.Enumeration; import 
org.apache.activemq.artemis.api.core.ActiveMQBuffer; import org.apache.activemq.artemis.api.core.ActiveMQException; +import org.apache.activemq.artemis.api.core.ICoreMessage; import org.apache.activemq.artemis.api.core.SimpleString; -import org.apache.activemq.artemis.core.message.impl.MessageInternal; import org.apache.activemq.artemis.jms.client.ActiveMQDestination; import org.apache.activemq.artemis.reader.MessageUtil; import static org.apache.activemq.artemis.api.core.FilterConstants.NATIVE_MESSAGE_ID; +import static org.apache.activemq.artemis.api.core.Message.BYTES_TYPE; +import static org.apache.activemq.artemis.api.core.Message.MAP_TYPE; +import static org.apache.activemq.artemis.api.core.Message.OBJECT_TYPE; +import static org.apache.activemq.artemis.api.core.Message.STREAM_TYPE; +import static org.apache.activemq.artemis.api.core.Message.TEXT_TYPE; public class ServerJMSMessage implements Message { - protected final MessageInternal message; - - protected int deliveryCount; - - public MessageInternal getInnerMessage() { - return message; - } + protected final ICoreMessage message; + private ActiveMQBuffer readBodyBuffer; - public ServerJMSMessage(MessageInternal message, int deliveryCount) { + public ServerJMSMessage(ICoreMessage message) { this.message = message; - this.deliveryCount = deliveryCount; } - public int getDeliveryCount() { - return deliveryCount; + public static ServerJMSMessage wrapCoreMessage(ICoreMessage wrapped) { + switch (wrapped.getType()) { + case STREAM_TYPE: + return new ServerJMSStreamMessage(wrapped); + case BYTES_TYPE: + return new ServerJMSBytesMessage(wrapped); + case MAP_TYPE: + return new ServerJMSMapMessage(wrapped); + case TEXT_TYPE: + return new ServerJMSTextMessage(wrapped); + case OBJECT_TYPE: + return new ServerJMSObjectMessage(wrapped); + default: + return new ServerJMSMessage(wrapped); + } } - private ActiveMQBuffer readBodyBuffer; + public ICoreMessage getInnerMessage() { + return message; + } /** * When reading we 
use a protected copy so multi-threads can work fine @@ -60,7 +73,7 @@ public class ServerJMSMessage implements Message { protected ActiveMQBuffer getReadBodyBuffer() { if (readBodyBuffer == null) { // to avoid clashes between multiple threads - readBodyBuffer = message.getBodyBufferDuplicate(); + readBodyBuffer = message.getReadOnlyBodyBuffer(); } return readBodyBuffer; } @@ -113,13 +126,13 @@ public class ServerJMSMessage implements Message { } @Override - public final void setJMSCorrelationID(String correlationID) throws JMSException { - MessageUtil.setJMSCorrelationID(message, correlationID); + public final String getJMSCorrelationID() throws JMSException { + return MessageUtil.getJMSCorrelationID(message); } @Override - public final String getJMSCorrelationID() throws JMSException { - return MessageUtil.getJMSCorrelationID(message); + public final void setJMSCorrelationID(String correlationID) throws JMSException { + MessageUtil.setJMSCorrelationID(message, correlationID); } @Override @@ -140,7 +153,7 @@ public class ServerJMSMessage implements Message { @Override public final Destination getJMSDestination() throws JMSException { - SimpleString sdest = message.getAddress(); + SimpleString sdest = message.getAddressSimpleString(); if (sdest == null) { return null; @@ -152,7 +165,7 @@ public class ServerJMSMessage implements Message { @Override public final void setJMSDestination(Destination destination) throws JMSException { if (destination == null) { - message.setAddress(null); + message.setAddress((SimpleString)null); } else { message.setAddress(((ActiveMQDestination) destination).getSimpleAddress()); } @@ -254,19 +267,11 @@ public class ServerJMSMessage implements Message { @Override public final int getIntProperty(String name) throws JMSException { - if (MessageUtil.JMSXDELIVERYCOUNT.equals(name)) { - return deliveryCount; - } - return message.getIntProperty(name); } @Override public final long getLongProperty(String name) throws JMSException { - if 
(MessageUtil.JMSXDELIVERYCOUNT.equals(name)) { - return deliveryCount; - } - return message.getLongProperty(name); } @@ -282,10 +287,6 @@ public class ServerJMSMessage implements Message { @Override public final String getStringProperty(String name) throws JMSException { - if (MessageUtil.JMSXDELIVERYCOUNT.equals(name)) { - return String.valueOf(deliveryCount); - } - return message.getStringProperty(name); } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fe0ca4d8/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/jms/ServerJMSObjectMessage.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/jms/ServerJMSObjectMessage.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/jms/ServerJMSObjectMessage.java index d1eaac6..23ffb09 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/jms/ServerJMSObjectMessage.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/jms/ServerJMSObjectMessage.java @@ -16,13 +16,12 @@ */ package org.apache.activemq.artemis.protocol.amqp.converter.jms; -import java.io.Serializable; - import javax.jms.JMSException; import javax.jms.ObjectMessage; +import java.io.Serializable; +import org.apache.activemq.artemis.api.core.ICoreMessage; import org.apache.activemq.artemis.api.core.Message; -import org.apache.activemq.artemis.core.message.impl.MessageInternal; import org.apache.qpid.proton.amqp.Binary; public class ServerJMSObjectMessage extends ServerJMSMessage implements ObjectMessage { @@ -31,8 +30,8 @@ public class ServerJMSObjectMessage extends ServerJMSMessage implements ObjectMe private Binary payload; - public ServerJMSObjectMessage(MessageInternal 
message, int deliveryCount) { - super(message, deliveryCount); + public ServerJMSObjectMessage(ICoreMessage message) { + super(message); } @Override http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fe0ca4d8/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/jms/ServerJMSStreamMessage.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/jms/ServerJMSStreamMessage.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/jms/ServerJMSStreamMessage.java index a53fc0e..9aaf4c3 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/jms/ServerJMSStreamMessage.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/jms/ServerJMSStreamMessage.java @@ -21,9 +21,9 @@ import javax.jms.MessageEOFException; import javax.jms.MessageFormatException; import javax.jms.StreamMessage; +import org.apache.activemq.artemis.api.core.ICoreMessage; import org.apache.activemq.artemis.api.core.Message; import org.apache.activemq.artemis.api.core.Pair; -import org.apache.activemq.artemis.core.message.impl.MessageInternal; import org.apache.activemq.artemis.utils.DataConstants; import static org.apache.activemq.artemis.reader.StreamMessageUtil.streamReadBoolean; @@ -44,8 +44,8 @@ public final class ServerJMSStreamMessage extends ServerJMSMessage implements St private int bodyLength = 0; - public ServerJMSStreamMessage(MessageInternal message, int deliveryCount) { - super(message, deliveryCount); + public ServerJMSStreamMessage(ICoreMessage message) { + super(message); } // StreamMessage implementation ---------------------------------- @@ -180,7 +180,7 @@ public final class ServerJMSStreamMessage extends 
ServerJMSMessage implements St @Override public Object readObject() throws JMSException { - if (getReadBodyBuffer().readerIndex() >= message.getEndOfBodyPosition()) { + if (getReadBodyBuffer().readerIndex() >= getReadBodyBuffer().writerIndex()) { throw new MessageEOFException(""); } try { http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fe0ca4d8/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/jms/ServerJMSTextMessage.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/jms/ServerJMSTextMessage.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/jms/ServerJMSTextMessage.java index eb88de0..f770185 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/jms/ServerJMSTextMessage.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/jms/ServerJMSTextMessage.java @@ -19,9 +19,9 @@ package org.apache.activemq.artemis.protocol.amqp.converter.jms; import javax.jms.JMSException; import javax.jms.TextMessage; +import org.apache.activemq.artemis.api.core.ICoreMessage; import org.apache.activemq.artemis.api.core.Message; import org.apache.activemq.artemis.api.core.SimpleString; -import org.apache.activemq.artemis.core.message.impl.MessageInternal; import static org.apache.activemq.artemis.reader.TextMessageUtil.readBodyText; import static org.apache.activemq.artemis.reader.TextMessageUtil.writeBodyText; @@ -49,8 +49,8 @@ public class ServerJMSTextMessage extends ServerJMSMessage implements TextMessag /* * This constructor is used to construct messages prior to sending */ - public ServerJMSTextMessage(MessageInternal message, int deliveryCount) { - super(message, deliveryCount); + 
public ServerJMSTextMessage(ICoreMessage message) { + super(message); } // TextMessage implementation ------------------------------------ http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fe0ca4d8/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/AMQPContentTypeSupport.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/AMQPContentTypeSupport.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/AMQPContentTypeSupport.java deleted file mode 100644 index 01d72c8..0000000 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/AMQPContentTypeSupport.java +++ /dev/null @@ -1,146 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.activemq.artemis.protocol.amqp.converter.message; - -import java.nio.charset.Charset; -import java.nio.charset.IllegalCharsetNameException; -import java.nio.charset.StandardCharsets; -import java.nio.charset.UnsupportedCharsetException; -import java.util.StringTokenizer; - -import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPInvalidContentTypeException; - -public final class AMQPContentTypeSupport { - - private static final String UTF_8 = "UTF-8"; - private static final String CHARSET = "charset"; - private static final String TEXT = "text"; - private static final String APPLICATION = "application"; - private static final String JAVASCRIPT = "javascript"; - private static final String XML = "xml"; - private static final String XML_VARIANT = "+xml"; - private static final String JSON = "json"; - private static final String JSON_VARIANT = "+json"; - private static final String XML_DTD = "xml-dtd"; - private static final String ECMASCRIPT = "ecmascript"; - - /** - * @param contentType - * the contentType of the received message - * @return the character set to use, or null if not to treat the message as text - * @throws ActiveMQAMQPInvalidContentTypeException - * if the content-type is invalid in some way. 
- */ - public static Charset parseContentTypeForTextualCharset(final String contentType) throws ActiveMQAMQPInvalidContentTypeException { - if (contentType == null || contentType.trim().isEmpty()) { - throw new ActiveMQAMQPInvalidContentTypeException("Content type can't be null or empty"); - } - - int subTypeSeparator = contentType.indexOf("/"); - if (subTypeSeparator == -1) { - throw new ActiveMQAMQPInvalidContentTypeException("Content type has no '/' separator: " + contentType); - } - - final String type = contentType.substring(0, subTypeSeparator).toLowerCase().trim(); - - String subTypePart = contentType.substring(subTypeSeparator + 1).toLowerCase().trim(); - - String parameterPart = null; - int parameterSeparator = subTypePart.indexOf(";"); - if (parameterSeparator != -1) { - if (parameterSeparator < subTypePart.length() - 1) { - parameterPart = contentType.substring(subTypeSeparator + 1).toLowerCase().trim(); - } - subTypePart = subTypePart.substring(0, parameterSeparator).trim(); - } - - if (subTypePart.isEmpty()) { - throw new ActiveMQAMQPInvalidContentTypeException("Content type has no subtype after '/'" + contentType); - } - - final String subType = subTypePart; - - if (isTextual(type, subType)) { - String charset = findCharset(parameterPart); - if (charset == null) { - charset = UTF_8; - } - - if (UTF_8.equals(charset)) { - return StandardCharsets.UTF_8; - } else { - try { - return Charset.forName(charset); - } catch (IllegalCharsetNameException icne) { - throw new ActiveMQAMQPInvalidContentTypeException("Illegal charset: " + charset); - } catch (UnsupportedCharsetException uce) { - throw new ActiveMQAMQPInvalidContentTypeException("Unsupported charset: " + charset); - } - } - } - - return null; - } - - // ----- Internal Content Type utilities ----------------------------------// - - private static boolean isTextual(String type, String subType) { - if (TEXT.equals(type)) { - return true; - } - - if (APPLICATION.equals(type)) { - if (XML.equals(subType) 
|| JSON.equals(subType) || JAVASCRIPT.equals(subType) || subType.endsWith(XML_VARIANT) || subType.endsWith(JSON_VARIANT) - || XML_DTD.equals(subType) || ECMASCRIPT.equals(subType)) { - return true; - } - } - - return false; - } - - private static String findCharset(String paramaterPart) { - String charset = null; - - if (paramaterPart != null) { - StringTokenizer tokenizer = new StringTokenizer(paramaterPart, ";"); - while (tokenizer.hasMoreTokens()) { - String parameter = tokenizer.nextToken().trim(); - int eqIndex = parameter.indexOf('='); - if (eqIndex != -1) { - String name = parameter.substring(0, eqIndex); - if (CHARSET.equalsIgnoreCase(name.trim())) { - String value = unquote(parameter.substring(eqIndex + 1)); - - charset = value.toUpperCase(); - break; - } - } - } - } - - return charset; - } - - private static String unquote(String s) { - if (s.length() > 1 && (s.startsWith("\"") && s.endsWith("\""))) { - return s.substring(1, s.length() - 1); - } else { - return s; - } - } -} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fe0ca4d8/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/AMQPMessageIdHelper.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/AMQPMessageIdHelper.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/AMQPMessageIdHelper.java deleted file mode 100644 index 4a2123d..0000000 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/AMQPMessageIdHelper.java +++ /dev/null @@ -1,252 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. 
See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.activemq.artemis.protocol.amqp.converter.message; - -import java.nio.ByteBuffer; -import java.util.UUID; - -import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPIllegalStateException; -import org.apache.qpid.proton.amqp.Binary; -import org.apache.qpid.proton.amqp.UnsignedLong; - -/** - * Helper class for identifying and converting message-id and correlation-id values between the - * AMQP types and the Strings values used by JMS. - *

- * AMQP messages allow for 4 types of message-id/correlation-id: message-id-string, - * message-id-binary, message-id-uuid, or message-id-ulong. In order to accept or return a - * string representation of these for interoperability with other AMQP clients, the following - * encoding can be used after removing or before adding the "ID:" prefix used for a JMSMessageID - * value:
- *

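- * {@literal "AMQP_BINARY:<hex representation of binary content>"}
- * {@literal "AMQP_UUID:<string representation of uuid>"}
- * {@literal "AMQP_ULONG:<string representation of ulong>"}
- * {@literal "AMQP_STRING:<string>"}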
- *

- * The AMQP_STRING encoding exists only for escaping message-id-string values that happen to - * begin with one of the encoding prefixes (including AMQP_STRING itself). It MUST NOT be used - * otherwise. - *

- * When provided a string for conversion which attempts to identify itself as an encoded binary, - * uuid, or ulong but can't be converted into the indicated format, an exception will be thrown. - */ -public class AMQPMessageIdHelper { - - public static final AMQPMessageIdHelper INSTANCE = new AMQPMessageIdHelper(); - - public static final String AMQP_STRING_PREFIX = "AMQP_STRING:"; - public static final String AMQP_UUID_PREFIX = "AMQP_UUID:"; - public static final String AMQP_ULONG_PREFIX = "AMQP_ULONG:"; - public static final String AMQP_BINARY_PREFIX = "AMQP_BINARY:"; - - private static final int AMQP_UUID_PREFIX_LENGTH = AMQP_UUID_PREFIX.length(); - private static final int AMQP_ULONG_PREFIX_LENGTH = AMQP_ULONG_PREFIX.length(); - private static final int AMQP_STRING_PREFIX_LENGTH = AMQP_STRING_PREFIX.length(); - private static final int AMQP_BINARY_PREFIX_LENGTH = AMQP_BINARY_PREFIX.length(); - private static final char[] HEX_CHARS = "0123456789ABCDEF".toCharArray(); - - /** - * Takes the provided AMQP messageId style object, and convert it to a base string. Encodes - * type information as a prefix where necessary to convey or escape the type of the provided - * object. - * - * @param messageId - * the raw messageId object to process - * @return the base string to be used in creating the actual id. 
- */ - public String toBaseMessageIdString(Object messageId) { - if (messageId == null) { - return null; - } else if (messageId instanceof String) { - String stringId = (String) messageId; - - // If the given string has a type encoding prefix, - // we need to escape it as an encoded string (even if - // the existing encoding prefix was also for string) - if (hasTypeEncodingPrefix(stringId)) { - return AMQP_STRING_PREFIX + stringId; - } else { - return stringId; - } - } else if (messageId instanceof UUID) { - return AMQP_UUID_PREFIX + messageId.toString(); - } else if (messageId instanceof UnsignedLong) { - return AMQP_ULONG_PREFIX + messageId.toString(); - } else if (messageId instanceof Binary) { - ByteBuffer dup = ((Binary) messageId).asByteBuffer(); - - byte[] bytes = new byte[dup.remaining()]; - dup.get(bytes); - - String hex = convertBinaryToHexString(bytes); - - return AMQP_BINARY_PREFIX + hex; - } else { - throw new IllegalArgumentException("Unsupported type provided: " + messageId.getClass()); - } - } - - /** - * Takes the provided base id string and return the appropriate amqp messageId style object. - * Converts the type based on any relevant encoding information found as a prefix. - * - * @param baseId - * the object to be converted to an AMQP MessageId value. - * @return the AMQP messageId style object - * @throws ActiveMQAMQPIllegalStateException - * if the provided baseId String indicates an encoded type but can't be converted to - * that type. 
- */ - public Object toIdObject(String baseId) throws ActiveMQAMQPIllegalStateException { - if (baseId == null) { - return null; - } - - try { - if (hasAmqpUuidPrefix(baseId)) { - String uuidString = strip(baseId, AMQP_UUID_PREFIX_LENGTH); - return UUID.fromString(uuidString); - } else if (hasAmqpUlongPrefix(baseId)) { - String longString = strip(baseId, AMQP_ULONG_PREFIX_LENGTH); - return UnsignedLong.valueOf(longString); - } else if (hasAmqpStringPrefix(baseId)) { - return strip(baseId, AMQP_STRING_PREFIX_LENGTH); - } else if (hasAmqpBinaryPrefix(baseId)) { - String hexString = strip(baseId, AMQP_BINARY_PREFIX_LENGTH); - byte[] bytes = convertHexStringToBinary(hexString); - return new Binary(bytes); - } else { - // We have a string without any type prefix, transmit it as-is. - return baseId; - } - } catch (IllegalArgumentException e) { - throw new ActiveMQAMQPIllegalStateException("Unable to convert ID value"); - } - } - - /** - * Convert the provided hex-string into a binary representation where each byte represents - * two characters of the hex string. - *

- * The hex characters may be upper or lower case. - * - * @param hexString - * string to convert to a binary value. - * @return a byte array containing the binary representation - * @throws IllegalArgumentException - * if the provided String is a non-even length or contains non-hex characters - */ - public byte[] convertHexStringToBinary(String hexString) throws IllegalArgumentException { - int length = hexString.length(); - - // As each byte needs two characters in the hex encoding, the string must be an even - // length. - if (length % 2 != 0) { - throw new IllegalArgumentException("The provided hex String must be an even length, but was of length " + length + ": " + hexString); - } - - byte[] binary = new byte[length / 2]; - - for (int i = 0; i < length; i += 2) { - char highBitsChar = hexString.charAt(i); - char lowBitsChar = hexString.charAt(i + 1); - - int highBits = hexCharToInt(highBitsChar, hexString) << 4; - int lowBits = hexCharToInt(lowBitsChar, hexString); - - binary[i / 2] = (byte) (highBits + lowBits); - } - - return binary; - } - - /** - * Convert the provided binary into a hex-string representation where each character - * represents 4 bits of the provided binary, i.e each byte requires two characters. - *

- * The returned hex characters are upper-case. - * - * @param bytes - * the binary value to convert to a hex String instance. - * @return a String containing a hex representation of the bytes - */ - public String convertBinaryToHexString(byte[] bytes) { - // Each byte is represented as 2 chars - StringBuilder builder = new StringBuilder(bytes.length * 2); - - for (byte b : bytes) { - // The byte will be expanded to int before shifting, replicating the - // sign bit, so mask everything beyond the first 4 bits afterwards - int highBitsInt = (b >> 4) & 0xF; - // We only want the first 4 bits - int lowBitsInt = b & 0xF; - - builder.append(HEX_CHARS[highBitsInt]); - builder.append(HEX_CHARS[lowBitsInt]); - } - - return builder.toString(); - } - - // ----- Internal implementation ------------------------------------------// - - private boolean hasTypeEncodingPrefix(String stringId) { - return hasAmqpBinaryPrefix(stringId) || hasAmqpUuidPrefix(stringId) || hasAmqpUlongPrefix(stringId) || hasAmqpStringPrefix(stringId); - } - - private boolean hasAmqpStringPrefix(String stringId) { - return stringId.startsWith(AMQP_STRING_PREFIX); - } - - private boolean hasAmqpUlongPrefix(String stringId) { - return stringId.startsWith(AMQP_ULONG_PREFIX); - } - - private boolean hasAmqpUuidPrefix(String stringId) { - return stringId.startsWith(AMQP_UUID_PREFIX); - } - - private boolean hasAmqpBinaryPrefix(String stringId) { - return stringId.startsWith(AMQP_BINARY_PREFIX); - } - - private String strip(String id, int numChars) { - return id.substring(numChars); - } - - private int hexCharToInt(char ch, String orig) throws IllegalArgumentException { - if (ch >= '0' && ch <= '9') { - // subtract '0' to get difference in position as an int - return ch - '0'; - } else if (ch >= 'A' && ch <= 'F') { - // subtract 'A' to get difference in position as an int - // and then add 10 for the offset of 'A' - return ch - 'A' + 10; - } else if (ch >= 'a' && ch <= 'f') { - // subtract 'a' to get difference 
in position as an int - // and then add 10 for the offset of 'a' - return ch - 'a' + 10; - } - - throw new IllegalArgumentException("The provided hex string contains non-hex character '" + ch + "': " + orig); - } -}