activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From clebertsuco...@apache.org
Subject [2/2] activemq-artemis git commit: ARTEMIS-1308: Make acknowlegde in AcitveMQMessage non blocking
Date Tue, 08 Aug 2017 14:20:17 GMT
ARTEMIS-1308: Make acknowlegde in AcitveMQMessage non blocking 

Allow commit within the acknowledge to be non blocking (batch) this toggles the on the existing
blockonacknowlegde config.


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/7b40abea
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/7b40abea
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/7b40abea

Branch: refs/heads/master
Commit: 7b40abead95b36e5769769373d6f7bab8e34dde9
Parents: 88f78d9
Author: Michael Andre Pearce <Michael.Andre.Pearce@me.com>
Authored: Fri Jul 28 14:27:29 2017 +0100
Committer: Clebert Suconic <clebertsuconic@apache.org>
Committed: Tue Aug 8 10:07:15 2017 -0400

----------------------------------------------------------------------
 .../artemis/api/core/client/ClientSession.java  | 10 ++-
 .../core/client/impl/ClientSessionImpl.java     | 10 ++-
 .../core/impl/ActiveMQSessionContext.java       |  9 +++
 .../spi/core/remoting/SessionContext.java       |  3 +
 .../artemis/jms/client/ActiveMQConnection.java  |  7 +-
 .../artemis/jms/client/ActiveMQMessage.java     | 15 ++++-
 .../jms/client/ActiveMQMessageConsumer.java     |  3 +
 .../jms/client/JMSMessageListenerWrapper.java   |  9 ++-
 .../jms/consumer/JmsConsumerTest.java           |  9 ++-
 .../artemis/jms/tests/AcknowledgementTest.java  | 69 ++++++++++++++++++++
 .../jms/tests/message/MessageHeaderTest.java    |  4 ++
 11 files changed, 137 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/7b40abea/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/client/ClientSession.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/client/ClientSession.java
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/client/ClientSession.java
index c3d6749..ab59eb6 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/client/ClientSession.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/client/ClientSession.java
@@ -894,13 +894,21 @@ public interface ClientSession extends XAResource, AutoCloseable {
    boolean isXA();
 
    /**
-    * Commits the current transaction.
+    * Commits the current transaction, blocking.
     *
     * @throws ActiveMQException if an exception occurs while committing the transaction
     */
    void commit() throws ActiveMQException;
 
    /**
+    * Commits the current transaction.
+    *
+    * @param block if the commit will be blocking or not.
+    * @throws ActiveMQException if an exception occurs while committing the transaction
+    */
+   void commit(boolean block) throws ActiveMQException;
+
+   /**
     * Rolls back the current transaction.
     *
     * @throws ActiveMQException if an exception occurs while rolling back the transaction

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/7b40abea/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionImpl.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionImpl.java
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionImpl.java
index 5f6b40b..ef4e87c 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionImpl.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionImpl.java
@@ -761,6 +761,11 @@ public final class ClientSessionImpl implements ClientSessionInternal,
FailureLi
 
    @Override
    public void commit() throws ActiveMQException {
+      commit(true);
+   }
+
+   @Override
+   public void commit(boolean block) throws ActiveMQException {
       checkClosed();
 
       if (logger.isTraceEnabled()) {
@@ -782,8 +787,9 @@ public final class ClientSessionImpl implements ClientSessionInternal,
FailureLi
       if (rollbackOnly) {
          rollbackOnFailover(true);
       }
+      startCall();
       try {
-         sessionContext.simpleCommit();
+         sessionContext.simpleCommit(block);
       } catch (ActiveMQException e) {
          if (e.getType() == ActiveMQExceptionType.UNBLOCKED || e.getType() == ActiveMQExceptionType.CONNECTION_TIMEDOUT
|| rollbackOnly) {
             // The call to commit was unlocked on failover, we therefore rollback the tx,
@@ -794,6 +800,8 @@ public final class ClientSessionImpl implements ClientSessionInternal,
FailureLi
          } else {
             throw e;
          }
+      } finally {
+         endCall();
       }
 
       //oops, we have failed over during the commit and don't know what happened

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/7b40abea/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java
index fc43672..d0d75ac 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java
@@ -349,6 +349,15 @@ public class ActiveMQSessionContext extends SessionContext {
    }
 
    @Override
+   public void simpleCommit(boolean block) throws ActiveMQException {
+      if (block) {
+         sessionChannel.sendBlocking(new PacketImpl(PacketImpl.SESS_COMMIT), PacketImpl.NULL_RESPONSE);
+      } else {
+         sessionChannel.sendBatched(new PacketImpl(PacketImpl.SESS_COMMIT));
+      }
+   }
+
+   @Override
    public void simpleRollback(boolean lastMessageAsDelivered) throws ActiveMQException {
       sessionChannel.sendBlocking(new RollbackMessage(lastMessageAsDelivered), PacketImpl.NULL_RESPONSE);
    }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/7b40abea/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/SessionContext.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/SessionContext.java
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/SessionContext.java
index b123960..78135a8 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/SessionContext.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/SessionContext.java
@@ -214,6 +214,9 @@ public abstract class SessionContext {
 
    public abstract void simpleCommit() throws ActiveMQException;
 
+   public abstract void simpleCommit(boolean block) throws ActiveMQException;
+
+
    /**
     * If we are doing a simple rollback on the RA, we need to ack the last message sent to
the consumer,
     * otherwise DLQ won't work.

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/7b40abea/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQConnection.java
----------------------------------------------------------------------
diff --git a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQConnection.java
b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQConnection.java
index 6432af2..bf0d236 100644
--- a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQConnection.java
+++ b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQConnection.java
@@ -597,7 +597,8 @@ public class ActiveMQConnection extends ActiveMQConnectionForContextImpl
impleme
 
       try {
          ClientSession session;
-
+         boolean isBlockOnAcknowledge = sessionFactory.getServerLocator().isBlockOnAcknowledge();
+         int ackBatchSize = sessionFactory.getServerLocator().getAckBatchSize();
          if (acknowledgeMode == Session.SESSION_TRANSACTED) {
             session = sessionFactory.createSession(username, password, isXA, false, false,
sessionFactory.getServerLocator().isPreAcknowledge(), transactionBatchSize);
          } else if (acknowledgeMode == Session.AUTO_ACKNOWLEDGE) {
@@ -605,9 +606,9 @@ public class ActiveMQConnection extends ActiveMQConnectionForContextImpl
impleme
          } else if (acknowledgeMode == Session.DUPS_OK_ACKNOWLEDGE) {
             session = sessionFactory.createSession(username, password, isXA, true, true,
sessionFactory.getServerLocator().isPreAcknowledge(), dupsOKBatchSize);
          } else if (acknowledgeMode == Session.CLIENT_ACKNOWLEDGE) {
-            session = sessionFactory.createSession(username, password, isXA, true, false,
sessionFactory.getServerLocator().isPreAcknowledge(), transactionBatchSize);
+            session = sessionFactory.createSession(username, password, isXA, true, false,
sessionFactory.getServerLocator().isPreAcknowledge(), isBlockOnAcknowledge ? transactionBatchSize
: ackBatchSize);
          } else if (acknowledgeMode == ActiveMQJMSConstants.INDIVIDUAL_ACKNOWLEDGE) {
-            session = sessionFactory.createSession(username, password, isXA, true, false,
false, transactionBatchSize);
+            session = sessionFactory.createSession(username, password, isXA, true, false,
false, isBlockOnAcknowledge ? transactionBatchSize : ackBatchSize);
          } else if (acknowledgeMode == ActiveMQJMSConstants.PRE_ACKNOWLEDGE) {
             session = sessionFactory.createSession(username, password, isXA, true, false,
true, transactionBatchSize);
          } else {

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/7b40abea/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessage.java
----------------------------------------------------------------------
diff --git a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessage.java
b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessage.java
index f13f602..928d375 100644
--- a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessage.java
+++ b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessage.java
@@ -43,6 +43,7 @@ import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.api.core.client.ClientMessage;
 import org.apache.activemq.artemis.api.core.client.ClientSession;
 import org.apache.activemq.artemis.api.jms.ActiveMQJMSConstants;
+import org.apache.activemq.artemis.core.client.ActiveMQClientMessageBundle;
 import org.apache.activemq.artemis.core.client.impl.ClientMessageInternal;
 import org.apache.activemq.artemis.api.core.RoutingType;
 import org.apache.activemq.artemis.reader.MessageUtil;
@@ -200,6 +201,8 @@ public class ActiveMQMessage implements javax.jms.Message {
 
    private boolean individualAck;
 
+   private boolean clientAck;
+
    private long jmsDeliveryTime;
 
    // Constructors --------------------------------------------------
@@ -710,11 +713,15 @@ public class ActiveMQMessage implements javax.jms.Message {
    public void acknowledge() throws JMSException {
       if (session != null) {
          try {
+            if (session.isClosed()) {
+               throw ActiveMQClientMessageBundle.BUNDLE.sessionClosed();
+            }
             if (individualAck) {
                message.individualAcknowledge();
             }
-
-            session.commit();
+            if (clientAck || individualAck) {
+               session.commit(session.isBlockOnAcknowledge());
+            }
          } catch (ActiveMQException e) {
             throw JMSExceptionHelper.convertFromActiveMQException(e);
          }
@@ -777,6 +784,10 @@ public class ActiveMQMessage implements javax.jms.Message {
       this.individualAck = true;
    }
 
+   public void setClientAcknowledge() {
+      this.clientAck = true;
+   }
+
    public void resetMessageID(final String newMsgID) {
       this.msgID = newMsgID;
    }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/7b40abea/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessageConsumer.java
----------------------------------------------------------------------
diff --git a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessageConsumer.java
b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessageConsumer.java
index 3d7fa56..4664bb9 100644
--- a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessageConsumer.java
+++ b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessageConsumer.java
@@ -237,6 +237,9 @@ public final class ActiveMQMessageConsumer implements QueueReceiver, TopicSubscr
             // https://issues.jboss.org/browse/JBPAPP-6110
             if (session.getAcknowledgeMode() == ActiveMQJMSConstants.INDIVIDUAL_ACKNOWLEDGE)
{
                jmsMsg.setIndividualAcknowledge();
+            } else if (session.getAcknowledgeMode() == Session.CLIENT_ACKNOWLEDGE) {
+               jmsMsg.setClientAcknowledge();
+               coreMessage.acknowledge();
             } else {
                coreMessage.acknowledge();
             }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/7b40abea/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/JMSMessageListenerWrapper.java
----------------------------------------------------------------------
diff --git a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/JMSMessageListenerWrapper.java
b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/JMSMessageListenerWrapper.java
index 92ae226..5d9f6ed 100644
--- a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/JMSMessageListenerWrapper.java
+++ b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/JMSMessageListenerWrapper.java
@@ -41,6 +41,8 @@ public class JMSMessageListenerWrapper implements MessageHandler {
 
    private final boolean individualACK;
 
+   private final boolean clientACK;
+
    protected JMSMessageListenerWrapper(final ConnectionFactoryOptions options,
                                        final ActiveMQConnection connection,
                                        final ActiveMQSession session,
@@ -60,6 +62,8 @@ public class JMSMessageListenerWrapper implements MessageHandler {
       transactedOrClientAck = (ackMode == Session.SESSION_TRANSACTED || ackMode == Session.CLIENT_ACKNOWLEDGE)
|| session.isXA();
 
       individualACK = (ackMode == ActiveMQJMSConstants.INDIVIDUAL_ACKNOWLEDGE);
+
+      clientACK = (ackMode == Session.CLIENT_ACKNOWLEDGE);
    }
 
    /**
@@ -74,11 +78,14 @@ public class JMSMessageListenerWrapper implements MessageHandler {
          msg.setIndividualAcknowledge();
       }
 
+      if (clientACK) {
+         msg.setClientAcknowledge();
+      }
+
       try {
          msg.doBeforeReceive();
       } catch (Exception e) {
          ActiveMQJMSClientLogger.LOGGER.errorPreparingMessageForReceipt(msg.getCoreMessage().toString(),
e);
-
          return;
       }
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/7b40abea/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/consumer/JmsConsumerTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/consumer/JmsConsumerTest.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/consumer/JmsConsumerTest.java
index d242da8..5cefbd0 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/consumer/JmsConsumerTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/consumer/JmsConsumerTest.java
@@ -179,9 +179,10 @@ public class JmsConsumerTest extends JMSTestBase {
       }
 
       SimpleString queueName = new SimpleString(JmsConsumerTest.Q_NAME);
+      conn.close();
+
       Assert.assertEquals(0, ((Queue) server.getPostOffice().getBinding(queueName).getBindable()).getDeliveringCount());
       Assert.assertEquals(0, getMessageCount((Queue) server.getPostOffice().getBinding(queueName).getBindable()));
-      conn.close();
    }
 
    @Test
@@ -225,9 +226,10 @@ public class JmsConsumerTest extends JMSTestBase {
       }
 
       SimpleString queueName = new SimpleString(JmsConsumerTest.Q_NAME);
+      context.close();
+
       Assert.assertEquals(0, ((Queue) server.getPostOffice().getBinding(queueName).getBindable()).getDeliveringCount());
       Assert.assertEquals(0, getMessageCount((Queue) server.getPostOffice().getBinding(queueName).getBindable()));
-      context.close();
    }
 
    @Test
@@ -299,9 +301,10 @@ public class JmsConsumerTest extends JMSTestBase {
       }
 
       SimpleString queueName = new SimpleString(JmsConsumerTest.Q_NAME);
+      conn.close();
+
       Assert.assertEquals(0, ((Queue) server.getPostOffice().getBinding(queueName).getBindable()).getDeliveringCount());
       Assert.assertEquals(0, getMessageCount((Queue) server.getPostOffice().getBinding(queueName).getBindable()));
-      conn.close();
    }
 
    @Test

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/7b40abea/tests/jms-tests/src/test/java/org/apache/activemq/artemis/jms/tests/AcknowledgementTest.java
----------------------------------------------------------------------
diff --git a/tests/jms-tests/src/test/java/org/apache/activemq/artemis/jms/tests/AcknowledgementTest.java
b/tests/jms-tests/src/test/java/org/apache/activemq/artemis/jms/tests/AcknowledgementTest.java
index 17927b1..a9ede4b 100644
--- a/tests/jms-tests/src/test/java/org/apache/activemq/artemis/jms/tests/AcknowledgementTest.java
+++ b/tests/jms-tests/src/test/java/org/apache/activemq/artemis/jms/tests/AcknowledgementTest.java
@@ -19,6 +19,8 @@ package org.apache.activemq.artemis.jms.tests;
 import javax.jms.Connection;
 import javax.jms.ConnectionFactory;
 import javax.jms.DeliveryMode;
+import javax.jms.Destination;
+import javax.jms.JMSException;
 import javax.jms.Message;
 import javax.jms.MessageConsumer;
 import javax.jms.MessageListener;
@@ -31,6 +33,9 @@ import javax.jms.TopicSession;
 import javax.jms.TopicSubscriber;
 import java.util.concurrent.CountDownLatch;
 
+import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
+import org.apache.activemq.artemis.api.jms.JMSFactoryType;
+import org.apache.activemq.artemis.jms.client.ActiveMQJMSConnectionFactory;
 import org.apache.activemq.artemis.jms.tests.util.ProxyAssertSupport;
 import org.junit.Assert;
 import org.junit.Test;
@@ -1297,4 +1302,68 @@ public class AcknowledgementTest extends JMSTestCase {
 
       checkEmpty(queue1);
    }
+
+   /**
+    * Ensure no blocking calls in acknowledge flow when block on acknowledge = false.
+    * This is done by checking the performance compared to blocking is much improved.
+    */
+   @Test
+   public void testNonBlockingAckPerf() throws Exception {
+      getJmsServerManager().createConnectionFactory("testsuitecf1", false, JMSFactoryType.CF,
NETTY_CONNECTOR, null, ActiveMQClient.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD, ActiveMQClient.DEFAULT_CONNECTION_TTL,
ActiveMQClient.DEFAULT_CALL_TIMEOUT, ActiveMQClient.DEFAULT_CALL_FAILOVER_TIMEOUT, ActiveMQClient.DEFAULT_CACHE_LARGE_MESSAGE_CLIENT,
ActiveMQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE, ActiveMQClient.DEFAULT_COMPRESS_LARGE_MESSAGES,
ActiveMQClient.DEFAULT_CONSUMER_WINDOW_SIZE, ActiveMQClient.DEFAULT_CONSUMER_MAX_RATE, ActiveMQClient.DEFAULT_CONFIRMATION_WINDOW_SIZE,
ActiveMQClient.DEFAULT_PRODUCER_WINDOW_SIZE, ActiveMQClient.DEFAULT_PRODUCER_MAX_RATE, true,
true, true, ActiveMQClient.DEFAULT_AUTO_GROUP, ActiveMQClient.DEFAULT_PRE_ACKNOWLEDGE, ActiveMQClient.DEFAULT_CONNECTION_LOAD_BALANCING_POLICY_CLASS_NAME,
ActiveMQClient.DEFAULT_ACK_BATCH_SIZE, ActiveMQClient.DEFAULT_ACK_BATCH_SIZE, ActiveMQClient.DEFAULT_USE_GLOBAL_POOLS,
ActiveMQClient.DEFAULT_SCHEDULED_THREAD_POOL_MAX_SIZE, Ac
 tiveMQClient.DEFAULT_THREAD_POOL_MAX_SIZE, ActiveMQClient.DEFAULT_RETRY_INTERVAL, ActiveMQClient.DEFAULT_RETRY_INTERVAL_MULTIPLIER,
ActiveMQClient.DEFAULT_MAX_RETRY_INTERVAL, ActiveMQClient.DEFAULT_RECONNECT_ATTEMPTS, ActiveMQClient.DEFAULT_FAILOVER_ON_INITIAL_CONNECTION,
null, "/testsuitecf1");
+      getJmsServerManager().createConnectionFactory("testsuitecf2", false, JMSFactoryType.CF,
NETTY_CONNECTOR, null, ActiveMQClient.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD, ActiveMQClient.DEFAULT_CONNECTION_TTL,
ActiveMQClient.DEFAULT_CALL_TIMEOUT, ActiveMQClient.DEFAULT_CALL_FAILOVER_TIMEOUT, ActiveMQClient.DEFAULT_CACHE_LARGE_MESSAGE_CLIENT,
ActiveMQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE, ActiveMQClient.DEFAULT_COMPRESS_LARGE_MESSAGES,
ActiveMQClient.DEFAULT_CONSUMER_WINDOW_SIZE, ActiveMQClient.DEFAULT_CONSUMER_MAX_RATE, ActiveMQClient.DEFAULT_CONFIRMATION_WINDOW_SIZE,
ActiveMQClient.DEFAULT_PRODUCER_WINDOW_SIZE, ActiveMQClient.DEFAULT_PRODUCER_MAX_RATE, true,
true, true, ActiveMQClient.DEFAULT_AUTO_GROUP, ActiveMQClient.DEFAULT_PRE_ACKNOWLEDGE, ActiveMQClient.DEFAULT_CONNECTION_LOAD_BALANCING_POLICY_CLASS_NAME,
ActiveMQClient.DEFAULT_ACK_BATCH_SIZE, ActiveMQClient.DEFAULT_ACK_BATCH_SIZE, ActiveMQClient.DEFAULT_USE_GLOBAL_POOLS,
ActiveMQClient.DEFAULT_SCHEDULED_THREAD_POOL_MAX_SIZE, Ac
 tiveMQClient.DEFAULT_THREAD_POOL_MAX_SIZE, ActiveMQClient.DEFAULT_RETRY_INTERVAL, ActiveMQClient.DEFAULT_RETRY_INTERVAL_MULTIPLIER,
ActiveMQClient.DEFAULT_MAX_RETRY_INTERVAL, ActiveMQClient.DEFAULT_RECONNECT_ATTEMPTS, ActiveMQClient.DEFAULT_FAILOVER_ON_INITIAL_CONNECTION,
null, "/testsuitecf2");
+
+      ActiveMQJMSConnectionFactory cf1 = (ActiveMQJMSConnectionFactory) getInitialContext().lookup("/testsuitecf1");
+      cf1.setBlockOnAcknowledge(false);
+      ActiveMQJMSConnectionFactory cf2 = (ActiveMQJMSConnectionFactory) getInitialContext().lookup("/testsuitecf2");
+      cf2.setBlockOnAcknowledge(true);
+
+      int messageCount = 10000;
+
+      long sendT1 = send(cf1, queue1, messageCount);
+      long sendT2 = send(cf2, queue2, messageCount);
+
+      long time1 = consume(cf1, queue1, messageCount);
+      long time2 = consume(cf2, queue2, messageCount);
+
+      log.info("BlockOnAcknowledge=false MessageCount=" + messageCount + " TimeToConsume="
+ time1);
+      log.info("BlockOnAcknowledge=true MessageCount=" + messageCount + " TimeToConsume="
+ time2);
+
+      Assert.assertTrue(time1 < (time2 / 2));
+
+   }
+
+   private long send(ConnectionFactory connectionFactory, Destination destination, int messageCount)
throws JMSException {
+      try (Connection connection = connectionFactory.createConnection()) {
+         connection.start();
+         try (Session session = connection.createSession(true, Session.CLIENT_ACKNOWLEDGE))
{
+            MessageProducer producer = session.createProducer(destination);
+            Message m = session.createTextMessage("testing123");
+            long start = System.nanoTime();
+            for (int i = 0; i < messageCount; i++) {
+               producer.send(m);
+            }
+            session.commit();
+            long end = System.nanoTime();
+            return end - start;
+         }
+      }
+   }
+
+   private long consume(ConnectionFactory connectionFactory, Destination destination, int
messageCount) throws JMSException {
+      try (Connection connection = connectionFactory.createConnection()) {
+         connection.start();
+         try (Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE))
{
+            MessageConsumer consumer = session.createConsumer(destination);
+            long start = System.nanoTime();
+            for (int i = 0; i < messageCount; i++) {
+               Message message = consumer.receive(100);
+               if (message != null) {
+                  message.acknowledge();
+               }
+            }
+            long end = System.nanoTime();
+            return end - start;
+         }
+      }
+   }
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/7b40abea/tests/jms-tests/src/test/java/org/apache/activemq/artemis/jms/tests/message/MessageHeaderTest.java
----------------------------------------------------------------------
diff --git a/tests/jms-tests/src/test/java/org/apache/activemq/artemis/jms/tests/message/MessageHeaderTest.java
b/tests/jms-tests/src/test/java/org/apache/activemq/artemis/jms/tests/message/MessageHeaderTest.java
index efea045..39ea0e3 100644
--- a/tests/jms-tests/src/test/java/org/apache/activemq/artemis/jms/tests/message/MessageHeaderTest.java
+++ b/tests/jms-tests/src/test/java/org/apache/activemq/artemis/jms/tests/message/MessageHeaderTest.java
@@ -1276,6 +1276,10 @@ public class MessageHeaderTest extends MessageHeaderTestBase {
       }
 
       @Override
+      public void commit(boolean block) throws ActiveMQException {
+      }
+
+      @Override
       public boolean isRollbackOnly() {
 
          return false;


Mime
View raw message