From commits-return-50963-archive-asf-public=cust-asf.ponee.io@activemq.apache.org Wed Apr 4 15:18:49 2018 Return-Path: X-Original-To: archive-asf-public@cust-asf.ponee.io Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx-eu-01.ponee.io (Postfix) with SMTP id A418B18064F for ; Wed, 4 Apr 2018 15:18:48 +0200 (CEST) Received: (qmail 63985 invoked by uid 500); 4 Apr 2018 13:18:47 -0000 Mailing-List: contact commits-help@activemq.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@activemq.apache.org Delivered-To: mailing list commits@activemq.apache.org Received: (qmail 63976 invoked by uid 99); 4 Apr 2018 13:18:47 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 04 Apr 2018 13:18:47 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id A027BEB4F4; Wed, 4 Apr 2018 13:18:47 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: clebertsuconic@apache.org To: commits@activemq.apache.org Date: Wed, 04 Apr 2018 13:18:47 -0000 Message-Id: <33ba6cfb10394810bee736708e0cc242@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [1/2] activemq-artemis git commit: ARTEMIS-1780 Handle conversion of large ObjectMessage types Repository: activemq-artemis Updated Branches: refs/heads/master 32090121d -> 6ec375bdb ARTEMIS-1780 Handle conversion of large ObjectMessage types Make sure the correct buffer is used when decoding the stored Core message that originated from the conversion of an AMQP message sent and annotated as a JMS ObjectMessage which trips the large message boundary. Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/fc32bc0b Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/fc32bc0b Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/fc32bc0b Branch: refs/heads/master Commit: fc32bc0b07ef37ca172dd2777701ad16bbe87ba5 Parents: 3209012 Author: Timothy Bish Authored: Mon Apr 2 17:57:54 2018 -0400 Committer: Clebert Suconic Committed: Tue Apr 3 22:27:43 2018 -0400 ---------------------------------------------------------------------- .../amqp/converter/CoreAmqpConverter.java | 77 ++++++----- .../converter/jms/ServerJMSObjectMessage.java | 9 +- .../integration/amqp/AmqpLargeMessageTest.java | 130 ++++++++++++++++++- 3 files changed, 167 insertions(+), 49 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fc32bc0b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/CoreAmqpConverter.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/CoreAmqpConverter.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/CoreAmqpConverter.java index 8939982..abda58a 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/CoreAmqpConverter.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/CoreAmqpConverter.java @@ -17,14 +17,31 @@ package org.apache.activemq.artemis.protocol.amqp.converter; -import javax.jms.Destination; -import javax.jms.JMSException; -import javax.jms.MessageEOFException; -import javax.jms.Queue; -import javax.jms.TemporaryQueue; -import javax.jms.TemporaryTopic; -import javax.jms.TextMessage; -import javax.jms.Topic; +import static org.apache.activemq.artemis.api.core.FilterConstants.NATIVE_MESSAGE_ID; +import static org.apache.activemq.artemis.api.core.Message.HDR_SCHEDULED_DELIVERY_TIME; +import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.EMPTY_BINARY; +import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.JMS_AMQP_CONTENT_ENCODING; +import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.JMS_AMQP_CONTENT_TYPE; +import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.JMS_AMQP_DELIVERY_ANNOTATION_PREFIX; +import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.JMS_AMQP_FIRST_ACQUIRER; +import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.JMS_AMQP_FOOTER_PREFIX; +import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.JMS_AMQP_HEADER; +import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.JMS_AMQP_HEADER_DURABLE; +import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.JMS_AMQP_HEADER_PRIORITY; +import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.JMS_AMQP_MESSAGE_ANNOTATION_PREFIX; +import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.JMS_AMQP_NATIVE; +import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.JMS_AMQP_PREFIX; +import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.JMS_AMQP_PROPERTIES; +import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.JMS_AMQP_REPLYTO_GROUP_ID; +import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.JMS_DEST_TYPE_MSG_ANNOTATION; +import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.JMS_REPLY_TO_TYPE_MSG_ANNOTATION; +import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.QUEUE_TYPE; +import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.SERIALIZED_JAVA_OBJECT_CONTENT_TYPE; +import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.TEMP_QUEUE_TYPE; +import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.TEMP_TOPIC_TYPE; +import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.TOPIC_TYPE; +import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.toAddress; + import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.Date; @@ -34,8 +51,15 @@ import java.util.LinkedHashMap; import java.util.Map; import java.util.Set; -import io.netty.buffer.ByteBuf; -import io.netty.buffer.PooledByteBufAllocator; +import javax.jms.Destination; +import javax.jms.JMSException; +import javax.jms.MessageEOFException; +import javax.jms.Queue; +import javax.jms.TemporaryQueue; +import javax.jms.TemporaryTopic; +import javax.jms.TextMessage; +import javax.jms.Topic; + import org.apache.activemq.artemis.api.core.ICoreMessage; import org.apache.activemq.artemis.api.core.Message; import org.apache.activemq.artemis.protocol.amqp.broker.AMQPMessage; @@ -67,30 +91,8 @@ import org.apache.qpid.proton.codec.EncoderImpl; import org.apache.qpid.proton.codec.WritableBuffer; import org.jboss.logging.Logger; -import static org.apache.activemq.artemis.api.core.FilterConstants.NATIVE_MESSAGE_ID; -import static org.apache.activemq.artemis.api.core.Message.HDR_SCHEDULED_DELIVERY_TIME; -import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.EMPTY_BINARY; -import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.JMS_AMQP_CONTENT_ENCODING; -import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.JMS_AMQP_CONTENT_TYPE; -import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.JMS_AMQP_DELIVERY_ANNOTATION_PREFIX; -import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.JMS_AMQP_FIRST_ACQUIRER; -import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.JMS_AMQP_FOOTER_PREFIX; -import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.JMS_AMQP_HEADER; -import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.JMS_AMQP_HEADER_DURABLE; -import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.JMS_AMQP_HEADER_PRIORITY; -import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.JMS_AMQP_MESSAGE_ANNOTATION_PREFIX; -import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.JMS_AMQP_NATIVE; -import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.JMS_AMQP_PREFIX; -import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.JMS_AMQP_PROPERTIES; -import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.JMS_AMQP_REPLYTO_GROUP_ID; -import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.JMS_DEST_TYPE_MSG_ANNOTATION; -import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.JMS_REPLY_TO_TYPE_MSG_ANNOTATION; -import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.QUEUE_TYPE; -import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.SERIALIZED_JAVA_OBJECT_CONTENT_TYPE; -import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.TEMP_QUEUE_TYPE; -import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.TEMP_TOPIC_TYPE; -import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.TOPIC_TYPE; -import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.toAddress; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.PooledByteBufAllocator; public class CoreAmqpConverter { @@ -405,12 +407,7 @@ public class CoreAmqpConverter { } private static Binary getBinaryFromMessageBody(ServerJMSObjectMessage message) throws JMSException { - message.getInnerMessage().getBodyBuffer().resetReaderIndex(); - int size = message.getInnerMessage().getBodyBuffer().readInt(); - byte[] bytes = new byte[size]; - message.getInnerMessage().getBodyBuffer().readBytes(bytes); - - return new Binary(bytes); + return message.getSerializedForm(); } private static Map getMapFromMessageBody(ServerJMSMapMessage message) throws JMSException { http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fc32bc0b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/jms/ServerJMSObjectMessage.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/jms/ServerJMSObjectMessage.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/jms/ServerJMSObjectMessage.java index 23ffb09..1281f2b 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/jms/ServerJMSObjectMessage.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/jms/ServerJMSObjectMessage.java @@ -16,10 +16,12 @@ */ package org.apache.activemq.artemis.protocol.amqp.converter.jms; +import java.io.Serializable; + import javax.jms.JMSException; import javax.jms.ObjectMessage; -import java.io.Serializable; +import org.apache.activemq.artemis.api.core.ActiveMQBuffer; import org.apache.activemq.artemis.api.core.ICoreMessage; import org.apache.activemq.artemis.api.core.Message; import org.apache.qpid.proton.amqp.Binary; @@ -62,9 +64,10 @@ public class ServerJMSObjectMessage extends ServerJMSMessage implements ObjectMe @Override public void decode() throws Exception { super.decode(); - int size = getInnerMessage().getBodyBuffer().readInt(); + ActiveMQBuffer buffer = getInnerMessage().getDataBuffer(); + int size = buffer.readInt(); byte[] bytes = new byte[size]; - getInnerMessage().getBodyBuffer().readBytes(bytes); + buffer.readBytes(bytes); payload = new Binary(bytes); } } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fc32bc0b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpLargeMessageTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpLargeMessageTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpLargeMessageTest.java index 07ab5a5..d70c700 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpLargeMessageTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpLargeMessageTest.java @@ -16,14 +16,21 @@ */ package org.apache.activemq.artemis.tests.integration.amqp; +import java.nio.charset.StandardCharsets; +import java.util.Map; +import java.util.concurrent.TimeUnit; + +import javax.jms.BytesMessage; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.ObjectMessage; +import javax.jms.Queue; import javax.jms.Session; -import java.util.Map; -import java.util.concurrent.TimeUnit; +import javax.jms.TextMessage; import org.apache.activemq.artemis.api.core.RoutingType; import org.apache.activemq.artemis.core.server.ActiveMQServer; @@ -35,6 +42,7 @@ import org.apache.activemq.transport.amqp.client.AmqpMessage; import org.apache.activemq.transport.amqp.client.AmqpReceiver; import org.apache.activemq.transport.amqp.client.AmqpSender; import org.apache.activemq.transport.amqp.client.AmqpSession; +import org.apache.qpid.jms.JmsConnectionFactory; import org.apache.qpid.proton.amqp.messaging.Data; import org.apache.qpid.proton.message.impl.MessageImpl; import org.junit.Assert; @@ -56,14 +64,11 @@ public class AmqpLargeMessageTest extends AmqpClientTestSupport { protected void createAddressAndQueues(ActiveMQServer server) throws Exception { } - @Override protected void addAdditionalAcceptors(ActiveMQServer server) throws Exception { - //server.getConfiguration().addAcceptorConfiguration("tcp", "tcp://localhost:5445"); server.getConfiguration().addAcceptorConfiguration("tcp", "tcp://localhost:61616"); } - @Test(timeout = 60000) public void testSendAMQPReceiveCore() throws Exception { server.getAddressSettingsRepository().addMatch("#", new AddressSettings().setDefaultAddressRoutingType(RoutingType.ANYCAST)); @@ -85,7 +90,6 @@ public class AmqpLargeMessageTest extends AmqpClientTestSupport { } } - @Test(timeout = 60000) public void testSendAMQPReceiveOpenWire() throws Exception { server.getAddressSettingsRepository().addMatch("#", new AddressSettings().setDefaultAddressRoutingType(RoutingType.ANYCAST)); @@ -177,6 +181,120 @@ public class AmqpLargeMessageTest extends AmqpClientTestSupport { } } + @Test(timeout = 60000) + public void testSendAMQPReceiveAMQPViaJMSObjectMessage() throws Exception { + server.getAddressSettingsRepository().addMatch("#", new AddressSettings().setDefaultAddressRoutingType(RoutingType.ANYCAST)); + + String testQueueName = "ConnectionFrameSize"; + int nMsgs = 1; + + ConnectionFactory factory = new JmsConnectionFactory("amqp://localhost:61616"); + + sendObjectMessages(nMsgs, new JmsConnectionFactory("amqp://localhost:61616")); + + int count = getMessageCount(server.getPostOffice(), testQueueName); + assertEquals(nMsgs, count); + + receiveJMS(nMsgs, factory); + } + + @Test(timeout = 60000) + public void testSendAMQPReceiveAMQPViaJMSText() throws Exception { + server.getAddressSettingsRepository().addMatch("#", new AddressSettings().setDefaultAddressRoutingType(RoutingType.ANYCAST)); + + String testQueueName = "ConnectionFrameSize"; + int nMsgs = 1; + + ConnectionFactory factory = new JmsConnectionFactory("amqp://localhost:61616"); + + sendTextMessages(nMsgs, new JmsConnectionFactory("amqp://localhost:61616")); + + int count = getMessageCount(server.getPostOffice(), testQueueName); + assertEquals(nMsgs, count); + + receiveJMS(nMsgs, factory); + } + + @Test(timeout = 60000) + public void testSendAMQPReceiveAMQPViaJMSBytes() throws Exception { + server.getAddressSettingsRepository().addMatch("#", new AddressSettings().setDefaultAddressRoutingType(RoutingType.ANYCAST)); + + String testQueueName = "ConnectionFrameSize"; + int nMsgs = 1; + + ConnectionFactory factory = new JmsConnectionFactory("amqp://localhost:61616"); + + sendBytesMessages(nMsgs, new JmsConnectionFactory("amqp://localhost:61616")); + + int count = getMessageCount(server.getPostOffice(), testQueueName); + assertEquals(nMsgs, count); + + receiveJMS(nMsgs, factory); + } + + private void sendObjectMessages(int nMsgs, ConnectionFactory factory) throws Exception { + try (Connection connection = factory.createConnection()) { + Session session = connection.createSession(); + Queue queue = session.createQueue(testQueueName); + MessageProducer producer = session.createProducer(queue); + ObjectMessage msg = session.createObjectMessage(); + + StringBuilder builder = new StringBuilder(); + for (int i = 0; i < PAYLOAD; ++i) { + builder.append("A"); + } + + msg.setObject(builder.toString()); + + for (int i = 0; i < nMsgs; ++i) { + msg.setIntProperty("i", (Integer) i); + producer.send(msg); + } + } + } + + private void sendTextMessages(int nMsgs, ConnectionFactory factory) throws Exception { + try (Connection connection = factory.createConnection()) { + Session session = connection.createSession(); + Queue queue = session.createQueue(testQueueName); + MessageProducer producer = session.createProducer(queue); + TextMessage msg = session.createTextMessage(); + + StringBuilder builder = new StringBuilder(); + for (int i = 0; i < PAYLOAD; ++i) { + builder.append("A"); + } + + msg.setText(builder.toString()); + + for (int i = 0; i < nMsgs; ++i) { + msg.setIntProperty("i", (Integer) i); + producer.send(msg); + } + } + } + + private void sendBytesMessages(int nMsgs, ConnectionFactory factory) throws Exception { + try (Connection connection = factory.createConnection()) { + Session session = connection.createSession(); + Queue queue = session.createQueue(testQueueName); + MessageProducer producer = session.createProducer(queue); + BytesMessage msg = session.createBytesMessage(); + + StringBuilder builder = new StringBuilder(); + for (int i = 0; i < PAYLOAD; ++i) { + builder.append("A"); + } + + msg.writeBytes(builder.toString().getBytes(StandardCharsets.UTF_8)); + + for (int i = 0; i < nMsgs; ++i) { + msg.setIntProperty("i", (Integer) i); + producer.send(msg); + } + } + } + private AmqpMessage createAmqpMessage(byte value, int payloadSize) { AmqpMessage message = new AmqpMessage(); byte[] payload = new byte[payloadSize];