Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 79BB3200D3E for ; Thu, 16 Nov 2017 12:29:01 +0100 (CET) Received: by cust-asf.ponee.io (Postfix) id 76976160BF4; Thu, 16 Nov 2017 11:29:01 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 6F3A6160BE5 for ; Thu, 16 Nov 2017 12:29:00 +0100 (CET) Received: (qmail 8866 invoked by uid 500); 16 Nov 2017 11:28:59 -0000 Mailing-List: contact commits-help@activemq.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@activemq.apache.org Delivered-To: mailing list commits@activemq.apache.org Received: (qmail 8857 invoked by uid 99); 16 Nov 2017 11:28:59 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 16 Nov 2017 11:28:59 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id C8A46E78DF; Thu, 16 Nov 2017 11:28:58 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: martyntaylor@apache.org To: commits@activemq.apache.org Date: Thu, 16 Nov 2017 11:28:58 -0000 Message-Id: X-Mailer: ASF-Git Admin Mailer Subject: [1/2] activemq-artemis git commit: ARTEMIS-1514 Large message fix archived-at: Thu, 16 Nov 2017 11:29:01 -0000 Repository: activemq-artemis Updated Branches: refs/heads/master 5cea228db -> 3c04de3ab ARTEMIS-1514 Large message fix I'm doing an overall improvement on large message support for AMQP. However, this commit is just about a bug in the converter. 
It will be moot after all the changes I'm making, but I would rather keep this separate as a way to cherry-pick on previous versions eventually. Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/9daa0321 Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/9daa0321 Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/9daa0321 Branch: refs/heads/master Commit: 9daa0321b668fbb1f45349f9e33937aa5a7c705e Parents: 5cea228 Author: Clebert Suconic Authored: Mon Nov 13 16:36:47 2017 -0500 Committer: Clebert Suconic Committed: Wed Nov 15 20:54:58 2017 -0500 ---------------------------------------------------------------------- .../amqp/broker/AMQPSessionCallback.java | 2 +- .../amqp/converter/AmqpCoreConverter.java | 1 + .../converter/jms/ServerJMSBytesMessage.java | 6 +- .../amqp/converter/jms/ServerJMSMessage.java | 8 +- .../impl/journal/LargeServerMessageImpl.java | 22 +++ .../integration/amqp/AmqpLargeMessageTest.java | 189 +++++++++++++++++++ 6 files changed, 223 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/9daa0321/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java index 667d57a..42e9625 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java +++ 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java @@ -209,7 +209,7 @@ public class AMQPSessionCallback implements SessionCallback { filter = SelectorTranslator.convertToActiveMQFilterString(filter); - ServerConsumer consumer = serverSession.createConsumer(consumerID, SimpleString.toSimpleString(queue), SimpleString.toSimpleString(filter), browserOnly); + ServerConsumer consumer = serverSession.createConsumer(consumerID, SimpleString.toSimpleString(queue), SimpleString.toSimpleString(filter), browserOnly, false, null); // AMQP handles its own flow control for when it's started consumer.setStarted(true); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/9daa0321/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/AmqpCoreConverter.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/AmqpCoreConverter.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/AmqpCoreConverter.java index acd940b..8d05b2c 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/AmqpCoreConverter.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/AmqpCoreConverter.java @@ -186,6 +186,7 @@ public class AmqpCoreConverter { result.getInnerMessage().setReplyTo(message.getReplyTo()); result.getInnerMessage().setDurable(message.isDurable()); result.getInnerMessage().setPriority(message.getPriority()); + result.getInnerMessage().setAddress(message.getAddress()); result.encode(); 
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/9daa0321/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/jms/ServerJMSBytesMessage.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/jms/ServerJMSBytesMessage.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/jms/ServerJMSBytesMessage.java index b6a829d..a94cfde 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/jms/ServerJMSBytesMessage.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/jms/ServerJMSBytesMessage.java @@ -200,8 +200,10 @@ public class ServerJMSBytesMessage extends ServerJMSMessage implements BytesMess @Override public void reset() throws JMSException { - bytesMessageReset(getReadBodyBuffer()); - bytesMessageReset(getWriteBodyBuffer()); + if (!message.isLargeMessage()) { + bytesMessageReset(getReadBodyBuffer()); + bytesMessageReset(getWriteBodyBuffer()); + } } } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/9daa0321/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/jms/ServerJMSMessage.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/jms/ServerJMSMessage.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/jms/ServerJMSMessage.java index 2a52f7a..5962e39 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/jms/ServerJMSMessage.java +++ 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/jms/ServerJMSMessage.java @@ -369,11 +369,15 @@ public class ServerJMSMessage implements Message { * Encode the body into the internal message */ public void encode() throws Exception { - message.getBodyBuffer().resetReaderIndex(); + if (!message.isLargeMessage()) { + message.getBodyBuffer().resetReaderIndex(); + } } public void decode() throws Exception { - message.getBodyBuffer().resetReaderIndex(); + if (!message.isLargeMessage()) { + message.getBodyBuffer().resetReaderIndex(); + } } @Override http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/9daa0321/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeServerMessageImpl.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeServerMessageImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeServerMessageImpl.java index 0a2d3b2..11d1a21 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeServerMessageImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeServerMessageImpl.java @@ -20,6 +20,7 @@ import java.nio.ByteBuffer; import java.util.concurrent.atomic.AtomicInteger; import org.apache.activemq.artemis.api.core.ActiveMQBuffer; +import org.apache.activemq.artemis.api.core.ActiveMQBuffers; import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.api.core.ActiveMQExceptionType; import org.apache.activemq.artemis.api.core.ActiveMQInternalErrorException; @@ -197,6 +198,27 @@ public final class LargeServerMessageImpl extends CoreMessage implements LargeSe return currentRefCount; } + // Even though not recommended, in certain instances + // we 
may need to convert a large message back to a whole buffer + // in a way you can convert + @Override + public ActiveMQBuffer getReadOnlyBodyBuffer() { + try { + file.open(); + ActiveMQBuffer buffer = ActiveMQBuffers.fixedBuffer((int) file.size()); + file.read(buffer.toByteBuffer()); + return buffer; + } catch (Exception e) { + throw new RuntimeException(e); + } finally { + try { + file.close(); + } catch (Exception ignored) { + } + + } + } + @Override public boolean isLargeMessage() { return true; http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/9daa0321/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpLargeMessageTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpLargeMessageTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpLargeMessageTest.java new file mode 100644 index 0000000..07ab5a5 --- /dev/null +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpLargeMessageTest.java @@ -0,0 +1,189 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.tests.integration.amqp; + +import javax.jms.Connection; +import javax.jms.ConnectionFactory; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.Session; +import java.util.Map; +import java.util.concurrent.TimeUnit; + +import org.apache.activemq.artemis.api.core.RoutingType; +import org.apache.activemq.artemis.core.server.ActiveMQServer; +import org.apache.activemq.artemis.core.settings.impl.AddressSettings; +import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory; +import org.apache.activemq.transport.amqp.client.AmqpClient; +import org.apache.activemq.transport.amqp.client.AmqpConnection; +import org.apache.activemq.transport.amqp.client.AmqpMessage; +import org.apache.activemq.transport.amqp.client.AmqpReceiver; +import org.apache.activemq.transport.amqp.client.AmqpSender; +import org.apache.activemq.transport.amqp.client.AmqpSession; +import org.apache.qpid.proton.amqp.messaging.Data; +import org.apache.qpid.proton.message.impl.MessageImpl; +import org.junit.Assert; +import org.junit.Test; + +public class AmqpLargeMessageTest extends AmqpClientTestSupport { + + private static final int FRAME_SIZE = 10024; + private static final int PAYLOAD = 110 * 1024; + + String testQueueName = "ConnectionFrameSize"; + + @Override + protected void configureAMQPAcceptorParameters(Map params) { + params.put("maxFrameSize", FRAME_SIZE); + } + + @Override + protected void createAddressAndQueues(ActiveMQServer server) throws Exception { + } + + + @Override + protected void addAdditionalAcceptors(ActiveMQServer server) throws Exception { + //server.getConfiguration().addAcceptorConfiguration("tcp", "tcp://localhost:5445"); + server.getConfiguration().addAcceptorConfiguration("tcp", "tcp://localhost:61616"); + } + + + @Test(timeout = 60000) + public void 
testSendAMQPReceiveCore() throws Exception { + server.getAddressSettingsRepository().addMatch("#", new AddressSettings().setDefaultAddressRoutingType(RoutingType.ANYCAST)); + + int nMsgs = 200; + + AmqpClient client = createAmqpClient(); + AmqpConnection connection = addConnection(client.connect()); + try { + sendMessages(nMsgs, connection); + + int count = getMessageCount(server.getPostOffice(), testQueueName); + assertEquals(nMsgs, count); + + ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(); + receiveJMS(nMsgs, factory); + } finally { + connection.close(); + } + } + + + @Test(timeout = 60000) + public void testSendAMQPReceiveOpenWire() throws Exception { + server.getAddressSettingsRepository().addMatch("#", new AddressSettings().setDefaultAddressRoutingType(RoutingType.ANYCAST)); + + int nMsgs = 200; + + AmqpClient client = createAmqpClient(); + AmqpConnection connection = addConnection(client.connect()); + try { + sendMessages(nMsgs, connection); + + int count = getMessageCount(server.getPostOffice(), testQueueName); + assertEquals(nMsgs, count); + + ConnectionFactory factory = new org.apache.activemq.ActiveMQConnectionFactory("tcp://localhost:61616"); + receiveJMS(nMsgs, factory); + } finally { + connection.close(); + } + } + + private void sendMessages(int nMsgs, AmqpConnection connection) throws Exception { + connection.connect(); + + AmqpSession session = connection.createSession(); + AmqpSender sender = session.createSender(testQueueName); + + for (int i = 0; i < nMsgs; ++i) { + AmqpMessage message = createAmqpMessage((byte) 'A', PAYLOAD); + message.setApplicationProperty("i", (Integer) i); + message.setDurable(true); + sender.send(message); + } + + session.close(); + } + + private void receiveJMS(int nMsgs, + ConnectionFactory factory) throws JMSException { + Connection connection2 = factory.createConnection(); + Session session2 = connection2.createSession(false, Session.AUTO_ACKNOWLEDGE); + connection2.start(); + MessageConsumer 
consumer = session2.createConsumer(session2.createQueue(testQueueName)); + + for (int i = 0; i < nMsgs; i++) { + Message message = consumer.receive(5000); + Assert.assertNotNull(message); + Assert.assertEquals(i, message.getIntProperty("i")); + } + + connection2.close(); + } + + @Test(timeout = 60000) + public void testSendAMQPReceiveAMQP() throws Exception { + server.getAddressSettingsRepository().addMatch("#", new AddressSettings().setDefaultAddressRoutingType(RoutingType.ANYCAST)); + + String testQueueName = "ConnectionFrameSize"; + int nMsgs = 200; + + AmqpClient client = createAmqpClient(); + AmqpConnection connection = addConnection(client.connect()); + try { + sendMessages(nMsgs, connection); + + int count = getMessageCount(server.getPostOffice(), testQueueName); + assertEquals(nMsgs, count); + + AmqpSession session = connection.createSession(); + AmqpReceiver receiver = session.createReceiver(testQueueName); + receiver.flow(nMsgs); + + for (int i = 0; i < nMsgs; ++i) { + AmqpMessage message = receiver.receive(5, TimeUnit.SECONDS); + assertNotNull("failed at " + i, message); + MessageImpl wrapped = (MessageImpl) message.getWrappedMessage(); + if (wrapped.getBody() instanceof Data) { + // converters can change this to AmqValue + Data data = (Data) wrapped.getBody(); + System.out.println("received : message: " + data.getValue().getLength()); + assertEquals(PAYLOAD, data.getValue().getLength()); + } + message.accept(); + } + session.close(); + + } finally { + connection.close(); + } + } + + private AmqpMessage createAmqpMessage(byte value, int payloadSize) { + AmqpMessage message = new AmqpMessage(); + byte[] payload = new byte[payloadSize]; + for (int i = 0; i < payload.length; i++) { + payload[i] = value; + } + message.setBytes(payload); + return message; + } +}