Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 44893200C4F for ; Fri, 3 Mar 2017 02:04:59 +0100 (CET) Received: by cust-asf.ponee.io (Postfix) id 4339A160B6F; Fri, 3 Mar 2017 01:04:59 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 95F42160B8C for ; Fri, 3 Mar 2017 02:04:56 +0100 (CET) Received: (qmail 65913 invoked by uid 500); 3 Mar 2017 01:04:55 -0000 Mailing-List: contact commits-help@activemq.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@activemq.apache.org Delivered-To: mailing list commits@activemq.apache.org Received: (qmail 65380 invoked by uid 99); 3 Mar 2017 01:04:55 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 03 Mar 2017 01:04:55 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 20505DFF6D; Fri, 3 Mar 2017 01:04:55 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: clebertsuconic@apache.org To: commits@activemq.apache.org Date: Fri, 03 Mar 2017 01:05:09 -0000 Message-Id: In-Reply-To: <4e2b7f0e739940c0ab60fca86a2e4d4d@git.apache.org> References: <4e2b7f0e739940c0ab60fca86a2e4d4d@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [16/36] activemq-artemis git commit: ARTEMIS-1009 Pure Message Encoding. archived-at: Fri, 03 Mar 2017 01:04:59 -0000 http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/64681865/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/ProtonMessageConverter.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/ProtonMessageConverter.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/ProtonMessageConverter.java index 6aa44a4..125a20f 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/ProtonMessageConverter.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/ProtonMessageConverter.java @@ -23,7 +23,6 @@ import java.io.IOException; import javax.jms.BytesMessage; import org.apache.activemq.artemis.core.client.ActiveMQClientLogger; -import org.apache.activemq.artemis.core.server.ServerMessage; import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSBytesMessage; import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSMessage; import org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport; @@ -33,6 +32,7 @@ import org.apache.activemq.artemis.protocol.amqp.converter.message.InboundTransf import org.apache.activemq.artemis.protocol.amqp.converter.message.JMSMappingInboundTransformer; import org.apache.activemq.artemis.protocol.amqp.converter.message.JMSMappingOutboundTransformer; import org.apache.activemq.artemis.protocol.amqp.converter.message.OutboundTransformer; +import org.apache.activemq.artemis.protocol.amqp.broker.AMQPMessage; import org.apache.activemq.artemis.protocol.amqp.util.NettyWritable; import org.apache.activemq.artemis.spi.core.protocol.MessageConverter; import org.apache.activemq.artemis.utils.IDGenerator; @@ -52,8 +52,8 @@ public class ProtonMessageConverter implements MessageConverter { private final OutboundTransformer outboundTransformer; @Override - public ServerMessage inbound(Object messageSource) throws Exception { - EncodedMessage encodedMessageSource = (EncodedMessage) messageSource; + public org.apache.activemq.artemis.api.core.Message inbound(Object messageSource) throws Exception { + AMQPMessage encodedMessageSource = (AMQPMessage) messageSource; ServerJMSMessage transformedMessage = null; try { @@ -67,11 +67,11 @@ public class ProtonMessageConverter implements MessageConverter { transformedMessage.encode(); - return (ServerMessage) transformedMessage.getInnerMessage(); + return transformedMessage.getInnerMessage(); } @Override - public Object outbound(ServerMessage messageOutbound, int deliveryCount) throws Exception { + public Object outbound(org.apache.activemq.artemis.api.core.Message messageOutbound, int deliveryCount) throws Exception { // Useful for testing but not recommended for real life use. ByteBuf nettyBuffer = Unpooled.buffer(1024); NettyWritable buffer = new NettyWritable(nettyBuffer); @@ -83,7 +83,7 @@ public class ProtonMessageConverter implements MessageConverter { return encoded; } - public Object outbound(ServerMessage messageOutbound, int deliveryCount, WritableBuffer buffer) throws Exception { + public Object outbound(org.apache.activemq.artemis.api.core.Message messageOutbound, int deliveryCount, WritableBuffer buffer) throws Exception { ServerJMSMessage jmsMessage = AMQPMessageSupport.wrapMessage(messageOutbound.getType(), messageOutbound, deliveryCount); jmsMessage.decode(); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/64681865/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/jms/ServerJMSBytesMessage.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/jms/ServerJMSBytesMessage.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/jms/ServerJMSBytesMessage.java index abdf808..c3a60f0 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/jms/ServerJMSBytesMessage.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/jms/ServerJMSBytesMessage.java @@ -19,8 +19,8 @@ package org.apache.activemq.artemis.protocol.amqp.converter.jms; import javax.jms.BytesMessage; import javax.jms.JMSException; -import org.apache.activemq.artemis.core.message.impl.MessageImpl; -import org.apache.activemq.artemis.core.message.impl.MessageInternal; +import org.apache.activemq.artemis.api.core.Message; +import org.apache.activemq.artemis.core.message.impl.CoreMessage; import static org.apache.activemq.artemis.reader.BytesMessageUtil.bytesMessageReset; import static org.apache.activemq.artemis.reader.BytesMessageUtil.bytesReadBoolean; @@ -49,13 +49,13 @@ import static org.apache.activemq.artemis.reader.BytesMessageUtil.bytesWriteUTF; public class ServerJMSBytesMessage extends ServerJMSMessage implements BytesMessage { - public ServerJMSBytesMessage(MessageInternal message, int deliveryCount) { + public ServerJMSBytesMessage(Message message, int deliveryCount) { super(message, deliveryCount); } @Override public long getBodyLength() throws JMSException { - return message.getEndOfBodyPosition() - MessageImpl.BODY_OFFSET; + return message.getEndOfBodyPosition() - CoreMessage.BODY_OFFSET; } @Override http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/64681865/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/jms/ServerJMSMapMessage.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/jms/ServerJMSMapMessage.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/jms/ServerJMSMapMessage.java index 0268065..df79183 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/jms/ServerJMSMapMessage.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/jms/ServerJMSMapMessage.java @@ -27,7 +27,6 @@ import java.util.Set; import org.apache.activemq.artemis.api.core.ActiveMQPropertyConversionException; import org.apache.activemq.artemis.api.core.Message; import org.apache.activemq.artemis.api.core.SimpleString; -import org.apache.activemq.artemis.core.message.impl.MessageInternal; import org.apache.activemq.artemis.utils.TypedProperties; import static org.apache.activemq.artemis.reader.MapMessageUtil.readBodyMap; @@ -52,7 +51,7 @@ public final class ServerJMSMapMessage extends ServerJMSMessage implements MapMe /* * This constructor is used to construct messages prior to sending */ - public ServerJMSMapMessage(MessageInternal message, int deliveryCount) { + public ServerJMSMapMessage(Message message, int deliveryCount) { super(message, deliveryCount); } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/64681865/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/jms/ServerJMSMessage.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/jms/ServerJMSMessage.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/jms/ServerJMSMessage.java index f9a94f5..adf4621 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/jms/ServerJMSMessage.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/jms/ServerJMSMessage.java @@ -16,18 +16,17 @@ */ package org.apache.activemq.artemis.protocol.amqp.converter.jms; -import java.util.Collections; -import java.util.Enumeration; - import javax.jms.DeliveryMode; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.Message; +import java.util.Collections; +import java.util.Enumeration; import org.apache.activemq.artemis.api.core.ActiveMQBuffer; import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.api.core.SimpleString; -import org.apache.activemq.artemis.core.message.impl.MessageInternal; +import org.apache.activemq.artemis.core.message.impl.CoreMessage; import org.apache.activemq.artemis.jms.client.ActiveMQDestination; import org.apache.activemq.artemis.reader.MessageUtil; @@ -35,16 +34,16 @@ import static org.apache.activemq.artemis.api.core.FilterConstants.NATIVE_MESSAG public class ServerJMSMessage implements Message { - protected final MessageInternal message; + protected final CoreMessage message; protected int deliveryCount; - public MessageInternal getInnerMessage() { + public org.apache.activemq.artemis.api.core.Message getInnerMessage() { return message; } - public ServerJMSMessage(MessageInternal message, int deliveryCount) { - this.message = message; + public ServerJMSMessage(org.apache.activemq.artemis.api.core.Message message, int deliveryCount) { + this.message = (CoreMessage)message; this.deliveryCount = deliveryCount; } @@ -60,7 +59,7 @@ public class ServerJMSMessage implements Message { protected ActiveMQBuffer getReadBodyBuffer() { if (readBodyBuffer == null) { // to avoid clashes between multiple threads - readBodyBuffer = message.getBodyBufferDuplicate(); + readBodyBuffer = message.getReadOnlyBodyBuffer(); } return readBodyBuffer; } @@ -140,7 +139,7 @@ public class ServerJMSMessage implements Message { @Override public final Destination getJMSDestination() throws JMSException { - SimpleString sdest = message.getAddress(); + SimpleString sdest = message.getAddressSimpleString(); if (sdest == null) { return null; @@ -152,7 +151,7 @@ public class ServerJMSMessage implements Message { @Override public final void setJMSDestination(Destination destination) throws JMSException { if (destination == null) { - message.setAddress(null); + message.setAddress((SimpleString)null); } else { message.setAddress(((ActiveMQDestination) destination).getSimpleAddress()); } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/64681865/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/jms/ServerJMSObjectMessage.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/jms/ServerJMSObjectMessage.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/jms/ServerJMSObjectMessage.java index d1eaac6..15b04a9 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/jms/ServerJMSObjectMessage.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/jms/ServerJMSObjectMessage.java @@ -22,7 +22,6 @@ import javax.jms.JMSException; import javax.jms.ObjectMessage; import org.apache.activemq.artemis.api.core.Message; -import org.apache.activemq.artemis.core.message.impl.MessageInternal; import org.apache.qpid.proton.amqp.Binary; public class ServerJMSObjectMessage extends ServerJMSMessage implements ObjectMessage { @@ -31,7 +30,7 @@ public class ServerJMSObjectMessage extends ServerJMSMessage implements ObjectMe private Binary payload; - public ServerJMSObjectMessage(MessageInternal message, int deliveryCount) { + public ServerJMSObjectMessage(Message message, int deliveryCount) { super(message, deliveryCount); } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/64681865/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/jms/ServerJMSStreamMessage.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/jms/ServerJMSStreamMessage.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/jms/ServerJMSStreamMessage.java index a53fc0e..b092e61 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/jms/ServerJMSStreamMessage.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/jms/ServerJMSStreamMessage.java @@ -23,7 +23,6 @@ import javax.jms.StreamMessage; import org.apache.activemq.artemis.api.core.Message; import org.apache.activemq.artemis.api.core.Pair; -import org.apache.activemq.artemis.core.message.impl.MessageInternal; import org.apache.activemq.artemis.utils.DataConstants; import static org.apache.activemq.artemis.reader.StreamMessageUtil.streamReadBoolean; @@ -44,7 +43,7 @@ public final class ServerJMSStreamMessage extends ServerJMSMessage implements St private int bodyLength = 0; - public ServerJMSStreamMessage(MessageInternal message, int deliveryCount) { + public ServerJMSStreamMessage(Message message, int deliveryCount) { super(message, deliveryCount); } @@ -180,7 +179,7 @@ public final class ServerJMSStreamMessage extends ServerJMSMessage implements St @Override public Object readObject() throws JMSException { - if (getReadBodyBuffer().readerIndex() >= message.getEndOfBodyPosition()) { + if (getReadBodyBuffer().readerIndex() >= getReadBodyBuffer().writerIndex()) { throw new MessageEOFException(""); } try { http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/64681865/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/jms/ServerJMSTextMessage.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/jms/ServerJMSTextMessage.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/jms/ServerJMSTextMessage.java index eb88de0..058a3e9 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/jms/ServerJMSTextMessage.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/jms/ServerJMSTextMessage.java @@ -21,7 +21,6 @@ import javax.jms.TextMessage; import org.apache.activemq.artemis.api.core.Message; import org.apache.activemq.artemis.api.core.SimpleString; -import org.apache.activemq.artemis.core.message.impl.MessageInternal; import static org.apache.activemq.artemis.reader.TextMessageUtil.readBodyText; import static org.apache.activemq.artemis.reader.TextMessageUtil.writeBodyText; @@ -49,7 +48,7 @@ public class ServerJMSTextMessage extends ServerJMSMessage implements TextMessag /* * This constructor is used to construct messages prior to sending */ - public ServerJMSTextMessage(MessageInternal message, int deliveryCount) { + public ServerJMSTextMessage(Message message, int deliveryCount) { super(message, deliveryCount); } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/64681865/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/AMQPMessageSupport.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/AMQPMessageSupport.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/AMQPMessageSupport.java index 8c4612d..0a39573 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/AMQPMessageSupport.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/AMQPMessageSupport.java @@ -16,24 +16,15 @@ */ package org.apache.activemq.artemis.protocol.amqp.converter.message; -import static org.apache.activemq.artemis.api.core.Message.BYTES_TYPE; -import static org.apache.activemq.artemis.api.core.Message.DEFAULT_TYPE; -import static org.apache.activemq.artemis.api.core.Message.MAP_TYPE; -import static org.apache.activemq.artemis.api.core.Message.OBJECT_TYPE; -import static org.apache.activemq.artemis.api.core.Message.STREAM_TYPE; -import static org.apache.activemq.artemis.api.core.Message.TEXT_TYPE; - +import javax.jms.Destination; +import javax.jms.JMSException; import java.nio.charset.Charset; import java.util.Arrays; import java.util.Map; import java.util.Set; -import javax.jms.Destination; -import javax.jms.JMSException; - import org.apache.activemq.artemis.core.buffers.impl.ResetLimitWrappedActiveMQBuffer; -import org.apache.activemq.artemis.core.server.ServerMessage; -import org.apache.activemq.artemis.core.server.impl.ServerMessageImpl; +import org.apache.activemq.artemis.core.message.impl.CoreMessage; import org.apache.activemq.artemis.jms.client.ActiveMQDestination; import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSBytesMessage; import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSMapMessage; @@ -48,6 +39,13 @@ import org.apache.qpid.proton.amqp.Symbol; import org.apache.qpid.proton.amqp.messaging.Data; import org.apache.qpid.proton.message.Message; +import static org.apache.activemq.artemis.api.core.Message.BYTES_TYPE; +import static org.apache.activemq.artemis.api.core.Message.DEFAULT_TYPE; +import static org.apache.activemq.artemis.api.core.Message.MAP_TYPE; +import static org.apache.activemq.artemis.api.core.Message.OBJECT_TYPE; +import static org.apache.activemq.artemis.api.core.Message.STREAM_TYPE; +import static org.apache.activemq.artemis.api.core.Message.TEXT_TYPE; + /** * Support class containing constant values and static methods that are used to map to / from * AMQP Message types being sent or received. @@ -181,7 +179,7 @@ public final class AMQPMessageSupport { } } - public static ServerJMSMessage wrapMessage(int messageType, ServerMessage wrapped, int deliveryCount) { + public static ServerJMSMessage wrapMessage(int messageType, org.apache.activemq.artemis.api.core.Message wrapped, int deliveryCount) { switch (messageType) { case STREAM_TYPE: return new ServerJMSStreamMessage(wrapped, deliveryCount); @@ -267,8 +265,8 @@ public final class AMQPMessageSupport { return message; } - private static ServerMessageImpl newMessage(IDGenerator idGenerator, byte messageType) { - ServerMessageImpl message = new ServerMessageImpl(idGenerator.generateID(), 512); + private static CoreMessage newMessage(IDGenerator idGenerator, byte messageType) { + CoreMessage message = new CoreMessage(idGenerator.generateID(), 512); message.setType(messageType); ((ResetLimitWrappedActiveMQBuffer) message.getBodyBuffer()).setMessage(null); return message; http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/64681865/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/AMQPNativeInboundTransformer.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/AMQPNativeInboundTransformer.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/AMQPNativeInboundTransformer.java deleted file mode 100644 index 7028547..0000000 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/AMQPNativeInboundTransformer.java +++ /dev/null @@ -1,44 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.artemis.protocol.amqp.converter.message; - -import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSMessage; -import org.apache.activemq.artemis.utils.IDGenerator; - -public class AMQPNativeInboundTransformer extends AMQPRawInboundTransformer { - - public AMQPNativeInboundTransformer(IDGenerator idGenerator) { - super(idGenerator); - } - - @Override - public String getTransformerName() { - return TRANSFORMER_NATIVE; - } - - @Override - public InboundTransformer getFallbackTransformer() { - return new AMQPRawInboundTransformer(idGenerator); - } - - @Override - public ServerJMSMessage transform(EncodedMessage amqpMessage) throws Exception { - org.apache.qpid.proton.message.Message amqp = amqpMessage.decode(); - - return populateMessage(super.transform(amqpMessage), amqp); - } -} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/64681865/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/AMQPRawInboundTransformer.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/AMQPRawInboundTransformer.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/AMQPRawInboundTransformer.java deleted file mode 100644 index 445eaca..0000000 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/AMQPRawInboundTransformer.java +++ /dev/null @@ -1,62 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.artemis.protocol.amqp.converter.message; - -import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.JMS_AMQP_MESSAGE_FORMAT; -import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.JMS_AMQP_NATIVE; -import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.createBytesMessage; - -import javax.jms.DeliveryMode; -import javax.jms.Message; - -import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSBytesMessage; -import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSMessage; -import org.apache.activemq.artemis.utils.IDGenerator; - -public class AMQPRawInboundTransformer extends InboundTransformer { - - public AMQPRawInboundTransformer(IDGenerator idGenerator) { - super(idGenerator); - } - - @Override - public String getTransformerName() { - return TRANSFORMER_RAW; - } - - @Override - public InboundTransformer getFallbackTransformer() { - return null; // No fallback from full raw transform - } - - @Override - public ServerJMSMessage transform(EncodedMessage amqpMessage) throws Exception { - ServerJMSBytesMessage message = createBytesMessage(idGenerator); - message.writeBytes(amqpMessage.getArray(), amqpMessage.getArrayOffset(), amqpMessage.getLength()); - - // We cannot decode the message headers to check so err on the side of caution - // and mark all messages as persistent. - message.setJMSDeliveryMode(DeliveryMode.PERSISTENT); - message.setJMSPriority(Message.DEFAULT_PRIORITY); - message.setJMSTimestamp(System.currentTimeMillis()); - - message.setLongProperty(JMS_AMQP_MESSAGE_FORMAT, amqpMessage.getMessageFormat()); - message.setBooleanProperty(JMS_AMQP_NATIVE, true); - - return message; - } -} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/64681865/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/InboundTransformer.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/InboundTransformer.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/InboundTransformer.java index 1316ab7..cec34ef 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/InboundTransformer.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/InboundTransformer.java @@ -37,6 +37,7 @@ import javax.jms.Message; import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerDestination; import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSMessage; +import org.apache.activemq.artemis.protocol.amqp.broker.AMQPMessage; import org.apache.activemq.artemis.utils.IDGenerator; import org.apache.qpid.proton.amqp.Binary; import org.apache.qpid.proton.amqp.Decimal128; @@ -65,12 +66,10 @@ public abstract class InboundTransformer { this.idGenerator = idGenerator; } - public abstract ServerJMSMessage transform(EncodedMessage amqpMessage) throws Exception; + public abstract ServerJMSMessage transform(AMQPMessage amqpMessage) throws Exception; public abstract String getTransformerName(); - public abstract InboundTransformer getFallbackTransformer(); - @SuppressWarnings("unchecked") protected ServerJMSMessage populateMessage(ServerJMSMessage jms, org.apache.qpid.proton.message.Message amqp) throws Exception { Header header = amqp.getHeader(); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/64681865/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/JMSMappingInboundTransformer.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/JMSMappingInboundTransformer.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/JMSMappingInboundTransformer.java index 629c499..4c7426e 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/JMSMappingInboundTransformer.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/JMSMappingInboundTransformer.java @@ -48,6 +48,7 @@ import java.util.Map; import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSMessage; import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSStreamMessage; import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPInternalErrorException; +import org.apache.activemq.artemis.protocol.amqp.broker.AMQPMessage; import org.apache.activemq.artemis.utils.IDGenerator; import org.apache.qpid.proton.amqp.Binary; import org.apache.qpid.proton.amqp.messaging.AmqpSequence; @@ -67,36 +68,21 @@ public class JMSMappingInboundTransformer extends InboundTransformer { return TRANSFORMER_JMS; } - @Override - public InboundTransformer getFallbackTransformer() { - return new AMQPNativeInboundTransformer(idGenerator); + + public ServerJMSMessage transform(EncodedMessage message) throws Exception { + AMQPMessage messageEncode = new AMQPMessage(message.getMessageFormat(), message.getArray(), null); + return transform(messageEncode); } @Override - public ServerJMSMessage transform(EncodedMessage encodedMessage) throws Exception { - ServerJMSMessage transformedMessage = null; - - try { - Message amqpMessage = encodedMessage.decode(); - transformedMessage = createServerMessage(amqpMessage); - populateMessage(transformedMessage, amqpMessage); - } catch (Exception ex) { - InboundTransformer transformer = this.getFallbackTransformer(); - - while (transformer != null) { - try { - transformedMessage = transformer.transform(encodedMessage); - break; - } catch (Exception e) { - transformer = transformer.getFallbackTransformer(); - } - } - } + public ServerJMSMessage transform(AMQPMessage amqpMessage) throws Exception { + ServerJMSMessage transformedMessage = createServerMessage(amqpMessage.getProtonMessage()); + populateMessage(transformedMessage, amqpMessage.getProtonMessage()); // Regardless of the transformer that finally decoded the message we need to ensure that // the AMQP Message Format value is preserved for application on retransmit. - if (transformedMessage != null && encodedMessage.getMessageFormat() != 0) { - transformedMessage.setLongProperty(JMS_AMQP_MESSAGE_FORMAT, encodedMessage.getMessageFormat()); + if (transformedMessage != null && amqpMessage.getMessageFormat() != 0) { + transformedMessage.setLongProperty(JMS_AMQP_MESSAGE_FORMAT, amqpMessage.getMessageFormat()); } return transformedMessage; http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/64681865/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/JMSMappingOutboundTransformer.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/JMSMappingOutboundTransformer.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/JMSMappingOutboundTransformer.java index 7dbc6d4..2ef3122 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/JMSMappingOutboundTransformer.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/JMSMappingOutboundTransformer.java @@ -64,7 +64,6 @@ import javax.jms.TemporaryTopic; import javax.jms.TextMessage; import javax.jms.Topic; -import org.apache.activemq.artemis.core.message.impl.MessageInternal; import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSBytesMessage; import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSMapMessage; import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSMessage; @@ -510,7 +509,7 @@ public class JMSMappingOutboundTransformer extends OutboundTransformer { // will be unknown so we check for special cases of messages with special data // encoded into the server message body. if (orignalEncoding == AMQP_UNKNOWN) { - MessageInternal internalMessage = message.getInnerMessage(); + org.apache.activemq.artemis.api.core.Message internalMessage = message.getInnerMessage(); int readerIndex = internalMessage.getBodyBuffer().readerIndex(); try { Object s = internalMessage.getBodyBuffer().readNullableSimpleString(); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/64681865/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPConnectionContext.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPConnectionContext.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPConnectionContext.java index 6462315..bac3e7e 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPConnectionContext.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPConnectionContext.java @@ -134,6 +134,10 @@ public class AMQPConnectionContext extends ProtonInitializable { handler.flush(); } + public void flush(boolean wait) { + handler.flush(wait); + } + public void close(ErrorCondition errorCondition) { handler.close(errorCondition); } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/64681865/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java index 8341de7..34c2c07 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java @@ -16,8 +16,6 @@ */ package org.apache.activemq.artemis.protocol.amqp.proton; -import io.netty.buffer.ByteBuf; -import io.netty.buffer.PooledByteBufAllocator; import org.apache.activemq.artemis.api.core.RoutingType; import org.apache.activemq.artemis.core.transaction.Transaction; import org.apache.activemq.artemis.protocol.amqp.broker.AMQPSessionCallback; @@ -25,7 +23,6 @@ import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPExceptio import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPInternalErrorException; import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPNotFoundException; import org.apache.activemq.artemis.protocol.amqp.logger.ActiveMQAMQPProtocolMessageBundle; -import org.apache.activemq.artemis.protocol.amqp.util.DeliveryUtil; import org.apache.qpid.proton.amqp.Symbol; import org.apache.qpid.proton.amqp.messaging.Rejected; import org.apache.qpid.proton.amqp.messaging.TerminusExpiryPolicy; @@ -144,26 +141,25 @@ public class ProtonServerReceiverContext extends ProtonInitializable implements if (delivery.isPartial()) { return; } + byte[] data = new byte[delivery.getDataLength()]; + Transaction tx = null; - ByteBuf buffer = PooledByteBufAllocator.DEFAULT.heapBuffer(10 * 1024); - try { - synchronized (connection.getLock()) { - DeliveryUtil.readDelivery(receiver, buffer); + synchronized (connection.getLock()) { + receiver.recv(data, 0, data.length); - receiver.advance(); + receiver.advance(); + } - Transaction tx = null; - if (delivery.getRemoteState() instanceof TransactionalState) { + if (delivery.getRemoteState() instanceof TransactionalState) { - TransactionalState txState = (TransactionalState) delivery.getRemoteState(); - tx = this.sessionSPI.getTransaction(txState.getTxnId()); - } - sessionSPI.serverSend(tx, receiver, delivery, address, delivery.getMessageFormat(), buffer); + TransactionalState txState = (TransactionalState) delivery.getRemoteState(); + tx = this.sessionSPI.getTransaction(txState.getTxnId()); + } - flow(maxCreditAllocation, minCreditRefresh); - } - } finally { - buffer.release(); + sessionSPI.serverSend(tx, receiver, delivery, address, delivery.getMessageFormat(), data); + + synchronized (connection.getLock()) { + flow(maxCreditAllocation, minCreditRefresh); } } catch (Exception e) { log.warn(e.getMessage(), e); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/64681865/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java index 2e19f07..15611c3 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java @@ -23,6 +23,7 @@ import java.util.Set; import org.apache.activemq.artemis.api.core.ActiveMQQueueExistsException; import org.apache.activemq.artemis.api.core.ActiveMQExceptionType; +import org.apache.activemq.artemis.api.core.Message; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.core.postoffice.impl.CompositeAddress; import org.apache.activemq.artemis.core.server.AddressQueryResult; @@ -37,6 +38,7 @@ import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPInternal import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPNotFoundException; import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPResourceLimitExceededException; import org.apache.activemq.artemis.protocol.amqp.logger.ActiveMQAMQPProtocolMessageBundle; +import org.apache.activemq.artemis.protocol.amqp.broker.AMQPMessage; import org.apache.activemq.artemis.protocol.amqp.util.CreditsSemaphore; import org.apache.activemq.artemis.protocol.amqp.util.NettyWritable; import org.apache.activemq.artemis.selector.filter.FilterException; @@ -474,7 +476,7 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr if (closed) { return; } - Object message = delivery.getContext(); + Message message = (Message)delivery.getContext(); boolean preSettle = sender.getRemoteSenderSettleMode() == SenderSettleMode.SETTLED; @@ -566,7 +568,7 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr /** * handle an out going message from ActiveMQ Artemis, send via the Proton Sender */ - public int deliverMessage(Object message, int deliveryCount) throws Exception { + public int deliverMessage(Message message, int deliveryCount) throws Exception { if (closed) { return 0; } @@ -592,13 +594,18 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr try { long messageFormat = 0; - // Encode the Server Message into the given Netty Buffer as an AMQP - // Message transformed from the internal message model. - try { - messageFormat = sessionSPI.encodeMessage(message, deliveryCount, new NettyWritable(nettyBuffer)); - } catch (Throwable e) { - log.warn(e.getMessage(), e); - throw new ActiveMQAMQPInternalErrorException(e.getMessage(), e); + + if (message instanceof AMQPMessage) { + message.sendBuffer(nettyBuffer, deliveryCount); + } else { + // Encode the Server Message into the given Netty Buffer as an AMQP + // Message transformed from the internal message model. + try { + messageFormat = sessionSPI.encodeMessage(message, deliveryCount, new NettyWritable(nettyBuffer)); + } catch (Throwable e) { + log.warn(e.getMessage(), e); + throw new ActiveMQAMQPInternalErrorException(e.getMessage(), e); + } } int size = nettyBuffer.writerIndex(); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/64681865/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonTransactionHandler.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonTransactionHandler.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonTransactionHandler.java index 51f42a3..1afeba8 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonTransactionHandler.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonTransactionHandler.java @@ -22,6 +22,7 @@ import org.apache.activemq.artemis.protocol.amqp.broker.AMQPSessionCallback; import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPException; import org.apache.activemq.artemis.protocol.amqp.logger.ActiveMQAMQPProtocolMessageBundle; import org.apache.activemq.artemis.protocol.amqp.util.DeliveryUtil; +import org.apache.activemq.artemis.protocol.amqp.util.NettyWritable; import org.apache.qpid.proton.amqp.Binary; import org.apache.qpid.proton.amqp.Symbol; import org.apache.qpid.proton.amqp.messaging.Accepted; @@ -61,7 +62,7 @@ public class ProtonTransactionHandler implements ProtonDeliveryHandler { return; } - DeliveryUtil.readDelivery(receiver, buffer); + receiver.recv(new NettyWritable(buffer)); receiver.advance(); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/64681865/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/handler/ProtonHandler.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/handler/ProtonHandler.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/handler/ProtonHandler.java index 25ef51e..673a688 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/handler/ProtonHandler.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/handler/ProtonHandler.java @@ -254,7 +254,7 @@ public class ProtonHandler extends ProtonInitializable { flush(false); } - private void flush(boolean wait) { + public void flush(boolean wait) { synchronized (lock) { transport.process(); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/64681865/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/util/DeliveryUtil.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/util/DeliveryUtil.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/util/DeliveryUtil.java index 9257c6b..0ff1d3b 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/util/DeliveryUtil.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/util/DeliveryUtil.java @@ -17,24 +17,11 @@ package org.apache.activemq.artemis.protocol.amqp.util; import io.netty.buffer.ByteBuf; -import org.apache.qpid.proton.engine.Receiver; import org.apache.qpid.proton.message.Message; import org.apache.qpid.proton.message.impl.MessageImpl; public class DeliveryUtil { - public static int readDelivery(Receiver receiver, ByteBuf buffer) { - int initial = buffer.writerIndex(); - // optimization by norman - int count; - while ((count = receiver.recv(buffer.array(), buffer.arrayOffset() + buffer.writerIndex(), buffer.writableBytes())) > 0) { - // Increment the writer index by the number of bytes written into it while calling recv. - buffer.writerIndex(buffer.writerIndex() + count); - buffer.ensureWritable(count); - } - return buffer.writerIndex() - initial; - } - public static MessageImpl decodeMessageImpl(ByteBuf buffer) { MessageImpl message = (MessageImpl) Message.Factory.create(); message.decode(buffer.array(), buffer.arrayOffset() + buffer.readerIndex(), buffer.readableBytes()); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/64681865/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/util/NettyReadable.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/util/NettyReadable.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/util/NettyReadable.java new file mode 100644 index 0000000..e0705b4 --- /dev/null +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/util/NettyReadable.java @@ -0,0 +1,139 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.artemis.protocol.amqp.util; + +import java.nio.ByteBuffer; +import java.nio.charset.Charset; + +import io.netty.buffer.ByteBuf; +import org.apache.qpid.proton.codec.ReadableBuffer; + +public class NettyReadable implements ReadableBuffer { + + private static final Charset Charset_UTF8 = Charset.forName("UTF-8"); + + private final ByteBuf buffer; + + public NettyReadable(ByteBuf buffer) { + this.buffer = buffer; + } + + @Override + public void put(ReadableBuffer other) { + buffer.writeBytes(other.byteBuffer()); + } + + @Override + public byte get() { + return buffer.readByte(); + } + + @Override + public int getInt() { + return buffer.readInt(); + } + + @Override + public long getLong() { + return buffer.readLong(); + } + + @Override + public short getShort() { + return buffer.readShort(); + } + + @Override + public float getFloat() { + return buffer.readFloat(); + } + + @Override + public double getDouble() { + return buffer.readDouble(); + } + + @Override + public ReadableBuffer get(byte[] data, int offset, int length) { + buffer.readBytes(data, offset, length); + return this; + } + + @Override + public ReadableBuffer get(byte[] data) { + buffer.readBytes(data); + return this; + } + + @Override + public ReadableBuffer position(int position) { + buffer.readerIndex(position); + return this; + } + + @Override + public ReadableBuffer slice() { + return new NettyReadable(buffer.slice()); + } + + @Override + public ReadableBuffer flip() { + return new NettyReadable(buffer.duplicate().setIndex(0, buffer.readerIndex())); + } + + @Override + public ReadableBuffer limit(int limit) { + buffer.writerIndex(limit); + return this; + } + + @Override + public int limit() { + return buffer.writerIndex(); + } + + @Override + public int remaining() { + return buffer.readableBytes(); + } + + @Override + public int position() { + return buffer.readerIndex(); + } + + @Override + public boolean hasRemaining() { + return buffer.readableBytes() > 0; + } + + @Override + public ReadableBuffer duplicate() { + return new NettyReadable(buffer.duplicate()); + } + + @Override + public ByteBuffer byteBuffer() { + return buffer.nioBuffer(0, buffer.writerIndex()); + } + + @Override + public String readUTF8() { + return buffer.toString(Charset_UTF8); + } +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/64681865/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/TestConversions.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/TestConversions.java b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/TestConversions.java index 08c46be..9a333c7 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/TestConversions.java +++ b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/TestConversions.java @@ -16,12 +16,6 @@ */ package org.apache.activemq.artemis.protocol.amqp.converter; -import static org.apache.activemq.artemis.api.core.Message.BYTES_TYPE; -import static org.apache.activemq.artemis.api.core.Message.MAP_TYPE; -import static org.apache.activemq.artemis.api.core.Message.STREAM_TYPE; -import static org.apache.activemq.artemis.api.core.Message.TEXT_TYPE; -import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.wrapMessage; - import java.io.IOException; import java.nio.ByteBuffer; import java.util.HashMap; @@ -29,10 +23,11 @@ import java.util.LinkedList; import java.util.List; import java.util.Map; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.PooledByteBufAllocator; import org.apache.activemq.artemis.api.core.ActiveMQBuffer; import org.apache.activemq.artemis.api.core.SimpleString; -import org.apache.activemq.artemis.core.journal.EncodingSupport; -import org.apache.activemq.artemis.core.server.ServerMessage; +import org.apache.activemq.artemis.protocol.amqp.broker.AMQPMessage; import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSBytesMessage; import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSMapMessage; import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSMessage; @@ -52,8 +47,11 @@ import org.apache.qpid.proton.message.impl.MessageImpl; import org.junit.Assert; import org.junit.Test; -import io.netty.buffer.ByteBuf; -import io.netty.buffer.PooledByteBufAllocator; +import static org.apache.activemq.artemis.api.core.Message.BYTES_TYPE; +import static org.apache.activemq.artemis.api.core.Message.MAP_TYPE; +import static org.apache.activemq.artemis.api.core.Message.STREAM_TYPE; +import static org.apache.activemq.artemis.api.core.Message.TEXT_TYPE; +import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.wrapMessage; public class TestConversions extends Assert { @@ -72,10 +70,10 @@ public class TestConversions extends Assert { message.setBody(new AmqpValue(new Boolean(true))); - EncodedMessage encodedMessage = encodeMessage(message); + AMQPMessage encodedMessage = new AMQPMessage(message, null); ProtonMessageConverter converter = new ProtonMessageConverter(new SimpleIDGenerator(0)); - ServerMessage serverMessage = converter.inbound(encodedMessage); + org.apache.activemq.artemis.api.core.Message serverMessage = converter.inbound(encodedMessage); verifyProperties(new ServerJMSMessage(serverMessage, 0)); @@ -101,10 +99,10 @@ public class TestConversions extends Assert { message.setBody(new Data(new Binary(bodyBytes))); - EncodedMessage encodedMessage = encodeMessage(message); + AMQPMessage encodedMessage = new AMQPMessage(message, null); ProtonMessageConverter converter = new ProtonMessageConverter(new SimpleIDGenerator(0)); - ServerMessage serverMessage = converter.inbound(encodedMessage); + org.apache.activemq.artemis.api.core.Message serverMessage = converter.inbound(encodedMessage); ServerJMSBytesMessage bytesMessage = (ServerJMSBytesMessage) wrapMessage(BYTES_TYPE, serverMessage, 0); @@ -151,10 +149,10 @@ public class TestConversions extends Assert { message.setBody(new AmqpValue(mapValues)); - EncodedMessage encodedMessage = encodeMessage(message); + AMQPMessage encodedMessage = new AMQPMessage(message, null); ProtonMessageConverter converter = new ProtonMessageConverter(new SimpleIDGenerator(0)); - ServerMessage serverMessage = converter.inbound(encodedMessage); + org.apache.activemq.artemis.api.core.Message serverMessage = converter.inbound(encodedMessage); ServerJMSMapMessage mapMessage = (ServerJMSMapMessage) wrapMessage(MAP_TYPE, serverMessage, 0); mapMessage.decode(); @@ -188,12 +186,10 @@ public class TestConversions extends Assert { message.setBody(new AmqpSequence(objects)); - EncodedMessage encodedMessage = encodeMessage(message); + AMQPMessage encodedMessage = new AMQPMessage(message, null); ProtonMessageConverter converter = new ProtonMessageConverter(new SimpleIDGenerator(0)); - ServerMessage serverMessage = converter.inbound(encodedMessage); - - simulatePersistence(serverMessage); + org.apache.activemq.artemis.api.core.Message serverMessage = converter.inbound(encodedMessage); ServerJMSStreamMessage streamMessage = (ServerJMSStreamMessage) wrapMessage(STREAM_TYPE, serverMessage, 0); @@ -222,12 +218,10 @@ public class TestConversions extends Assert { String text = "someText"; message.setBody(new AmqpValue(text)); - EncodedMessage encodedMessage = encodeMessage(message); + AMQPMessage encodedMessage = new AMQPMessage(message, null); ProtonMessageConverter converter = new ProtonMessageConverter(new SimpleIDGenerator(0)); - ServerMessage serverMessage = converter.inbound(encodedMessage); - - simulatePersistence(serverMessage); + org.apache.activemq.artemis.api.core.Message serverMessage = converter.inbound(encodedMessage); ServerJMSTextMessage textMessage = (ServerJMSTextMessage) wrapMessage(TEXT_TYPE, serverMessage, 0); textMessage.decode(); @@ -247,13 +241,6 @@ public class TestConversions extends Assert { System.out.println("output = " + amqpMessage); } - private void simulatePersistence(ServerMessage serverMessage) { - serverMessage.setAddress(new SimpleString("SomeAddress")); - // This is just to simulate what would happen during the persistence of the message - // We need to still be able to recover the message when we read it back - ((EncodingSupport) serverMessage).encode(new EmptyBuffer()); - } - private ProtonJMessage reEncodeMsg(Object obj) { ProtonJMessage objOut = (ProtonJMessage) obj; @@ -263,14 +250,6 @@ public class TestConversions extends Assert { return objOut; } - private EncodedMessage encodeMessage(MessageImpl message) { - ByteBuf buf = PooledByteBufAllocator.DEFAULT.heapBuffer(1024 * 1024); - message.encode(new NettyWritable(buf)); - byte[] bytesConvert = new byte[buf.writerIndex()]; - buf.readBytes(bytesConvert); - return new EncodedMessage(0, bytesConvert, 0, bytesConvert.length); - } - class EmptyBuffer implements ActiveMQBuffer { @Override @@ -770,5 +749,10 @@ public class TestConversions extends Assert { public void release() { //no-op } + + @Override + public void writeBytes(ByteBuf src, int srcIndex, int length) { + + } } } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/64681865/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/message/JMSMappingInboundTransformerTest.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/message/JMSMappingInboundTransformerTest.java b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/message/JMSMappingInboundTransformerTest.java index d7a948a..bfaec54 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/message/JMSMappingInboundTransformerTest.java +++ b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/message/JMSMappingInboundTransformerTest.java @@ -44,6 +44,7 @@ import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSMapMessa import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSObjectMessage; import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSStreamMessage; import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSTextMessage; +import org.apache.activemq.artemis.protocol.amqp.broker.AMQPMessage; import org.apache.activemq.artemis.utils.IDGenerator; import org.apache.activemq.artemis.utils.SimpleIDGenerator; import org.apache.qpid.proton.Proton; @@ -82,8 +83,9 @@ public class JMSMappingInboundTransformerTest { Message message = Message.Factory.create(); message.setContentType(AMQPMessageSupport.OCTET_STREAM_CONTENT_TYPE); - EncodedMessage em = encodeMessage(message); - javax.jms.Message jmsMessage = transformer.transform(em); + AMQPMessage messageEncode = new AMQPMessage(message, null); + + javax.jms.Message jmsMessage = transformer.transform(messageEncode); assertNotNull("Message should not be null", jmsMessage); assertEquals("Unexpected message class type", ServerJMSBytesMessage.class, jmsMessage.getClass()); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/64681865/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/message/JMSMappingOutboundTransformerTest.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/message/JMSMappingOutboundTransformerTest.java b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/message/JMSMappingOutboundTransformerTest.java index 2ece01d..3fe6d70 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/message/JMSMappingOutboundTransformerTest.java +++ b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/message/JMSMappingOutboundTransformerTest.java @@ -16,19 +16,7 @@ */ package org.apache.activemq.artemis.protocol.amqp.converter.message; -import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.AMQP_DATA; -import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.AMQP_NULL; -import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.AMQP_SEQUENCE; -import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.AMQP_UNKNOWN; -import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.AMQP_VALUE_BINARY; -import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.JMS_AMQP_ORIGINAL_ENCODING; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertNull; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; - +import javax.jms.JMSException; import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; import java.io.ObjectInputStream; @@ -39,10 +27,10 @@ import java.util.List; import java.util.Map; import java.util.UUID; -import javax.jms.JMSException; - +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; import org.apache.activemq.artemis.core.buffers.impl.ResetLimitWrappedActiveMQBuffer; -import org.apache.activemq.artemis.core.server.impl.ServerMessageImpl; +import org.apache.activemq.artemis.core.message.impl.CoreMessage; import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerDestination; import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSBytesMessage; import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSMapMessage; @@ -64,8 +52,18 @@ import org.junit.Before; import org.junit.Ignore; import org.junit.Test; -import io.netty.buffer.ByteBuf; -import io.netty.buffer.Unpooled; +import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.AMQP_DATA; +import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.AMQP_NULL; +import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.AMQP_SEQUENCE; +import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.AMQP_UNKNOWN; +import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.AMQP_VALUE_BINARY; +import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.JMS_AMQP_ORIGINAL_ENCODING; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; public class JMSMappingOutboundTransformerTest { @@ -943,8 +941,8 @@ public class JMSMappingOutboundTransformerTest { } } - private ServerMessageImpl newMessage(byte messageType) { - ServerMessageImpl message = new ServerMessageImpl(idGenerator.generateID(), 512); + private CoreMessage newMessage(byte messageType) { + CoreMessage message = new CoreMessage(idGenerator.generateID(), 512); message.setType(messageType); ((ResetLimitWrappedActiveMQBuffer) message.getBodyBuffer()).setMessage(null); return message; http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/64681865/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/message/JMSTransformationSpeedComparisonTest.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/message/JMSTransformationSpeedComparisonTest.java b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/message/JMSTransformationSpeedComparisonTest.java index 99aab33..fdf0129 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/message/JMSTransformationSpeedComparisonTest.java +++ b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/message/JMSTransformationSpeedComparisonTest.java @@ -21,8 +21,8 @@ import java.util.HashMap; import java.util.Map; import java.util.concurrent.TimeUnit; -import org.apache.activemq.artemis.core.server.ServerMessage; import org.apache.activemq.artemis.protocol.amqp.converter.ProtonMessageConverter; +import org.apache.activemq.artemis.protocol.amqp.broker.AMQPMessage; import org.apache.activemq.artemis.protocol.amqp.util.NettyWritable; import org.apache.activemq.artemis.utils.IDGenerator; import org.apache.activemq.artemis.utils.SimpleIDGenerator; @@ -68,11 +68,11 @@ public class JMSTransformationSpeedComparisonTest { Message message = Proton.message(); message.setBody(new AmqpValue("String payload for AMQP message conversion performance testing.")); - EncodedMessage encoded = encode(message); + AMQPMessage encoded = encode(message); // Warm up for (int i = 0; i < WARM_CYCLES; ++i) { - ServerMessage intermediate = converter.inbound(encoded); + org.apache.activemq.artemis.api.core.Message intermediate = converter.inbound(encoded); encode(converter.outbound(intermediate, 1)); } @@ -80,7 +80,7 @@ public class JMSTransformationSpeedComparisonTest { long startTime = System.nanoTime(); for (int i = 0; i < PROFILE_CYCLES; ++i) { - ServerMessage intermediate = converter.inbound(encoded); + org.apache.activemq.artemis.api.core.Message intermediate = converter.inbound(encoded); encode(converter.outbound(intermediate, 1)); } totalDuration += System.nanoTime() - startTime; @@ -99,11 +99,11 @@ public class JMSTransformationSpeedComparisonTest { message.setContentType("text/plain"); message.setBody(new AmqpValue("String payload for AMQP message conversion performance testing.")); - EncodedMessage encoded = encode(message); + AMQPMessage encoded = encode(message); // Warm up for (int i = 0; i < WARM_CYCLES; ++i) { - ServerMessage intermediate = converter.inbound(encoded); + org.apache.activemq.artemis.api.core.Message intermediate = converter.inbound(encoded); encode(converter.outbound(intermediate, 1)); } @@ -111,7 +111,7 @@ public class JMSTransformationSpeedComparisonTest { long startTime = System.nanoTime(); for (int i = 0; i < PROFILE_CYCLES; ++i) { - ServerMessage intermediate = converter.inbound(encoded); + org.apache.activemq.artemis.api.core.Message intermediate = converter.inbound(encoded); encode(converter.outbound(intermediate, 1)); } totalDuration += System.nanoTime() - startTime; @@ -122,11 +122,11 @@ public class JMSTransformationSpeedComparisonTest { @Test public void testTypicalQpidJMSMessage() throws Exception { - EncodedMessage encoded = encode(createTypicalQpidJMSMessage()); + AMQPMessage encoded = encode(createTypicalQpidJMSMessage()); // Warm up for (int i = 0; i < WARM_CYCLES; ++i) { - ServerMessage intermediate = converter.inbound(encoded); + org.apache.activemq.artemis.api.core.Message intermediate = converter.inbound(encoded); encode(converter.outbound(intermediate, 1)); } @@ -134,7 +134,7 @@ public class JMSTransformationSpeedComparisonTest { long startTime = System.nanoTime(); for (int i = 0; i < PROFILE_CYCLES; ++i) { - ServerMessage intermediate = converter.inbound(encoded); + org.apache.activemq.artemis.api.core.Message intermediate = converter.inbound(encoded); encode(converter.outbound(intermediate, 1)); } totalDuration += System.nanoTime() - startTime; @@ -145,11 +145,11 @@ public class JMSTransformationSpeedComparisonTest { @Test public void testComplexQpidJMSMessage() throws Exception { - EncodedMessage encoded = encode(createComplexQpidJMSMessage()); + AMQPMessage encoded = encode(createComplexQpidJMSMessage()); // Warm up for (int i = 0; i < WARM_CYCLES; ++i) { - ServerMessage intermediate = converter.inbound(encoded); + org.apache.activemq.artemis.api.core.Message intermediate = converter.inbound(encoded); encode(converter.outbound(intermediate, 1)); } @@ -157,7 +157,7 @@ public class JMSTransformationSpeedComparisonTest { long startTime = System.nanoTime(); for (int i = 0; i < PROFILE_CYCLES; ++i) { - ServerMessage intermediate = converter.inbound(encoded); + org.apache.activemq.artemis.api.core.Message intermediate = converter.inbound(encoded); encode(converter.outbound(intermediate, 1)); } totalDuration += System.nanoTime() - startTime; @@ -168,7 +168,7 @@ public class JMSTransformationSpeedComparisonTest { @Test public void testTypicalQpidJMSMessageInBoundOnly() throws Exception { - EncodedMessage encoded = encode(createTypicalQpidJMSMessage()); + AMQPMessage encoded = encode(createTypicalQpidJMSMessage()); // Warm up for (int i = 0; i < WARM_CYCLES; ++i) { @@ -190,8 +190,8 @@ public class JMSTransformationSpeedComparisonTest { @Test public void testTypicalQpidJMSMessageOutBoundOnly() throws Exception { - EncodedMessage encoded = encode(createTypicalQpidJMSMessage()); - ServerMessage intermediate = converter.inbound(encoded); + AMQPMessage encoded = encode(createTypicalQpidJMSMessage()); + org.apache.activemq.artemis.api.core.Message intermediate = converter.inbound(encoded); // Warm up for (int i = 0; i < WARM_CYCLES; ++i) { @@ -278,14 +278,14 @@ public class JMSTransformationSpeedComparisonTest { return message; } - private EncodedMessage encode(Object target) { + private AMQPMessage encode(Object target) { if (target instanceof ProtonJMessage) { ProtonJMessage amqp = (ProtonJMessage) target; ByteBuf nettyBuffer = Unpooled.buffer(1024); amqp.encode(new NettyWritable(nettyBuffer)); - return new EncodedMessage(0, nettyBuffer.array(), nettyBuffer.arrayOffset() + nettyBuffer.readerIndex(), nettyBuffer.readableBytes()); + return new AMQPMessage(0, nettyBuffer.array(), null); } else { return null; } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/64681865/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/message/MessageTransformationTest.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/message/MessageTransformationTest.java b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/message/MessageTransformationTest.java index a5a2168..6a0f20c 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/message/MessageTransformationTest.java +++ b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/message/MessageTransformationTest.java @@ -26,8 +26,8 @@ import java.nio.charset.StandardCharsets; import java.util.HashMap; import java.util.Map; -import org.apache.activemq.artemis.core.server.ServerMessage; import org.apache.activemq.artemis.protocol.amqp.converter.ProtonMessageConverter; +import org.apache.activemq.artemis.protocol.amqp.broker.AMQPMessage; import org.apache.activemq.artemis.utils.IDGenerator; import org.apache.activemq.artemis.utils.SimpleIDGenerator; import org.apache.qpid.proton.Proton; @@ -87,7 +87,7 @@ public class MessageTransformationTest { EncodedMessage encoded = encode(incomingMessage); - ServerMessage outbound = converter.inbound(encoded); + org.apache.activemq.artemis.api.core.Message outbound = converter.inbound(encoded); Message outboudMessage = ((EncodedMessage) converter.outbound(outbound, outbound.getLongProperty("JMSXDeliveryCount").intValue())).decode(); // Test that message details are equal @@ -128,8 +128,7 @@ public class MessageTransformationTest { incomingMessage.setBody(new AmqpValue("String payload for AMQP message conversion performance testing.")); - EncodedMessage encoded = encode(incomingMessage); - ServerMessage outbound = converter.inbound(encoded); + org.apache.activemq.artemis.api.core.Message outbound = converter.inbound(new AMQPMessage(incomingMessage, null)); Message outboudMessage = ((EncodedMessage) converter.outbound(outbound, 1)).decode(); assertNull(outboudMessage.getHeader()); @@ -144,8 +143,7 @@ public class MessageTransformationTest { incomingMessage.setBody(new AmqpValue("String payload for AMQP message conversion performance testing.")); incomingMessage.setMessageId("ID:SomeQualifier:0:0:1"); - EncodedMessage encoded = encode(incomingMessage); - ServerMessage outbound = converter.inbound(encoded); + org.apache.activemq.artemis.api.core.Message outbound = converter.inbound(new AMQPMessage(incomingMessage, null)); Message outboudMessage = ((EncodedMessage) converter.outbound(outbound, 1)).decode(); assertNull(outboudMessage.getHeader()); @@ -160,8 +158,7 @@ public class MessageTransformationTest { incomingMessage.setBody(new AmqpValue("String payload for AMQP message conversion performance testing.")); incomingMessage.setDurable(true); - EncodedMessage encoded = encode(incomingMessage); - ServerMessage outbound = converter.inbound(encoded); + org.apache.activemq.artemis.api.core.Message outbound = converter.inbound(new AMQPMessage(incomingMessage, null)); Message outboudMessage = ((EncodedMessage) converter.outbound(outbound, 1)).decode(); assertNotNull(outboudMessage.getHeader()); @@ -175,8 +172,7 @@ public class MessageTransformationTest { incomingMessage.setBody(new AmqpValue(new Boolean(true))); - EncodedMessage encoded = encode(incomingMessage); - ServerMessage outbound = converter.inbound(encoded); + org.apache.activemq.artemis.api.core.Message outbound = converter.inbound(new AMQPMessage(incomingMessage, null)); Message outboudMessage = ((EncodedMessage) converter.outbound(outbound, 1)).decode(); Section section = outboudMessage.getBody(); @@ -233,8 +229,7 @@ public class MessageTransformationTest { message.setMessageAnnotations(new MessageAnnotations(messageAnnotations)); message.setBody(new AmqpValue("String payload for AMQP message conversion performance testing.")); - EncodedMessage encoded = encode(message); - ServerMessage outbound = converter.inbound(encoded); + org.apache.activemq.artemis.api.core.Message outbound = converter.inbound(new AMQPMessage(message, null)); Message outboudMessage = ((EncodedMessage) converter.outbound(outbound, 1)).decode(); assertNotNull(outboudMessage.getHeader()); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/64681865/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/message/AMQPMessageTest.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/message/AMQPMessageTest.java b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/message/AMQPMessageTest.java new file mode 100644 index 0000000..4313eae --- /dev/null +++ b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/message/AMQPMessageTest.java @@ -0,0 +1,63 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.artemis.protocol.amqp.message; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; +import org.apache.activemq.artemis.protocol.amqp.broker.AMQPMessage; +import org.apache.activemq.artemis.protocol.amqp.util.NettyWritable; +import org.apache.commons.collections.map.HashedMap; +import org.apache.qpid.proton.amqp.UnsignedInteger; +import org.apache.qpid.proton.amqp.messaging.ApplicationProperties; +import org.apache.qpid.proton.amqp.messaging.Header; +import org.apache.qpid.proton.amqp.messaging.Properties; +import org.apache.qpid.proton.message.Message; +import org.apache.qpid.proton.message.impl.MessageImpl; +import org.junit.Assert; +import org.junit.Test; + +public class AMQPMessageTest { + + @Test + public void testVerySimple() { + MessageImpl protonMessage = (MessageImpl) Message.Factory.create(); + protonMessage.setHeader( new Header()); + Properties properties = new Properties(); + properties.setTo("someNiceLocal"); + protonMessage.setProperties(properties); + protonMessage.getHeader().setDeliveryCount(new UnsignedInteger(7)); + protonMessage.getHeader().setDurable(Boolean.TRUE); + protonMessage.setApplicationProperties(new ApplicationProperties(new HashedMap())); + + ByteBuf nettyBuffer = Unpooled.buffer(1500); + + protonMessage.encode(new NettyWritable(nettyBuffer)); + + byte[] bytes = new byte[nettyBuffer.writerIndex()]; + + nettyBuffer.readBytes(bytes); + + AMQPMessage encode = new AMQPMessage(0, bytes, null); + + Assert.assertEquals(7, encode.getHeader().getDeliveryCount().intValue()); + Assert.assertEquals(true, encode.getHeader().getDurable()); + Assert.assertEquals("someNiceLocal", encode.getAddress()); + + + } +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/64681865/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java index f0385dc..e619eb9 100644 --- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java +++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java @@ -28,10 +28,9 @@ import org.apache.activemq.artemis.api.core.Message; import org.apache.activemq.artemis.api.core.Pair; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.core.io.IOCallback; +import org.apache.activemq.artemis.core.message.impl.CoreMessage; import org.apache.activemq.artemis.core.server.Queue; import org.apache.activemq.artemis.core.server.ServerConsumer; -import org.apache.activemq.artemis.core.server.ServerMessage; -import org.apache.activemq.artemis.core.server.impl.ServerMessageImpl; import org.apache.activemq.artemis.core.transaction.Transaction; /** @@ -112,19 +111,20 @@ public class MQTTPublishManager { * to original ID and consumer in the Session state. This way we can look up the consumer Id and the message Id from * the PubAck or PubRec message id. * */ - protected void sendMessage(ServerMessage message, ServerConsumer consumer, int deliveryCount) throws Exception { + protected void sendMessage(CoreMessage message, ServerConsumer consumer, int deliveryCount) throws Exception { // This is to allow retries of PubRel. if (isManagementConsumer(consumer)) { sendPubRelMessage(message); } else { int qos = decideQoS(message, consumer); if (qos == 0) { - sendServerMessage((int) message.getMessageID(), (ServerMessageImpl) message, deliveryCount, qos); + // TODO-now: fix encoding + sendServerMessage((int) message.getMessageID(), message, deliveryCount, qos); session.getServerSession().acknowledge(consumer.getID(), message.getMessageID()); } else if (qos == 1 || qos == 2) { int mqttid = outboundStore.generateMqttId(message.getMessageID(), consumer.getID()); outboundStore.publish(mqttid, message.getMessageID(), consumer.getID()); - sendServerMessage(mqttid, (ServerMessageImpl) message, deliveryCount, qos); + sendServerMessage(mqttid, message, deliveryCount, qos); } else { // Client must have disconnected and it's Subscription QoS cleared consumer.individualCancel(message.getMessageID(), false); @@ -149,7 +149,7 @@ public class MQTTPublishManager { */ void sendInternal(int messageId, String topic, int qos, ByteBuf payload, boolean retain, boolean internal) throws Exception { synchronized (lock) { - ServerMessage serverMessage = MQTTUtil.createServerMessageFromByteBuf(session, topic, retain, qos, payload); + Message serverMessage = MQTTUtil.createServerMessageFromByteBuf(session, topic, retain, qos, payload); if (qos > 0) { serverMessage.setDurable(MQTTUtil.DURABLE_MESSAGES); @@ -181,7 +181,7 @@ public class MQTTPublishManager { } } - void sendPubRelMessage(ServerMessage message) { + void sendPubRelMessage(Message message) { int messageId = message.getIntProperty(MQTTUtil.MQTT_MESSAGE_ID_KEY); session.getProtocolHandler().sendPubRel(messageId); } @@ -190,7 +190,7 @@ public class MQTTPublishManager { try { Pair ref = outboundStore.publishReceived(messageId); if (ref != null) { - ServerMessage m = MQTTUtil.createPubRelMessage(session, managementAddress, messageId); + Message m = MQTTUtil.createPubRelMessage(session, managementAddress, messageId); session.getServerSession().send(m, true); session.getServerSession().acknowledge(ref.getB(), ref.getA()); } else { @@ -246,7 +246,7 @@ public class MQTTPublishManager { } } - private void sendServerMessage(int messageId, ServerMessageImpl message, int deliveryCount, int qos) { + private void sendServerMessage(int messageId, CoreMessage message, int deliveryCount, int qos) { String address = MQTTUtil.convertCoreAddressFilterToMQTT(message.getAddress().toString(), session.getWildcardConfiguration()); ByteBuf payload; @@ -262,14 +262,14 @@ public class MQTTPublishManager { log.warn("Unable to send message: " + message.getMessageID() + " Cause: " + e.getMessage()); } default: - ActiveMQBuffer bufferDup = message.getBodyBufferDuplicate(); + ActiveMQBuffer bufferDup = message.getReadOnlyBodyBuffer(); payload = bufferDup.readBytes(message.getEndOfBodyPosition() - bufferDup.readerIndex()).byteBuf(); break; } session.getProtocolHandler().send(messageId, address, qos, payload, deliveryCount); } - private int decideQoS(ServerMessage message, ServerConsumer consumer) { + private int decideQoS(Message message, ServerConsumer consumer) { int subscriptionQoS = -1; try { http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/64681865/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTRetainMessageManager.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTRetainMessageManager.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTRetainMessageManager.java index 596670b..0b52a0b 100644 --- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTRetainMessageManager.java +++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTRetainMessageManager.java @@ -17,12 +17,12 @@ package org.apache.activemq.artemis.core.protocol.mqtt; +import org.apache.activemq.artemis.api.core.Message; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.core.server.BindingQueryResult; import org.apache.activemq.artemis.core.server.MessageReference; import org.apache.activemq.artemis.core.server.Queue; import org.apache.activemq.artemis.core.server.RoutingContext; -import org.apache.activemq.artemis.core.server.ServerMessage; import org.apache.activemq.artemis.core.server.impl.RoutingContextImpl; import org.apache.activemq.artemis.core.transaction.Transaction; import org.apache.activemq.artemis.utils.LinkedListIterator; @@ -44,7 +44,7 @@ public class MQTTRetainMessageManager { * the subscription queue for the consumer. When a new retained message is received the message will be sent to * the retained queue and the previous retain message consumed to remove it from the queue. */ - void handleRetainedMessage(ServerMessage message, String address, boolean reset, Transaction tx) throws Exception { + void handleRetainedMessage(Message message, String address, boolean reset, Transaction tx) throws Exception { SimpleString retainAddress = new SimpleString(MQTTUtil.convertMQTTAddressFilterToCoreRetain(address, session.getWildcardConfiguration())); Queue queue = session.getServer().locateQueue(retainAddress); @@ -82,7 +82,7 @@ public class MQTTRetainMessageManager { Queue retainedQueue = session.getServer().locateQueue(retainedQueueName); try (LinkedListIterator i = retainedQueue.iterator()) { if (i.hasNext()) { - ServerMessage message = i.next().getMessage().copy(session.getServer().getStorageManager().generateID()); + Message message = i.next().getMessage().copy(session.getServer().getStorageManager().generateID()); sendToQueue(message, queue, tx); } } @@ -95,7 +95,7 @@ public class MQTTRetainMessageManager { tx.commit(); } - private void sendToQueue(ServerMessage message, Queue queue, Transaction tx) throws Exception { + private void sendToQueue(Message message, Queue queue, Transaction tx) throws Exception { RoutingContext context = new RoutingContextImpl(tx); queue.route(message, context); session.getServer().getPostOffice().processRoute(message, context, false);