Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 48BBF200C2E for ; Sun, 5 Mar 2017 17:50:06 +0100 (CET) Received: by cust-asf.ponee.io (Postfix) id 45873160B7D; Sun, 5 Mar 2017 16:50:06 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 57775160B8D for ; Sun, 5 Mar 2017 17:50:04 +0100 (CET) Received: (qmail 7508 invoked by uid 500); 5 Mar 2017 16:50:03 -0000 Mailing-List: contact commits-help@activemq.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@activemq.apache.org Delivered-To: mailing list commits@activemq.apache.org Received: (qmail 6594 invoked by uid 99); 5 Mar 2017 16:50:02 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Sun, 05 Mar 2017 16:50:02 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id C1ECADFEE9; Sun, 5 Mar 2017 16:50:02 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: clebertsuconic@apache.org To: commits@activemq.apache.org Date: Sun, 05 Mar 2017 16:50:10 -0000 Message-Id: In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [09/17] activemq-artemis git commit: ARTEMIS-1009 Pure Message Encoding. archived-at: Sun, 05 Mar 2017 16:50:06 -0000 http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4894015d/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java index 2e19f07..4e33c9b 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java @@ -21,15 +21,19 @@ import java.util.Map; import java.util.Objects; import java.util.Set; -import org.apache.activemq.artemis.api.core.ActiveMQQueueExistsException; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.PooledByteBufAllocator; import org.apache.activemq.artemis.api.core.ActiveMQExceptionType; +import org.apache.activemq.artemis.api.core.ActiveMQQueueExistsException; +import org.apache.activemq.artemis.api.core.Message; +import org.apache.activemq.artemis.api.core.RoutingType; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.core.postoffice.impl.CompositeAddress; import org.apache.activemq.artemis.core.server.AddressQueryResult; import org.apache.activemq.artemis.core.server.QueueQueryResult; -import org.apache.activemq.artemis.api.core.RoutingType; import org.apache.activemq.artemis.core.transaction.Transaction; import org.apache.activemq.artemis.jms.client.ActiveMQConnection; +import org.apache.activemq.artemis.protocol.amqp.broker.AMQPMessage; import org.apache.activemq.artemis.protocol.amqp.broker.AMQPSessionCallback; import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPException; import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPIllegalStateException; @@ -38,7 +42,6 @@ import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPNotFound import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPResourceLimitExceededException; import org.apache.activemq.artemis.protocol.amqp.logger.ActiveMQAMQPProtocolMessageBundle; import org.apache.activemq.artemis.protocol.amqp.util.CreditsSemaphore; -import org.apache.activemq.artemis.protocol.amqp.util.NettyWritable; import org.apache.activemq.artemis.selector.filter.FilterException; import org.apache.activemq.artemis.selector.impl.SelectorParser; import org.apache.qpid.proton.amqp.DescribedType; @@ -61,9 +64,6 @@ import org.apache.qpid.proton.engine.EndpointState; import org.apache.qpid.proton.engine.Sender; import org.jboss.logging.Logger; -import io.netty.buffer.ByteBuf; -import io.netty.buffer.PooledByteBufAllocator; - /** * TODO: Merge {@link ProtonServerSenderContext} and {@link org.apache.activemq.artemis.protocol.amqp.client.ProtonClientSenderContext} once we support 'global' link names. The split is a workaround for outgoing links */ @@ -474,7 +474,7 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr if (closed) { return; } - Object message = delivery.getContext(); + Message message = (Message)delivery.getContext(); boolean preSettle = sender.getRemoteSenderSettleMode() == SenderSettleMode.SETTLED; @@ -518,6 +518,7 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr try { sessionSPI.ack(null, brokerConsumer, message); } catch (Exception e) { + e.printStackTrace(); throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.errorAcknowledgingMessage(message.toString(), e.getMessage()); } } else if (remoteState instanceof Released) { @@ -566,7 +567,7 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr /** * handle an out going message from ActiveMQ Artemis, send via the Proton Sender */ - public int deliverMessage(Object message, int deliveryCount) throws Exception { + public int deliverMessage(AMQPMessage message, int deliveryCount) throws Exception { if (closed) { return 0; } @@ -590,16 +591,7 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr ByteBuf nettyBuffer = PooledByteBufAllocator.DEFAULT.heapBuffer(1024); try { - long messageFormat = 0; - - // Encode the Server Message into the given Netty Buffer as an AMQP - // Message transformed from the internal message model. - try { - messageFormat = sessionSPI.encodeMessage(message, deliveryCount, new NettyWritable(nettyBuffer)); - } catch (Throwable e) { - log.warn(e.getMessage(), e); - throw new ActiveMQAMQPInternalErrorException(e.getMessage(), e); - } + message.sendBuffer(nettyBuffer, deliveryCount); int size = nettyBuffer.writerIndex(); @@ -609,7 +601,7 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr } final Delivery delivery; delivery = sender.delivery(tag, 0, tag.length); - delivery.setMessageFormat((int) messageFormat); + delivery.setMessageFormat((int) message.getMessageFormat()); delivery.setContext(message); // this will avoid a copy.. patch provided by Norman using buffer.array() http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4894015d/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonTransactionHandler.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonTransactionHandler.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonTransactionHandler.java index 51f42a3..1afeba8 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonTransactionHandler.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonTransactionHandler.java @@ -22,6 +22,7 @@ import org.apache.activemq.artemis.protocol.amqp.broker.AMQPSessionCallback; import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPException; import org.apache.activemq.artemis.protocol.amqp.logger.ActiveMQAMQPProtocolMessageBundle; import org.apache.activemq.artemis.protocol.amqp.util.DeliveryUtil; +import org.apache.activemq.artemis.protocol.amqp.util.NettyWritable; import org.apache.qpid.proton.amqp.Binary; import org.apache.qpid.proton.amqp.Symbol; import org.apache.qpid.proton.amqp.messaging.Accepted; @@ -61,7 +62,7 @@ public class ProtonTransactionHandler implements ProtonDeliveryHandler { return; } - DeliveryUtil.readDelivery(receiver, buffer); + receiver.recv(new NettyWritable(buffer)); receiver.advance(); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4894015d/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/handler/ProtonHandler.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/handler/ProtonHandler.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/handler/ProtonHandler.java index 25ef51e..673a688 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/handler/ProtonHandler.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/handler/ProtonHandler.java @@ -254,7 +254,7 @@ public class ProtonHandler extends ProtonInitializable { flush(false); } - private void flush(boolean wait) { + public void flush(boolean wait) { synchronized (lock) { transport.process(); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4894015d/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/util/NettyReadable.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/util/NettyReadable.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/util/NettyReadable.java new file mode 100644 index 0000000..e0705b4 --- /dev/null +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/util/NettyReadable.java @@ -0,0 +1,139 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.artemis.protocol.amqp.util; + +import java.nio.ByteBuffer; +import java.nio.charset.Charset; + +import io.netty.buffer.ByteBuf; +import org.apache.qpid.proton.codec.ReadableBuffer; + +public class NettyReadable implements ReadableBuffer { + + private static final Charset Charset_UTF8 = Charset.forName("UTF-8"); + + private final ByteBuf buffer; + + public NettyReadable(ByteBuf buffer) { + this.buffer = buffer; + } + + @Override + public void put(ReadableBuffer other) { + buffer.writeBytes(other.byteBuffer()); + } + + @Override + public byte get() { + return buffer.readByte(); + } + + @Override + public int getInt() { + return buffer.readInt(); + } + + @Override + public long getLong() { + return buffer.readLong(); + } + + @Override + public short getShort() { + return buffer.readShort(); + } + + @Override + public float getFloat() { + return buffer.readFloat(); + } + + @Override + public double getDouble() { + return buffer.readDouble(); + } + + @Override + public ReadableBuffer get(byte[] data, int offset, int length) { + buffer.readBytes(data, offset, length); + return this; + } + + @Override + public ReadableBuffer get(byte[] data) { + buffer.readBytes(data); + return this; + } + + @Override + public ReadableBuffer position(int position) { + buffer.readerIndex(position); + return this; + } + + @Override + public ReadableBuffer slice() { + return new NettyReadable(buffer.slice()); + } + + @Override + public ReadableBuffer flip() { + return new NettyReadable(buffer.duplicate().setIndex(0, buffer.readerIndex())); + } + + @Override + public ReadableBuffer limit(int limit) { + buffer.writerIndex(limit); + return this; + } + + @Override + public int limit() { + return buffer.writerIndex(); + } + + @Override + public int remaining() { + return buffer.readableBytes(); + } + + @Override + public int position() { + return buffer.readerIndex(); + } + + @Override + public boolean hasRemaining() { + return buffer.readableBytes() > 0; + } + + @Override + public ReadableBuffer duplicate() { + return new NettyReadable(buffer.duplicate()); + } + + @Override + public ByteBuffer byteBuffer() { + return buffer.nioBuffer(0, buffer.writerIndex()); + } + + @Override + public String readUTF8() { + return buffer.toString(Charset_UTF8); + } +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4894015d/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/util/TLSEncode.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/util/TLSEncode.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/util/TLSEncode.java new file mode 100644 index 0000000..b2f0fdc --- /dev/null +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/util/TLSEncode.java @@ -0,0 +1,52 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.artemis.protocol.amqp.util; + +import org.apache.qpid.proton.codec.AMQPDefinedTypes; +import org.apache.qpid.proton.codec.DecoderImpl; +import org.apache.qpid.proton.codec.EncoderImpl; + +/** This can go away if Proton provides this feature. */ +public class TLSEncode { + + // For now Proton requires that we create a decoder to create an encoder + private static class EncoderDecoderPair { + DecoderImpl decoder = new DecoderImpl(); + EncoderImpl encoder = new EncoderImpl(decoder); + { + AMQPDefinedTypes.registerAllTypes(decoder, encoder); + } + } + + private static final ThreadLocal tlsCodec = new ThreadLocal() { + @Override + protected EncoderDecoderPair initialValue() { + return new EncoderDecoderPair(); + } + }; + + public static EncoderImpl getEncoder() { + return tlsCodec.get().encoder; + } + + public static DecoderImpl getDecoder() { + return tlsCodec.get().decoder; + } + + +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4894015d/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/TestConversions.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/TestConversions.java b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/TestConversions.java index 08c46be..8ced348 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/TestConversions.java +++ b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/TestConversions.java @@ -16,44 +16,28 @@ */ package org.apache.activemq.artemis.protocol.amqp.converter; -import static org.apache.activemq.artemis.api.core.Message.BYTES_TYPE; -import static org.apache.activemq.artemis.api.core.Message.MAP_TYPE; -import static org.apache.activemq.artemis.api.core.Message.STREAM_TYPE; -import static org.apache.activemq.artemis.api.core.Message.TEXT_TYPE; -import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.wrapMessage; - -import java.io.IOException; -import java.nio.ByteBuffer; import java.util.HashMap; import java.util.LinkedList; import java.util.List; import java.util.Map; -import org.apache.activemq.artemis.api.core.ActiveMQBuffer; -import org.apache.activemq.artemis.api.core.SimpleString; -import org.apache.activemq.artemis.core.journal.EncodingSupport; -import org.apache.activemq.artemis.core.server.ServerMessage; +import org.apache.activemq.artemis.api.core.ICoreMessage; +import org.apache.activemq.artemis.protocol.amqp.broker.AMQPMessage; import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSBytesMessage; import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSMapMessage; import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSMessage; import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSStreamMessage; import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSTextMessage; -import org.apache.activemq.artemis.protocol.amqp.converter.message.EncodedMessage; -import org.apache.activemq.artemis.protocol.amqp.util.NettyWritable; -import org.apache.activemq.artemis.utils.SimpleIDGenerator; import org.apache.qpid.proton.amqp.Binary; import org.apache.qpid.proton.amqp.messaging.AmqpSequence; import org.apache.qpid.proton.amqp.messaging.AmqpValue; import org.apache.qpid.proton.amqp.messaging.ApplicationProperties; import org.apache.qpid.proton.amqp.messaging.Data; import org.apache.qpid.proton.message.Message; -import org.apache.qpid.proton.message.ProtonJMessage; import org.apache.qpid.proton.message.impl.MessageImpl; import org.junit.Assert; import org.junit.Test; -import io.netty.buffer.ByteBuf; -import io.netty.buffer.PooledByteBufAllocator; public class TestConversions extends Assert { @@ -72,18 +56,12 @@ public class TestConversions extends Assert { message.setBody(new AmqpValue(new Boolean(true))); - EncodedMessage encodedMessage = encodeMessage(message); + AMQPMessage encodedMessage = new AMQPMessage(message); - ProtonMessageConverter converter = new ProtonMessageConverter(new SimpleIDGenerator(0)); - ServerMessage serverMessage = converter.inbound(encodedMessage); + ICoreMessage serverMessage = encodedMessage.toCore(); - verifyProperties(new ServerJMSMessage(serverMessage, 0)); + verifyProperties(ServerJMSMessage.wrapCoreMessage(serverMessage)); - EncodedMessage encoded = (EncodedMessage) converter.outbound(serverMessage, 0); - Message amqpMessage = encoded.decode(); - - AmqpValue value = (AmqpValue) amqpMessage.getBody(); - assertEquals(value.getValue(), true); } @Test @@ -101,12 +79,11 @@ public class TestConversions extends Assert { message.setBody(new Data(new Binary(bodyBytes))); - EncodedMessage encodedMessage = encodeMessage(message); + AMQPMessage encodedMessage = new AMQPMessage(message); - ProtonMessageConverter converter = new ProtonMessageConverter(new SimpleIDGenerator(0)); - ServerMessage serverMessage = converter.inbound(encodedMessage); + ICoreMessage serverMessage = encodedMessage.toCore(); - ServerJMSBytesMessage bytesMessage = (ServerJMSBytesMessage) wrapMessage(BYTES_TYPE, serverMessage, 0); + ServerJMSBytesMessage bytesMessage = (ServerJMSBytesMessage) ServerJMSMessage.wrapCoreMessage(serverMessage); verifyProperties(bytesMessage); @@ -118,9 +95,6 @@ public class TestConversions extends Assert { Assert.assertArrayEquals(bodyBytes, newBodyBytes); - Object obj = converter.outbound(serverMessage, 0); - - System.out.println("output = " + obj); } private void verifyProperties(javax.jms.Message message) throws Exception { @@ -151,12 +125,12 @@ public class TestConversions extends Assert { message.setBody(new AmqpValue(mapValues)); - EncodedMessage encodedMessage = encodeMessage(message); + AMQPMessage encodedMessage = new AMQPMessage(message); - ProtonMessageConverter converter = new ProtonMessageConverter(new SimpleIDGenerator(0)); - ServerMessage serverMessage = converter.inbound(encodedMessage); + ICoreMessage serverMessage = encodedMessage.toCore(); + serverMessage.getReadOnlyBodyBuffer(); - ServerJMSMapMessage mapMessage = (ServerJMSMapMessage) wrapMessage(MAP_TYPE, serverMessage, 0); + ServerJMSMapMessage mapMessage = (ServerJMSMapMessage) ServerJMSMessage.wrapCoreMessage(serverMessage); mapMessage.decode(); verifyProperties(mapMessage); @@ -164,15 +138,8 @@ public class TestConversions extends Assert { Assert.assertEquals(1, mapMessage.getInt("someint")); Assert.assertEquals("value", mapMessage.getString("somestr")); - EncodedMessage encoded = (EncodedMessage) converter.outbound(serverMessage, 0); - Message amqpMessage = encoded.decode(); - - AmqpValue value = (AmqpValue) amqpMessage.getBody(); - Map mapoutput = (Map) value.getValue(); - - assertEquals(Integer.valueOf(1), mapoutput.get("someint")); - - System.out.println("output = " + amqpMessage); + AMQPMessage newAMQP = CoreAmqpConverter.fromCore(mapMessage.getInnerMessage()); + System.out.println(newAMQP.getProtonMessage().getBody()); } @Test @@ -188,14 +155,11 @@ public class TestConversions extends Assert { message.setBody(new AmqpSequence(objects)); - EncodedMessage encodedMessage = encodeMessage(message); + AMQPMessage encodedMessage = new AMQPMessage(message); - ProtonMessageConverter converter = new ProtonMessageConverter(new SimpleIDGenerator(0)); - ServerMessage serverMessage = converter.inbound(encodedMessage); + ICoreMessage serverMessage = encodedMessage.toCore(); - simulatePersistence(serverMessage); - - ServerJMSStreamMessage streamMessage = (ServerJMSStreamMessage) wrapMessage(STREAM_TYPE, serverMessage, 0); + ServerJMSStreamMessage streamMessage = (ServerJMSStreamMessage) ServerJMSMessage.wrapCoreMessage(serverMessage); verifyProperties(streamMessage); @@ -203,13 +167,6 @@ public class TestConversions extends Assert { assertEquals(10, streamMessage.readInt()); assertEquals("10", streamMessage.readString()); - - EncodedMessage encoded = (EncodedMessage) converter.outbound(serverMessage, 0); - Message amqpMessage = encoded.decode(); - - List list = ((AmqpSequence) amqpMessage.getBody()).getValue(); - Assert.assertEquals(Integer.valueOf(10), list.get(0)); - Assert.assertEquals("10", list.get(1)); } @Test @@ -222,553 +179,17 @@ public class TestConversions extends Assert { String text = "someText"; message.setBody(new AmqpValue(text)); - EncodedMessage encodedMessage = encodeMessage(message); - - ProtonMessageConverter converter = new ProtonMessageConverter(new SimpleIDGenerator(0)); - ServerMessage serverMessage = converter.inbound(encodedMessage); + AMQPMessage encodedMessage = new AMQPMessage(message); - simulatePersistence(serverMessage); + ICoreMessage serverMessage = encodedMessage.toCore(); - ServerJMSTextMessage textMessage = (ServerJMSTextMessage) wrapMessage(TEXT_TYPE, serverMessage, 0); + ServerJMSTextMessage textMessage = (ServerJMSTextMessage) ServerJMSMessage.wrapCoreMessage(serverMessage); textMessage.decode(); verifyProperties(textMessage); Assert.assertEquals(text, textMessage.getText()); - EncodedMessage encoded = (EncodedMessage) converter.outbound(serverMessage, 0); - Message amqpMessage = encoded.decode(); - - AmqpValue value = (AmqpValue) amqpMessage.getBody(); - String textValue = (String) value.getValue(); - - Assert.assertEquals(text, textValue); - - System.out.println("output = " + amqpMessage); - } - - private void simulatePersistence(ServerMessage serverMessage) { - serverMessage.setAddress(new SimpleString("SomeAddress")); - // This is just to simulate what would happen during the persistence of the message - // We need to still be able to recover the message when we read it back - ((EncodingSupport) serverMessage).encode(new EmptyBuffer()); - } - - private ProtonJMessage reEncodeMsg(Object obj) { - ProtonJMessage objOut = (ProtonJMessage) obj; - - ByteBuf nettyBuffer = PooledByteBufAllocator.DEFAULT.heapBuffer(1024); - - objOut.encode(new NettyWritable(nettyBuffer)); - return objOut; } - private EncodedMessage encodeMessage(MessageImpl message) { - ByteBuf buf = PooledByteBufAllocator.DEFAULT.heapBuffer(1024 * 1024); - message.encode(new NettyWritable(buf)); - byte[] bytesConvert = new byte[buf.writerIndex()]; - buf.readBytes(bytesConvert); - return new EncodedMessage(0, bytesConvert, 0, bytesConvert.length); - } - - class EmptyBuffer implements ActiveMQBuffer { - - @Override - public ByteBuf byteBuf() { - return null; - } - - @Override - public int capacity() { - return 0; - } - - @Override - public int readerIndex() { - return 0; - } - - @Override - public void readerIndex(int readerIndex) { - - } - - @Override - public int writerIndex() { - return 0; - } - - @Override - public void writerIndex(int writerIndex) { - - } - - @Override - public void setIndex(int readerIndex, int writerIndex) { - - } - - @Override - public int readableBytes() { - return 0; - } - - @Override - public int writableBytes() { - return 0; - } - - @Override - public boolean readable() { - return false; - } - - @Override - public boolean writable() { - return false; - } - - @Override - public void clear() { - - } - - @Override - public void markReaderIndex() { - - } - - @Override - public void resetReaderIndex() { - - } - - @Override - public void markWriterIndex() { - - } - - @Override - public void resetWriterIndex() { - - } - - @Override - public void discardReadBytes() { - - } - - @Override - public byte getByte(int index) { - return 0; - } - - @Override - public short getUnsignedByte(int index) { - return 0; - } - - @Override - public short getShort(int index) { - return 0; - } - - @Override - public int getUnsignedShort(int index) { - return 0; - } - - @Override - public int getInt(int index) { - return 0; - } - - @Override - public long getUnsignedInt(int index) { - return 0; - } - - @Override - public long getLong(int index) { - return 0; - } - - @Override - public void getBytes(int index, ActiveMQBuffer dst) { - - } - - @Override - public void getBytes(int index, ActiveMQBuffer dst, int length) { - - } - - @Override - public void getBytes(int index, ActiveMQBuffer dst, int dstIndex, int length) { - - } - - @Override - public void getBytes(int index, byte[] dst) { - - } - - @Override - public void getBytes(int index, byte[] dst, int dstIndex, int length) { - - } - - @Override - public void getBytes(int index, ByteBuffer dst) { - - } - - @Override - public char getChar(int index) { - return 0; - } - - @Override - public float getFloat(int index) { - return 0; - } - - @Override - public double getDouble(int index) { - return 0; - } - - @Override - public void setByte(int index, byte value) { - - } - - @Override - public void setShort(int index, short value) { - - } - - @Override - public void setInt(int index, int value) { - - } - - @Override - public void setLong(int index, long value) { - - } - - @Override - public void setBytes(int index, ActiveMQBuffer src) { - - } - - @Override - public void setBytes(int index, ActiveMQBuffer src, int length) { - - } - - @Override - public void setBytes(int index, ActiveMQBuffer src, int srcIndex, int length) { - - } - - @Override - public void setBytes(int index, byte[] src) { - - } - - @Override - public void setBytes(int index, byte[] src, int srcIndex, int length) { - - } - - @Override - public void setBytes(int index, ByteBuffer src) { - - } - - @Override - public void setChar(int index, char value) { - - } - - @Override - public void setFloat(int index, float value) { - - } - - @Override - public void setDouble(int index, double value) { - - } - - @Override - public byte readByte() { - return 0; - } - - @Override - public int readUnsignedByte() { - return 0; - } - - @Override - public short readShort() { - return 0; - } - - @Override - public int readUnsignedShort() { - return 0; - } - - @Override - public int readInt() { - return 0; - } - - @Override - public long readUnsignedInt() { - return 0; - } - - @Override - public long readLong() { - return 0; - } - - @Override - public char readChar() { - return 0; - } - - @Override - public float readFloat() { - return 0; - } - - @Override - public double readDouble() { - return 0; - } - - @Override - public boolean readBoolean() { - return false; - } - - @Override - public SimpleString readNullableSimpleString() { - return null; - } - - @Override - public String readNullableString() { - return null; - } - - @Override - public SimpleString readSimpleString() { - return null; - } - - @Override - public String readString() { - return null; - } - - @Override - public String readUTF() { - return null; - } - - @Override - public ActiveMQBuffer readBytes(int length) { - return null; - } - - @Override - public ActiveMQBuffer readSlice(int length) { - return null; - } - - @Override - public void readBytes(ActiveMQBuffer dst) { - - } - - @Override - public void readBytes(ActiveMQBuffer dst, int length) { - - } - - @Override - public void readBytes(ActiveMQBuffer dst, int dstIndex, int length) { - - } - - @Override - public void readBytes(byte[] dst) { - - } - - @Override - public void readBytes(byte[] dst, int dstIndex, int length) { - - } - - @Override - public void readBytes(ByteBuffer dst) { - - } - - @Override - public int skipBytes(int length) { - return length; - } - - @Override - public void writeByte(byte value) { - - } - - @Override - public void writeShort(short value) { - - } - - @Override - public void writeInt(int value) { - - } - - @Override - public void writeLong(long value) { - - } - - @Override - public void writeChar(char chr) { - - } - - @Override - public void writeFloat(float value) { - - } - - @Override - public void writeDouble(double value) { - - } - - @Override - public void writeBoolean(boolean val) { - - } - - @Override - public void writeNullableSimpleString(SimpleString val) { - - } - - @Override - public void writeNullableString(String val) { - - } - - @Override - public void writeSimpleString(SimpleString val) { - - } - - @Override - public void writeString(String val) { - - } - - @Override - public void writeUTF(String utf) { - - } - - @Override - public void writeBytes(ActiveMQBuffer src, int length) { - - } - - @Override - public void writeBytes(ActiveMQBuffer src, int srcIndex, int length) { - - } - - @Override - public void writeBytes(byte[] src) { - - } - - @Override - public void writeBytes(byte[] src, int srcIndex, int length) { - - } - - @Override - public void writeBytes(ByteBuffer src) { - - } - - @Override - public void readFully(byte[] b) throws IOException { - } - - @Override - public void readFully(byte[] b, int off, int len) throws IOException { - } - - @Override - public String readLine() throws IOException { - return null; - } - - @Override - public ActiveMQBuffer copy() { - return null; - } - - @Override - public ActiveMQBuffer copy(int index, int length) { - return null; - } - - @Override - public ActiveMQBuffer slice() { - return null; - } - - @Override - public ActiveMQBuffer slice(int index, int length) { - return null; - } - - @Override - public ActiveMQBuffer duplicate() { - return null; - } - - @Override - public ByteBuffer toByteBuffer() { - return null; - } - - @Override - public ByteBuffer toByteBuffer(int index, int length) { - return null; - } - - @Override - public void release() { - //no-op - } - } } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4894015d/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/message/AMQPContentTypeSupportTest.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/message/AMQPContentTypeSupportTest.java b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/message/AMQPContentTypeSupportTest.java index 4caead7..54a98e6 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/message/AMQPContentTypeSupportTest.java +++ b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/message/AMQPContentTypeSupportTest.java @@ -16,15 +16,17 @@ */ package org.apache.activemq.artemis.protocol.amqp.converter.message; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNull; - import java.nio.charset.Charset; import java.nio.charset.StandardCharsets; +import org.apache.activemq.artemis.protocol.amqp.converter.AMQPContentTypeSupport; +import org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport; import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPInvalidContentTypeException; import org.junit.Test; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; + public class AMQPContentTypeSupportTest { @Test(expected = ActiveMQAMQPInvalidContentTypeException.class) @@ -216,7 +218,7 @@ public class AMQPContentTypeSupportTest { @Test public void testParseContentTypeWithApplicationJavaSerialized() throws Exception { // Expect null as this is not a textual type - doParseContentTypeTestImpl(AMQPMessageSupport.SERIALIZED_JAVA_OBJECT_CONTENT_TYPE, null); + doParseContentTypeTestImpl(AMQPMessageSupport.SERIALIZED_JAVA_OBJECT_CONTENT_TYPE.toString(), null); } private void doParseContentTypeTestImpl(String contentType, Charset expected) throws ActiveMQAMQPInvalidContentTypeException { http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4894015d/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/message/AMQPMessageIdHelperTest.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/message/AMQPMessageIdHelperTest.java b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/message/AMQPMessageIdHelperTest.java index c53cda5..60c1989 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/message/AMQPMessageIdHelperTest.java +++ b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/message/AMQPMessageIdHelperTest.java @@ -20,19 +20,20 @@ */ package org.apache.activemq.artemis.protocol.amqp.converter.message; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertNull; -import static org.junit.Assert.fail; - import java.util.UUID; +import org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageIdHelper; import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPException; import org.apache.qpid.proton.amqp.Binary; import org.apache.qpid.proton.amqp.UnsignedLong; import org.junit.Before; import org.junit.Test; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.fail; + public class AMQPMessageIdHelperTest { private AMQPMessageIdHelper messageIdHelper; http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4894015d/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/message/AMQPMessageSupportTest.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/message/AMQPMessageSupportTest.java b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/message/AMQPMessageSupportTest.java index d4e078f..6aeb4dc 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/message/AMQPMessageSupportTest.java +++ b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/message/AMQPMessageSupportTest.java @@ -16,20 +16,21 @@ */ package org.apache.activemq.artemis.protocol.amqp.converter.message; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertNull; -import static org.junit.Assert.assertTrue; - import java.util.HashMap; import java.util.Map; +import org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport; import org.apache.qpid.proton.Proton; import org.apache.qpid.proton.amqp.Symbol; import org.apache.qpid.proton.amqp.messaging.MessageAnnotations; import org.apache.qpid.proton.message.Message; import org.junit.Test; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; + public class AMQPMessageSupportTest { // ---------- getSymbol ---------------------------------------------------// http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4894015d/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/message/JMSMappingInboundTransformerTest.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/message/JMSMappingInboundTransformerTest.java b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/message/JMSMappingInboundTransformerTest.java index d7a948a..b7092c3 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/message/JMSMappingInboundTransformerTest.java +++ b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/message/JMSMappingInboundTransformerTest.java @@ -16,36 +16,29 @@ */ package org.apache.activemq.artemis.protocol.amqp.converter.message; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertNull; -import static org.junit.Assert.assertTrue; - +import javax.jms.Destination; +import javax.jms.Queue; +import javax.jms.TemporaryQueue; +import javax.jms.TemporaryTopic; +import javax.jms.TextMessage; +import javax.jms.Topic; import java.nio.charset.Charset; import java.nio.charset.StandardCharsets; import java.util.ArrayList; -import java.util.Arrays; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.UUID; -import javax.jms.Destination; -import javax.jms.MapMessage; -import javax.jms.Queue; -import javax.jms.TemporaryQueue; -import javax.jms.TemporaryTopic; -import javax.jms.TextMessage; -import javax.jms.Topic; - -import org.apache.activemq.artemis.jms.client.ActiveMQMessage; +import org.apache.activemq.artemis.api.core.ICoreMessage; +import org.apache.activemq.artemis.protocol.amqp.broker.AMQPMessage; +import org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport; import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSBytesMessage; import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSMapMessage; +import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSMessage; import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSObjectMessage; import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSStreamMessage; import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSTextMessage; -import org.apache.activemq.artemis.utils.IDGenerator; -import org.apache.activemq.artemis.utils.SimpleIDGenerator; import org.apache.qpid.proton.Proton; import org.apache.qpid.proton.amqp.Binary; import org.apache.qpid.proton.amqp.Symbol; @@ -57,13 +50,15 @@ import org.apache.qpid.proton.message.Message; import org.junit.Before; import org.junit.Test; -public class JMSMappingInboundTransformerTest { +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; - private IDGenerator idGenerator; +public class JMSMappingInboundTransformerTest { @Before public void setUp() { - this.idGenerator = new SimpleIDGenerator(0); } // ----- Null Body Section ------------------------------------------------// @@ -77,13 +72,14 @@ public class JMSMappingInboundTransformerTest { */ @Test public void testCreateBytesMessageFromNoBodySectionAndContentType() throws Exception { - JMSMappingInboundTransformer transformer = new JMSMappingInboundTransformer(idGenerator); - Message message = Message.Factory.create(); message.setContentType(AMQPMessageSupport.OCTET_STREAM_CONTENT_TYPE); - EncodedMessage em = encodeMessage(message); - javax.jms.Message jmsMessage = transformer.transform(em); + AMQPMessage messageEncode = new AMQPMessage(message); + + ICoreMessage coreMessage = messageEncode.toCore(); + + javax.jms.Message jmsMessage = ServerJMSMessage.wrapCoreMessage(coreMessage); assertNotNull("Message should not be null", jmsMessage); assertEquals("Unexpected message class type", ServerJMSBytesMessage.class, jmsMessage.getClass()); @@ -98,74 +94,25 @@ public class JMSMappingInboundTransformerTest { */ @Test public void testCreateBytesMessageFromNoBodySectionAndNoContentType() throws Exception { - JMSMappingInboundTransformer transformer = new JMSMappingInboundTransformer(idGenerator); - Message message = Message.Factory.create(); - EncodedMessage em = encodeMessage(message); - javax.jms.Message jmsMessage = transformer.transform(em); + javax.jms.Message jmsMessage = ServerJMSMessage.wrapCoreMessage(new AMQPMessage(message).toCore()); assertNotNull("Message should not be null", jmsMessage); assertEquals("Unexpected message class type", ServerJMSBytesMessage.class, jmsMessage.getClass()); } - /** - * Test that a message with no body section, but with the content type set to - * {@value AMQPMessageSupport#SERIALIZED_JAVA_OBJECT_CONTENT_TYPE} results in an - * ObjectMessage when not otherwise annotated to indicate the type of JMS message it is. - * - * @throws Exception - * if an error occurs during the test. - */ - @Test - public void testCreateObjectMessageFromNoBodySectionAndContentType() throws Exception { - JMSMappingInboundTransformer transformer = new JMSMappingInboundTransformer(idGenerator); - - Message message = Message.Factory.create(); - message.setContentType(AMQPMessageSupport.SERIALIZED_JAVA_OBJECT_CONTENT_TYPE); - - EncodedMessage em = encodeMessage(message); - javax.jms.Message jmsMessage = transformer.transform(em); - - assertNotNull("Message should not be null", jmsMessage); - assertEquals("Unexpected message class type", ServerJMSObjectMessage.class, jmsMessage.getClass()); - } - @Test public void testCreateTextMessageFromNoBodySectionAndContentType() throws Exception { - JMSMappingInboundTransformer transformer = new JMSMappingInboundTransformer(idGenerator); - Message message = Message.Factory.create(); message.setContentType("text/plain"); - EncodedMessage em = encodeMessage(message); - javax.jms.Message jmsMessage = transformer.transform(em); + javax.jms.Message jmsMessage = ServerJMSMessage.wrapCoreMessage(new AMQPMessage(message).toCore()); assertNotNull("Message should not be null", jmsMessage); assertEquals("Unexpected message class type", ServerJMSTextMessage.class, jmsMessage.getClass()); } - /** - * Test that a message with no body section, and with the content type set to an unknown - * value results in a plain Message when not otherwise annotated to indicate the type of JMS - * message it is. - * - * @throws Exception - * if an error occurs during the test. - */ - public void testCreateGenericMessageFromNoBodySectionAndUnknownContentType() throws Exception { - JMSMappingInboundTransformer transformer = new JMSMappingInboundTransformer(idGenerator); - - Message message = Message.Factory.create(); - message.setContentType("unknown-content-type"); - - EncodedMessage em = encodeMessage(message); - javax.jms.Message jmsMessage = transformer.transform(em); - - assertNotNull("Message should not be null", jmsMessage); - assertEquals("Unexpected message class type", ActiveMQMessage.class, jmsMessage.getClass()); - } - // ----- Data Body Section ------------------------------------------------// /** @@ -183,10 +130,7 @@ public class JMSMappingInboundTransformerTest { message.setBody(new Data(binary)); message.setContentType(AMQPMessageSupport.OCTET_STREAM_CONTENT_TYPE); - EncodedMessage em = encodeMessage(message); - - JMSMappingInboundTransformer transformer = new JMSMappingInboundTransformer(idGenerator); - javax.jms.Message jmsMessage = transformer.transform(em); + javax.jms.Message jmsMessage = ServerJMSMessage.wrapCoreMessage(new AMQPMessage(message).toCore()); assertNotNull("Message should not be null", jmsMessage); assertEquals("Unexpected message class type", ServerJMSBytesMessage.class, jmsMessage.getClass()); @@ -206,10 +150,7 @@ public class JMSMappingInboundTransformerTest { message.setBody(new Data(binary)); message.setContentType("unknown-content-type"); - EncodedMessage em = encodeMessage(message); - - JMSMappingInboundTransformer transformer = new JMSMappingInboundTransformer(idGenerator); - javax.jms.Message jmsMessage = transformer.transform(em); + javax.jms.Message jmsMessage = ServerJMSMessage.wrapCoreMessage(new AMQPMessage(message).toCore()); assertNotNull("Message should not be null", jmsMessage); assertEquals("Unexpected message class type", ServerJMSBytesMessage.class, jmsMessage.getClass()); @@ -230,10 +171,7 @@ public class JMSMappingInboundTransformerTest { assertNull(message.getContentType()); - EncodedMessage em = encodeMessage(message); - - JMSMappingInboundTransformer transformer = new JMSMappingInboundTransformer(idGenerator); - javax.jms.Message jmsMessage = transformer.transform(em); + javax.jms.Message jmsMessage = ServerJMSMessage.wrapCoreMessage(new AMQPMessage(message).toCore()); assertNotNull("Message should not be null", jmsMessage); assertEquals("Unexpected message class type", ServerJMSBytesMessage.class, jmsMessage.getClass()); @@ -252,12 +190,9 @@ public class JMSMappingInboundTransformerTest { Message message = Proton.message(); Binary binary = new Binary(new byte[0]); message.setBody(new Data(binary)); - message.setContentType(AMQPMessageSupport.SERIALIZED_JAVA_OBJECT_CONTENT_TYPE); - - EncodedMessage em = encodeMessage(message); + message.setContentType(AMQPMessageSupport.SERIALIZED_JAVA_OBJECT_CONTENT_TYPE.toString()); - JMSMappingInboundTransformer transformer = new JMSMappingInboundTransformer(idGenerator); - javax.jms.Message jmsMessage = transformer.transform(em); + javax.jms.Message jmsMessage = ServerJMSMessage.wrapCoreMessage(new AMQPMessage(message).toCore()); assertNotNull("Message should not be null", jmsMessage); assertEquals("Unexpected message class type", ServerJMSObjectMessage.class, jmsMessage.getClass()); @@ -357,10 +292,7 @@ public class JMSMappingInboundTransformerTest { message.setBody(new Data(binary)); message.setContentType(contentType); - EncodedMessage em = encodeMessage(message); - - JMSMappingInboundTransformer transformer = new JMSMappingInboundTransformer(idGenerator); - javax.jms.Message jmsMessage = transformer.transform(em); + javax.jms.Message jmsMessage = ServerJMSMessage.wrapCoreMessage(new AMQPMessage(message).toCore()); assertNotNull("Message should not be null", jmsMessage); if (StandardCharsets.UTF_8.equals(expectedCharset)) { @@ -384,10 +316,7 @@ public class JMSMappingInboundTransformerTest { Message message = Proton.message(); message.setBody(new AmqpValue("content")); - EncodedMessage em = encodeMessage(message); - - JMSMappingInboundTransformer transformer = new JMSMappingInboundTransformer(idGenerator); - javax.jms.Message jmsMessage = transformer.transform(em); + javax.jms.Message jmsMessage = ServerJMSMessage.wrapCoreMessage(new AMQPMessage(message).toCore()); assertNotNull("Message should not be null", jmsMessage); assertEquals("Unexpected message class type", ServerJMSTextMessage.class, jmsMessage.getClass()); @@ -405,10 +334,7 @@ public class JMSMappingInboundTransformerTest { Message message = Proton.message(); message.setBody(new AmqpValue(null)); - EncodedMessage em = encodeMessage(message); - - JMSMappingInboundTransformer transformer = new JMSMappingInboundTransformer(idGenerator); - javax.jms.Message jmsMessage = transformer.transform(em); + javax.jms.Message jmsMessage = ServerJMSMessage.wrapCoreMessage(new AMQPMessage(message).toCore()); assertNotNull("Message should not be null", jmsMessage); assertEquals("Unexpected message class type", ServerJMSTextMessage.class, jmsMessage.getClass()); @@ -424,14 +350,11 @@ public class JMSMappingInboundTransformerTest { */ @Test public void testCreateObjectMessageFromAmqpValueWithBinaryAndContentType() throws Exception { - JMSMappingInboundTransformer transformer = new JMSMappingInboundTransformer(idGenerator); - Message message = Message.Factory.create(); message.setBody(new AmqpValue(new Binary(new byte[0]))); - message.setContentType(AMQPMessageSupport.SERIALIZED_JAVA_OBJECT_CONTENT_TYPE); + message.setContentType(AMQPMessageSupport.SERIALIZED_JAVA_OBJECT_CONTENT_TYPE.toString()); - EncodedMessage em = encodeMessage(message); - javax.jms.Message jmsMessage = transformer.transform(em); + javax.jms.Message jmsMessage = ServerJMSMessage.wrapCoreMessage(new AMQPMessage(message).toCore()); assertNotNull("Message should not be null", jmsMessage); assertEquals("Unexpected message class type", ServerJMSObjectMessage.class, jmsMessage.getClass()); @@ -450,50 +373,13 @@ public class JMSMappingInboundTransformerTest { Map map = new HashMap<>(); message.setBody(new AmqpValue(map)); - EncodedMessage em = encodeMessage(message); - - JMSMappingInboundTransformer transformer = new JMSMappingInboundTransformer(idGenerator); - javax.jms.Message jmsMessage = transformer.transform(em); + javax.jms.Message jmsMessage = ServerJMSMessage.wrapCoreMessage(new AMQPMessage(message).toCore()); assertNotNull("Message should not be null", jmsMessage); assertEquals("Unexpected message class type", ServerJMSMapMessage.class, jmsMessage.getClass()); } /** - * Test that an amqp-value body containing a map that has an AMQP Binary as one of the - * entries encoded into the Map results in an MapMessage where a byte array can be read from - * the entry. - * - * @throws Exception - * if an error occurs during the test. - */ - @Test - public void testCreateAmqpMapMessageFromAmqpValueWithMapContainingBinaryEntry() throws Exception { - final String ENTRY_NAME = "bytesEntry"; - - Message message = Proton.message(); - Map map = new HashMap<>(); - - byte[] inputBytes = new byte[] {1, 2, 3, 4, 5}; - map.put(ENTRY_NAME, new Binary(inputBytes)); - - message.setBody(new AmqpValue(map)); - - EncodedMessage em = encodeMessage(message); - - JMSMappingInboundTransformer transformer = new JMSMappingInboundTransformer(idGenerator); - javax.jms.Message jmsMessage = transformer.transform(em); - - assertNotNull("Message should not be null", jmsMessage); - assertEquals("Unexpected message class type", ServerJMSMapMessage.class, jmsMessage.getClass()); - - MapMessage mapMessage = (MapMessage) jmsMessage; - byte[] outputBytes = mapMessage.getBytes(ENTRY_NAME); - assertNotNull(outputBytes); - assertTrue(Arrays.equals(inputBytes, outputBytes)); - } - - /** * Test that an amqp-value body containing a list results in an StreamMessage when not * otherwise annotated to indicate the type of JMS message it is. * @@ -506,10 +392,7 @@ public class JMSMappingInboundTransformerTest { List list = new ArrayList<>(); message.setBody(new AmqpValue(list)); - EncodedMessage em = encodeMessage(message); - - JMSMappingInboundTransformer transformer = new JMSMappingInboundTransformer(idGenerator); - javax.jms.Message jmsMessage = transformer.transform(em); + javax.jms.Message jmsMessage = ServerJMSMessage.wrapCoreMessage(new AMQPMessage(message).toCore()); assertNotNull("Message should not be null", jmsMessage); assertEquals("Unexpected message class type", ServerJMSStreamMessage.class, jmsMessage.getClass()); @@ -528,10 +411,7 @@ public class JMSMappingInboundTransformerTest { List list = new ArrayList<>(); message.setBody(new AmqpSequence(list)); - EncodedMessage em = encodeMessage(message); - - JMSMappingInboundTransformer transformer = new JMSMappingInboundTransformer(idGenerator); - javax.jms.Message jmsMessage = transformer.transform(em); + javax.jms.Message jmsMessage = ServerJMSMessage.wrapCoreMessage(new AMQPMessage(message).toCore()); assertNotNull("Message should not be null", jmsMessage); assertEquals("Unexpected message class type", ServerJMSStreamMessage.class, jmsMessage.getClass()); @@ -550,10 +430,7 @@ public class JMSMappingInboundTransformerTest { Binary binary = new Binary(new byte[0]); message.setBody(new AmqpValue(binary)); - EncodedMessage em = encodeMessage(message); - - JMSMappingInboundTransformer transformer = new JMSMappingInboundTransformer(idGenerator); - javax.jms.Message jmsMessage = transformer.transform(em); + javax.jms.Message jmsMessage = ServerJMSMessage.wrapCoreMessage(new AMQPMessage(message).toCore()); assertNotNull("Message should not be null", jmsMessage); assertEquals("Unexpected message class type", ServerJMSBytesMessage.class, jmsMessage.getClass()); @@ -572,11 +449,7 @@ public class JMSMappingInboundTransformerTest { Message message = Proton.message(); message.setBody(new AmqpValue(UUID.randomUUID())); - EncodedMessage em = encodeMessage(message); - - JMSMappingInboundTransformer transformer = new JMSMappingInboundTransformer(idGenerator); - - javax.jms.Message jmsMessage = transformer.transform(em); + javax.jms.Message jmsMessage = ServerJMSMessage.wrapCoreMessage(new AMQPMessage(message).toCore()); assertNotNull("Message should not be null", jmsMessage); assertEquals("Unexpected message class type", ServerJMSBytesMessage.class, jmsMessage.getClass()); @@ -588,10 +461,8 @@ public class JMSMappingInboundTransformerTest { Message message = Message.Factory.create(); message.setBody(new AmqpValue(contentString)); - EncodedMessage em = encodeMessage(message); - - JMSMappingInboundTransformer transformer = new JMSMappingInboundTransformer(idGenerator); - javax.jms.Message jmsMessage = transformer.transform(em); + ServerJMSTextMessage jmsMessage = (ServerJMSTextMessage)ServerJMSMessage.wrapCoreMessage(new AMQPMessage(message).toCore()); + jmsMessage.decode(); assertTrue("Expected TextMessage", jmsMessage instanceof TextMessage); assertEquals("Unexpected message class type", ServerJMSTextMessage.class, jmsMessage.getClass()); @@ -631,7 +502,6 @@ public class JMSMappingInboundTransformerTest { private void doTransformWithToTypeDestinationTypeAnnotationTestImpl(Object toTypeAnnotationValue, Class expectedClass) throws Exception { - JMSMappingInboundTransformer transformer = new JMSMappingInboundTransformer(idGenerator); String toAddress = "toAddress"; Message amqp = Message.Factory.create(); @@ -644,9 +514,7 @@ public class JMSMappingInboundTransformerTest { amqp.setMessageAnnotations(ma); } - EncodedMessage em = encodeMessage(amqp); - - javax.jms.Message jmsMessage = transformer.transform(em); + javax.jms.Message jmsMessage = ServerJMSMessage.wrapCoreMessage(new AMQPMessage(amqp).toCore()); assertTrue("Expected TextMessage", jmsMessage instanceof TextMessage); } @@ -679,7 +547,6 @@ public class JMSMappingInboundTransformerTest { private void doTransformWithReplyToTypeDestinationTypeAnnotationTestImpl(Object replyToTypeAnnotationValue, Class expectedClass) throws Exception { - JMSMappingInboundTransformer transformer = new JMSMappingInboundTransformer(idGenerator); String replyToAddress = "replyToAddress"; Message amqp = Message.Factory.create(); @@ -692,27 +559,8 @@ public class JMSMappingInboundTransformerTest { amqp.setMessageAnnotations(ma); } - EncodedMessage em = encodeMessage(amqp); - - javax.jms.Message jmsMessage = transformer.transform(em); + javax.jms.Message jmsMessage = ServerJMSMessage.wrapCoreMessage(new AMQPMessage(amqp).toCore()); assertTrue("Expected TextMessage", jmsMessage instanceof TextMessage); } - // ----- Utility Methods --------------------------------------------------// - - private EncodedMessage encodeMessage(Message message) { - byte[] encodeBuffer = new byte[1024 * 8]; - int encodedSize; - while (true) { - try { - encodedSize = message.encode(encodeBuffer, 0, encodeBuffer.length); - break; - } catch (java.nio.BufferOverflowException e) { - encodeBuffer = new byte[encodeBuffer.length * 2]; - } - } - - long messageFormat = 0; - return new EncodedMessage(messageFormat, encodeBuffer, 0, encodedSize); - } }