activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From clebertsuco...@apache.org
Subject activemq-artemis git commit: ARTEMIS-969 Unecessary buffer expansion on message delivery
Date Wed, 15 Feb 2017 19:40:05 GMT
Repository: activemq-artemis
Updated Branches:
  refs/heads/1.x b0585329b -> c5c50c53b


ARTEMIS-969 Unecessary buffer expansion on message delivery

(cherry picked from commit f38d5c7dbcb66bebc5b9fba984845bd8d6aadc0c)


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/c5c50c53
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/c5c50c53
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/c5c50c53

Branch: refs/heads/1.x
Commit: c5c50c53be6541f91796e611d9d8b0420831736c
Parents: b058532
Author: Clebert Suconic <clebertsuconic@apache.org>
Authored: Wed Feb 15 12:51:05 2017 -0500
Committer: Clebert Suconic <clebertsuconic@apache.org>
Committed: Wed Feb 15 14:39:54 2017 -0500

----------------------------------------------------------------------
 .../impl/wireformat/SessionReceiveMessage.java  |  4 +-
 .../tests/integration/client/ConsumerTest.java  | 48 ++++++++++++++++++++
 2 files changed, 50 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c5c50c53/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionReceiveMessage.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionReceiveMessage.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionReceiveMessage.java
index ce76186..c21ebda 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionReceiveMessage.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionReceiveMessage.java
@@ -56,8 +56,8 @@ public class SessionReceiveMessage extends MessagePacket {
    public ActiveMQBuffer encode(final RemotingConnection connection) {
       ActiveMQBuffer buffer = message.getEncodedBuffer();
 
-      ActiveMQBuffer bufferWrite = connection.createTransportBuffer(buffer.writerIndex(), true);
-      bufferWrite.writeBytes(buffer, 0, bufferWrite.capacity());
+      ActiveMQBuffer bufferWrite = connection.createTransportBuffer(buffer.writerIndex() + DataConstants.SIZE_LONG + DataConstants.SIZE_INT, true);
+      bufferWrite.writeBytes(buffer, 0, buffer.capacity());
       bufferWrite.setIndex(buffer.readerIndex(), buffer.writerIndex());
 
       // Sanity check

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c5c50c53/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/ConsumerTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/ConsumerTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/ConsumerTest.java
index 1c1f929..8f00b2a 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/ConsumerTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/ConsumerTest.java
@@ -26,6 +26,7 @@ import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.activemq.artemis.api.core.ActiveMQException;
 import org.apache.activemq.artemis.api.core.ActiveMQIllegalStateException;
 import org.apache.activemq.artemis.api.core.Interceptor;
+import org.apache.activemq.artemis.api.core.Message;
 import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.api.core.client.ClientConsumer;
 import org.apache.activemq.artemis.api.core.client.ClientMessage;
@@ -34,6 +35,7 @@ import org.apache.activemq.artemis.api.core.client.ClientSession;
 import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
 import org.apache.activemq.artemis.api.core.client.MessageHandler;
 import org.apache.activemq.artemis.api.core.client.ServerLocator;
+import org.apache.activemq.artemis.core.client.impl.ClientConsumerInternal;
 import org.apache.activemq.artemis.core.client.impl.ServerLocatorImpl;
 import org.apache.activemq.artemis.core.protocol.core.Packet;
 import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl;
@@ -41,6 +43,7 @@ import org.apache.activemq.artemis.core.server.ActiveMQServer;
 import org.apache.activemq.artemis.core.server.Queue;
 import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
 import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
+import org.apache.activemq.artemis.tests.util.Wait;
 import org.apache.activemq.artemis.utils.ConcurrentHashSet;
 import org.junit.Assert;
 import org.junit.Before;
@@ -96,6 +99,50 @@ public class ConsumerTest extends ActiveMQTestBase {
    }
 
    @Test
+   public void testSimpleSend() throws Throwable {
+      receive(false);
+   }
+
+   @Test
+   public void testSimpleSendWithCloseConsumer() throws Throwable {
+      receive(true);
+   }
+
+   private void receive(boolean cancelOnce) throws Throwable {
+      ClientSessionFactory sf = createSessionFactory(locator);
+
+      ClientSession session = sf.createSession(false, true, true, false);
+
+      session.createQueue(QUEUE, QUEUE, null, false);
+
+      ClientConsumer consumer = session.createConsumer(QUEUE);
+
+      ClientProducer producer = session.createProducer(QUEUE);
+      ClientMessage message = session.createMessage(Message.TEXT_TYPE, true, 0, System.currentTimeMillis(), (byte) 4);
+      message.getBodyBuffer().writeString("hi");
+      message.putStringProperty("hello", "elo");
+      producer.send(message);
+
+      session.start();
+
+      if (cancelOnce) {
+         final ClientConsumerInternal consumerInternal = (ClientConsumerInternal)consumer;
+         Wait.waitFor(() -> consumerInternal.getBufferSize() > 0);
+         consumer.close();
+         consumer = session.createConsumer(QUEUE);
+      }
+      ClientMessage message2 = consumer.receive(1000);
+
+      System.out.println("Id::" + message2.getMessageID());
+
+      System.out.println("Received " + message2);
+
+      session.close();
+   }
+
+
+
+   @Test
    public void testConsumerAckImmediateAutoCommitTrue() throws Exception {
       ClientSessionFactory sf = createSessionFactory(locator);
 
@@ -323,6 +370,7 @@ public class ConsumerTest extends ActiveMQTestBase {
       ClientSessionFactory sf = createSessionFactory(locator);
 
       ClientSession session = sf.createSession(false, true, true);
+      session.start();
 
       session.createQueue(QUEUE, QUEUE, null, false);
 


Mime
View raw message