Repository: activemq-artemis
Updated Branches:
refs/heads/master 2eac1959d -> 4ef6e3281
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ea70af15/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/metrics/JournalPageCountSizeTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/metrics/JournalPageCountSizeTest.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/metrics/JournalPageCountSizeTest.java
new file mode 100644
index 0000000..10620e5
--- /dev/null
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/metrics/JournalPageCountSizeTest.java
@@ -0,0 +1,144 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.persistence.metrics;
+
+import java.util.LinkedList;
+import java.util.List;
+
+import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
+import org.apache.activemq.artemis.api.core.ActiveMQBuffers;
+import org.apache.activemq.artemis.core.config.impl.ConfigurationImpl;
+import org.apache.activemq.artemis.core.journal.PreparedTransactionInfo;
+import org.apache.activemq.artemis.core.journal.RecordInfo;
+import org.apache.activemq.artemis.core.journal.TransactionFailureCallback;
+import org.apache.activemq.artemis.core.persistence.impl.journal.JournalStorageManager;
+import org.apache.activemq.artemis.core.persistence.impl.journal.codec.PageCountRecord;
+import org.apache.activemq.artemis.core.persistence.impl.journal.codec.PageCountRecordInc;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class JournalPageCountSizeTest extends ActiveMQTestBase {
+
+ private ActiveMQServer server;
+
+ @Before
+ public void init() throws Exception {
+ server = createServer(true);
+
+ server.start();
+ }
+
+ @Override
+ protected ConfigurationImpl createBasicConfig(int serverID) {
+ return super.createBasicConfig(serverID);
+ }
+
+ @After
+ public void destroy() throws Exception {
+ server.stop();
+ }
+
+ @Test
+ public void testPageCountRecordSize() throws Exception {
+
+ long tx = server.getStorageManager().generateID();
+ server.getStorageManager().storePageCounter(tx, 1, 1, 100);
+ server.getStorageManager().commit(tx);
+ server.getStorageManager().stop();
+
+ JournalStorageManager journalStorageManager = (JournalStorageManager) server.getStorageManager();
+ List<RecordInfo> committedRecords = new LinkedList<>();
+ List<PreparedTransactionInfo> preparedTransactions = new LinkedList<>();
+
+ try {
+ journalStorageManager.getMessageJournal().start();
+ journalStorageManager.getMessageJournal().load(committedRecords, preparedTransactions,
transactionFailure);
+
+ ActiveMQBuffer buff = ActiveMQBuffers.wrappedBuffer(committedRecords.get(0).data);
+ PageCountRecord encoding = new PageCountRecord();
+ encoding.decode(buff);
+
+ Assert.assertEquals(100, encoding.getPersistentSize());
+ } finally {
+ journalStorageManager.getMessageJournal().stop();
+ }
+
+ }
+
+ @Test
+ public void testPageCursorCounterRecordSize() throws Exception {
+
+ server.getStorageManager().storePageCounterInc(1, 1, 1000);
+ server.getStorageManager().stop();
+
+ JournalStorageManager journalStorageManager = (JournalStorageManager) server.getStorageManager();
+ List<RecordInfo> committedRecords = new LinkedList<>();
+ List<PreparedTransactionInfo> preparedTransactions = new LinkedList<>();
+
+ try {
+ journalStorageManager.getMessageJournal().start();
+ journalStorageManager.getMessageJournal().load(committedRecords, preparedTransactions,
transactionFailure);
+
+ ActiveMQBuffer buff = ActiveMQBuffers.wrappedBuffer(committedRecords.get(0).data);
+ PageCountRecordInc encoding = new PageCountRecordInc();
+ encoding.decode(buff);
+
+ Assert.assertEquals(1000, encoding.getPersistentSize());
+ } finally {
+ journalStorageManager.getMessageJournal().stop();
+ }
+
+ }
+
+ @Test
+ public void testPageCursorCounterRecordSizeTX() throws Exception {
+
+ long tx = server.getStorageManager().generateID();
+ server.getStorageManager().storePageCounterInc(tx, 1, 1, 1000);
+ server.getStorageManager().commit(tx);
+ server.getStorageManager().stop();
+
+ JournalStorageManager journalStorageManager = (JournalStorageManager) server.getStorageManager();
+ List<RecordInfo> committedRecords = new LinkedList<>();
+ List<PreparedTransactionInfo> preparedTransactions = new LinkedList<>();
+
+ try {
+ journalStorageManager.getMessageJournal().start();
+ journalStorageManager.getMessageJournal().load(committedRecords, preparedTransactions,
transactionFailure);
+
+ ActiveMQBuffer buff = ActiveMQBuffers.wrappedBuffer(committedRecords.get(0).data);
+ PageCountRecordInc encoding = new PageCountRecordInc();
+ encoding.decode(buff);
+
+ Assert.assertEquals(1000, encoding.getPersistentSize());
+ } finally {
+ journalStorageManager.getMessageJournal().stop();
+ }
+
+ }
+
+ private TransactionFailureCallback transactionFailure = new TransactionFailureCallback()
{
+ @Override
+ public void failedTransaction(long transactionID, List<RecordInfo> records, List<RecordInfo>
recordsToDelete) {
+
+ }
+ };
+}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ea70af15/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/metrics/JournalPendingMessageTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/metrics/JournalPendingMessageTest.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/metrics/JournalPendingMessageTest.java
new file mode 100644
index 0000000..7bad309
--- /dev/null
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/metrics/JournalPendingMessageTest.java
@@ -0,0 +1,651 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.persistence.metrics;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.ToLongFunction;
+
+import javax.jms.Connection;
+import javax.jms.DeliveryMode;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.Topic;
+import javax.jms.TopicSession;
+
+import org.apache.activemq.artemis.api.core.RoutingType;
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.core.config.Configuration;
+import org.apache.activemq.artemis.core.postoffice.Binding;
+import org.apache.activemq.artemis.core.postoffice.BindingType;
+import org.apache.activemq.artemis.core.postoffice.impl.LocalQueueBinding;
+import org.apache.activemq.artemis.core.server.Queue;
+import org.apache.activemq.artemis.core.server.impl.AddressInfo;
+import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
+import org.apache.activemq.artemis.jms.client.ActiveMQTextMessage;
+import org.apache.activemq.artemis.junit.Wait.Condition;
+import org.apache.activemq.artemis.tests.util.Wait;
+import org.apache.commons.lang.StringUtils;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class JournalPendingMessageTest extends AbstractPersistentStatTestSupport {
+ protected static final Logger LOG = LoggerFactory.getLogger(JournalPendingMessageTest.class);
+
+ // protected URI brokerConnectURI;
+ protected String defaultQueueName = "test.queue";
+ protected String defaultTopicName = "test.topic";
+ protected static int maxMessageSize = 1000;
+
+ @Before
+ public void setupAddresses() throws Exception {
+ server.getPostOffice()
+ .addAddressInfo(new AddressInfo(SimpleString.toSimpleString(defaultQueueName),
RoutingType.ANYCAST));
+
+ server.createQueue(SimpleString.toSimpleString(defaultQueueName), RoutingType.ANYCAST,
+ SimpleString.toSimpleString(defaultQueueName), null, true, false);
+
+ }
+
+ @Override
+ protected Configuration createDefaultConfig(boolean netty) throws Exception {
+ Configuration config = super.createDefaultConfig(netty);
+
+ // Set a low max size so we page which will test the paging metrics as
+ // well
+ config.setGlobalMaxSize(100000);
+
+ return config;
+ }
+
+ @Test
+ public void testQueueMessageSize() throws Exception {
+ AtomicLong publishedMessageSize = new AtomicLong();
+
+ publishTestQueueMessages(200, publishedMessageSize);
+
+ verifyPendingStats(defaultQueueName, 200, publishedMessageSize.get());
+ verifyPendingDurableStats(defaultQueueName, 200, publishedMessageSize.get());
+
+ this.killServer();
+ this.restartServer();
+
+ verifyPendingStats(defaultQueueName, 200, publishedMessageSize.get());
+ verifyPendingDurableStats(defaultQueueName, 200, publishedMessageSize.get());
+ }
+
+ @Test
+ public void testQueueMessageSizeTx() throws Exception {
+ AtomicLong publishedMessageSize = new AtomicLong();
+
+ publishTestQueueMessagesTx(200, publishedMessageSize);
+
+ verifyPendingStats(defaultQueueName, 200, publishedMessageSize.get());
+ verifyPendingDurableStats(defaultQueueName, 200, publishedMessageSize.get());
+
+ this.killServer();
+ this.restartServer();
+
+ verifyPendingStats(defaultQueueName, 200, publishedMessageSize.get());
+ verifyPendingDurableStats(defaultQueueName, 200, publishedMessageSize.get());
+ }
+
+ @Test
+ public void testQueueLargeMessageSize() throws Exception {
+
+ ActiveMQConnectionFactory acf = (ActiveMQConnectionFactory) cf;
+ acf.setMinLargeMessageSize(1000);
+ Connection connection = cf.createConnection();
+ connection.start();
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ String testText = StringUtils.repeat("t", 5000);
+ ActiveMQTextMessage message = (ActiveMQTextMessage) session.createTextMessage(testText);
+ session.createProducer(session.createQueue(defaultQueueName)).send(message);
+
+ verifyPendingStats(defaultQueueName, 1, message.getCoreMessage().getPersistentSize());
+ verifyPendingDurableStats(defaultQueueName, 1, message.getCoreMessage().getPersistentSize());
+
+ connection.close();
+
+ this.killServer();
+ this.restartServer();
+
+ verifyPendingStats(defaultQueueName, 1, message.getCoreMessage().getPersistentSize());
+ verifyPendingDurableStats(defaultQueueName, 1, message.getCoreMessage().getPersistentSize());
+
+ }
+
+ @Test
+ public void testQueueLargeMessageSizeTX() throws Exception {
+
+ ActiveMQConnectionFactory acf = (ActiveMQConnectionFactory) cf;
+ acf.setMinLargeMessageSize(1000);
+ Connection connection = cf.createConnection();
+ connection.start();
+ Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
+
+ String testText = StringUtils.repeat("t", 2000);
+ MessageProducer producer = session.createProducer(session.createQueue(defaultQueueName));
+ ActiveMQTextMessage message = (ActiveMQTextMessage) session.createTextMessage(testText);
+ for (int i = 0; i < 10; i++) {
+ producer.send(message);
+ }
+
+ //not commited so should be 0
+ verifyPendingStats(defaultQueueName, 0, message.getCoreMessage().getPersistentSize()
* 10);
+ verifyPendingDurableStats(defaultQueueName, 0, message.getCoreMessage().getPersistentSize()
* 10);
+
+ session.commit();
+
+ verifyPendingStats(defaultQueueName, 10, message.getCoreMessage().getPersistentSize()
* 10);
+ verifyPendingDurableStats(defaultQueueName, 10, message.getCoreMessage().getPersistentSize()
* 10);
+
+ connection.close();
+
+ this.killServer();
+ this.restartServer();
+
+ verifyPendingStats(defaultQueueName, 10, message.getCoreMessage().getPersistentSize());
+ verifyPendingDurableStats(defaultQueueName, 10, message.getCoreMessage().getPersistentSize());
+ }
+
+ @Test
+ public void testQueueBrowserMessageSize() throws Exception {
+
+ AtomicLong publishedMessageSize = new AtomicLong();
+
+ publishTestQueueMessages(200, publishedMessageSize);
+ browseTestQueueMessages(defaultQueueName);
+ verifyPendingStats(defaultQueueName, 200, publishedMessageSize.get());
+ verifyPendingDurableStats(defaultQueueName, 200, publishedMessageSize.get());
+ }
+
+ @Test
+ public void testQueueMessageSizeNonPersistent() throws Exception {
+
+ AtomicLong publishedMessageSize = new AtomicLong();
+
+ publishTestQueueMessages(200, DeliveryMode.NON_PERSISTENT, publishedMessageSize);
+ verifyPendingStats(defaultQueueName, 200, publishedMessageSize.get());
+ verifyPendingDurableStats(defaultQueueName, 0, 0);
+ }
+
+ @Test
+ public void testQueueMessageSizePersistentAndNonPersistent() throws Exception {
+
+ AtomicLong publishedNonPersistentMessageSize = new AtomicLong();
+ AtomicLong publishedMessageSize = new AtomicLong();
+
+ publishTestQueueMessages(100, DeliveryMode.PERSISTENT, publishedMessageSize);
+ publishTestQueueMessages(100, DeliveryMode.NON_PERSISTENT, publishedNonPersistentMessageSize);
+ verifyPendingStats(defaultQueueName, 200, publishedMessageSize.get() + publishedNonPersistentMessageSize.get());
+ verifyPendingDurableStats(defaultQueueName, 100, publishedMessageSize.get());
+ }
+
+ @Test
+ public void testQueueMessageSizeAfterConsumption() throws Exception {
+ AtomicLong publishedMessageSize = new AtomicLong();
+
+ publishTestQueueMessages(200, publishedMessageSize);
+ verifyPendingStats(defaultQueueName, 200, publishedMessageSize.get());
+ verifyPendingDurableStats(defaultQueueName, 200, publishedMessageSize.get());
+
+ consumeTestQueueMessages(200);
+
+ verifyPendingStats(defaultQueueName, 0, 0);
+ verifyPendingDurableStats(defaultQueueName, 0, 0);
+ }
+
+ @Test
+ public void testScheduledStats() throws Exception {
+ AtomicLong publishedMessageSize = new AtomicLong();
+
+ Connection connection = cf.createConnection();
+ connection.start();
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ MessageProducer producer = session.createProducer(session.createQueue(defaultQueueName));
+ producer.setDeliveryDelay(2000);
+ producer.send(session.createTextMessage("test"));
+
+ verifyPendingStats(defaultQueueName, 1, publishedMessageSize.get());
+ verifyPendingDurableStats(defaultQueueName, 1, publishedMessageSize.get());
+ verifyScheduledStats(defaultQueueName, 1, publishedMessageSize.get());
+
+ consumeTestQueueMessages(1);
+
+ verifyPendingStats(defaultQueueName, 0, 0);
+ verifyPendingDurableStats(defaultQueueName, 0, 0);
+ verifyScheduledStats(defaultQueueName, 0, 0);
+
+ connection.close();
+ }
+
+
+ @Test
+ public void testDeliveringStats() throws Exception {
+ AtomicLong publishedMessageSize = new AtomicLong();
+
+ Connection connection = cf.createConnection();
+ connection.start();
+ Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+ MessageProducer producer = session.createProducer(session.createQueue(defaultQueueName));
+ producer.send(session.createTextMessage("test"));
+
+ verifyPendingStats(defaultQueueName, 1, publishedMessageSize.get());
+ verifyPendingDurableStats(defaultQueueName, 1, publishedMessageSize.get());
+ verifyDeliveringStats(defaultQueueName, 0, 0);
+
+ MessageConsumer consumer = session.createConsumer(session.createQueue(defaultQueueName));
+ Message msg = consumer.receive();
+ verifyDeliveringStats(defaultQueueName, 1, publishedMessageSize.get());
+ msg.acknowledge();
+
+ verifyPendingStats(defaultQueueName, 0, 0);
+ verifyPendingDurableStats(defaultQueueName, 0, 0);
+ verifyDeliveringStats(defaultQueueName, 0, 0);
+
+ connection.close();
+ }
+ @Test
+ public void testQueueMessageSizeAfterConsumptionNonPersistent() throws Exception {
+ AtomicLong publishedMessageSize = new AtomicLong();
+
+ publishTestQueueMessages(200, DeliveryMode.NON_PERSISTENT, publishedMessageSize);
+ verifyPendingStats(defaultQueueName, 200, publishedMessageSize.get());
+
+ consumeTestQueueMessages(200);
+
+ verifyPendingStats(defaultQueueName, 0, 0);
+ verifyPendingDurableStats(defaultQueueName, 0, 0);
+ }
+
+ @Test
+ public void testTopicMessageSize() throws Exception {
+ AtomicLong publishedMessageSize = new AtomicLong();
+
+ Connection connection = cf.createConnection();
+ connection.setClientID("clientId");
+ connection.start();
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ MessageConsumer consumer = session.createConsumer(session.createTopic(defaultTopicName));
+
+ publishTestTopicMessages(200, publishedMessageSize);
+
+ verifyPendingStats(defaultTopicName, 200, publishedMessageSize.get());
+ verifyPendingDurableStats(defaultQueueName, 0, 0);
+
+ // consume all messages
+ consumeTestMessages(consumer, 200);
+
+ // All messages should now be gone
+ verifyPendingStats(defaultTopicName, 0, 0);
+ verifyPendingDurableStats(defaultQueueName, 0, 0);
+
+ connection.close();
+ }
+
+ @Test
+ public void testTopicMessageSizeShared() throws Exception {
+ AtomicLong publishedMessageSize = new AtomicLong();
+
+ Connection connection = cf.createConnection();
+ connection.setClientID("clientId");
+ connection.start();
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ MessageConsumer consumer = session.createSharedConsumer(session.createTopic(defaultTopicName),
"sub1");
+ MessageConsumer consumer2 = session.createSharedConsumer(session.createTopic(defaultTopicName),
"sub1");
+
+ publishTestTopicMessages(200, publishedMessageSize);
+
+ verifyPendingStats(defaultTopicName, 200, publishedMessageSize.get());
+ verifyPendingDurableStats(defaultTopicName, 0, 0);
+ consumer2.close();
+
+ // consume all messages
+ consumeTestMessages(consumer, 200);
+
+ // All messages should now be gone
+ verifyPendingStats(defaultTopicName, 0, 0);
+ verifyPendingDurableStats(defaultTopicName, 0, 0);
+
+ connection.close();
+ }
+
+ @Test
+ public void testTopicNonPersistentMessageSize() throws Exception {
+ AtomicLong publishedMessageSize = new AtomicLong();
+
+ Connection connection = cf.createConnection();
+ connection.setClientID("clientId");
+ connection.start();
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ MessageConsumer consumer = session.createConsumer(session.createTopic(defaultTopicName));
+
+ publishTestTopicMessages(200, DeliveryMode.NON_PERSISTENT, publishedMessageSize);
+
+ verifyPendingStats(defaultTopicName, 200, publishedMessageSize.get());
+
+ // consume all messages
+ consumeTestMessages(consumer, 200);
+
+ // All messages should now be gone
+ verifyPendingStats(defaultTopicName, 0, 0);
+
+ connection.close();
+ }
+
+ @Test
+ public void testTopicPersistentAndNonPersistentMessageSize() throws Exception {
+ AtomicLong publishedMessageSize = new AtomicLong();
+ AtomicLong publishedNonPersistentMessageSize = new AtomicLong();
+
+ Connection connection = cf.createConnection();
+ connection.setClientID("clientId");
+ connection.start();
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ MessageConsumer consumer = session.createConsumer(session.createTopic(defaultTopicName));
+
+ publishTestTopicMessages(100, DeliveryMode.NON_PERSISTENT, publishedNonPersistentMessageSize);
+ publishTestTopicMessages(100, DeliveryMode.PERSISTENT, publishedMessageSize);
+
+ verifyPendingStats(defaultTopicName, 200, publishedMessageSize.get() + publishedNonPersistentMessageSize.get());
+
+ // consume all messages
+ consumeTestMessages(consumer, 200);
+
+ // All messages should now be gone
+ verifyPendingStats(defaultTopicName, 0, 0);
+
+ connection.close();
+ }
+
+ @Test
+ public void testMessageSizeOneDurable() throws Exception {
+ AtomicLong publishedMessageSize = new AtomicLong();
+ Connection connection = cf.createConnection();
+ connection.setClientID("clientId");
+ connection.start();
+
+ publishTestMessagesDurable(connection, new String[] {"sub1"}, 200, publishedMessageSize,
+ DeliveryMode.PERSISTENT, false);
+
+ // verify the count and size - durable is offline so all 200 should be
+ // pending since none are in prefetch
+ verifyPendingStats(defaultTopicName, 200, publishedMessageSize.get());
+ verifyPendingDurableStats(defaultTopicName, 200, publishedMessageSize.get());
+
+ // consume all messages
+ consumeDurableTestMessages(connection, "sub1", 200, publishedMessageSize);
+
+ // All messages should now be gone
+ verifyPendingStats(defaultTopicName, 0, 0);
+ verifyPendingDurableStats(defaultTopicName, 0, 0);
+
+ connection.close();
+ }
+
+ @Test
+ public void testMessageSizeOneDurablePartialConsumption() throws Exception {
+ AtomicLong publishedMessageSize = new AtomicLong();
+
+ Connection connection = cf.createConnection();
+ connection.setClientID("clientId");
+ connection.start();
+
+ publishTestMessagesDurable(connection, new String[] {"sub1"}, 200, publishedMessageSize,
+ DeliveryMode.PERSISTENT, false);
+
+ verifyPendingStats(defaultTopicName, 200, publishedMessageSize.get());
+ verifyPendingDurableStats(defaultTopicName, 200, publishedMessageSize.get());
+
+ // consume partial messages
+ consumeDurableTestMessages(connection, "sub1", 50, publishedMessageSize);
+
+ // 150 should be left
+ verifyPendingStats(defaultTopicName, 150, publishedMessageSize.get());
+ // We don't really know the size here but it should be smaller than before
+ // so take an average
+ verifyPendingDurableStats(defaultTopicName, 150, (long) (.75 * publishedMessageSize.get()));
+
+ connection.close();
+ }
+
+ @Test
+ public void testMessageSizeTwoDurables() throws Exception {
+ AtomicLong publishedMessageSize = new AtomicLong();
+
+ Connection connection = cf.createConnection();
+ connection.setClientID("clientId");
+ connection.start();
+
+ publishTestMessagesDurable(connection, new String[] {"sub1", "sub2"}, 200, publishedMessageSize,
+ DeliveryMode.PERSISTENT, false);
+
+ // verify the count and size - double because two durables so two queue
+ // bindings
+ verifyPendingStats(defaultTopicName, 400, 2 * publishedMessageSize.get());
+ verifyPendingDurableStats(defaultTopicName, 400, 2 * publishedMessageSize.get());
+
+ // consume messages just for sub1
+ consumeDurableTestMessages(connection, "sub1", 200, publishedMessageSize);
+
+ // There is still a durable that hasn't consumed so the messages should
+ // exist
+ verifyPendingStats(defaultTopicName, 200, publishedMessageSize.get());
+ verifyPendingDurableStats(defaultTopicName, 200, publishedMessageSize.get());
+
+ connection.close();
+
+ // restart and verify load
+ this.killServer();
+ this.restartServer();
+ verifyPendingStats(defaultTopicName, 200, publishedMessageSize.get());
+ verifyPendingDurableStats(defaultTopicName, 200, publishedMessageSize.get());
+ }
+
+ @Test
+ public void testMessageSizeSharedDurable() throws Exception {
+ AtomicLong publishedMessageSize = new AtomicLong();
+
+ Connection connection = cf.createConnection();
+ connection.setClientID("clientId");
+ connection.start();
+
+ // The publish method will create a second shared consumer
+ Session s = connection.createSession();
+ MessageConsumer c = s.createSharedDurableConsumer(s.createTopic(defaultTopicName),
"sub1");
+ publishTestMessagesDurable(connection, new String[] {"sub1",}, 200, publishedMessageSize,
+ DeliveryMode.PERSISTENT, true);
+
+ // verify the count and size - double because two durables so two queue
+ // bindings
+ verifyPendingStats(defaultTopicName, 200, publishedMessageSize.get());
+ verifyPendingDurableStats(defaultTopicName, 200, publishedMessageSize.get());
+ c.close();
+
+ // consume messages for sub1
+ consumeDurableTestMessages(connection, "sub1", 200, publishedMessageSize);
+ verifyPendingStats(defaultTopicName, 0, publishedMessageSize.get());
+ verifyPendingDurableStats(defaultTopicName, 0, publishedMessageSize.get());
+
+ connection.close();
+ }
+
+ protected List<Queue> getQueues(final String address) throws Exception {
+ final List<Queue> queues = new ArrayList<>();
+ for (Binding binding : server.getPostOffice().getDirectBindings(SimpleString.toSimpleString(address))
+ .getBindings()) {
+ if (binding.getType() == BindingType.LOCAL_QUEUE) {
+ LocalQueueBinding queueBinding = (LocalQueueBinding) binding;
+ queues.add(queueBinding.getQueue());
+ }
+ }
+ return queues;
+ }
+
+ protected void verifyDeliveringStats(final String address, final int count, final long
minimumSize) throws Exception {
+ verifyStats(address, count, minimumSize, org.apache.activemq.artemis.core.server.Queue::getDeliveringCount,
+ org.apache.activemq.artemis.core.server.Queue::getDeliveringSize);
+ verifyStats(address, count, minimumSize, org.apache.activemq.artemis.core.server.Queue::getDurableDeliveringCount,
+ org.apache.activemq.artemis.core.server.Queue::getDurableDeliveringSize);
+ }
+
+
+ protected void verifyScheduledStats(final String address, final int count, final long
minimumSize) throws Exception {
+ verifyStats(address, count, minimumSize, org.apache.activemq.artemis.core.server.Queue::getScheduledCount,
+ org.apache.activemq.artemis.core.server.Queue::getScheduledSize);
+ verifyStats(address, count, minimumSize, org.apache.activemq.artemis.core.server.Queue::getDurableScheduledCount,
+ org.apache.activemq.artemis.core.server.Queue::getDurableScheduledSize);
+ }
+
+ protected void verifyPendingStats(final String address, final int count, final long minimumSize)
throws Exception {
+ verifyStats(address, count, minimumSize, org.apache.activemq.artemis.core.server.Queue::getMessageCount,
+ org.apache.activemq.artemis.core.server.Queue::getPersistentSize);
+ }
+
+ protected void verifyPendingDurableStats(final String address, final int count, final
long minimumSize)
+ throws Exception {
+
+ verifyStats(address, count, minimumSize, org.apache.activemq.artemis.core.server.Queue::getDurableMessageCount,
+ org.apache.activemq.artemis.core.server.Queue::getDurablePersistentSize);
+ }
+
+ protected void verifyStats(final String address, final int count, final long minimumSize,
+ ToLongFunction<Queue> countFunc, ToLongFunction<Queue> sizeFunc)
+ throws Exception {
+ final List<Queue> queues = getQueues(address);
+
+ assertTrue(Wait.waitFor(new Condition() {
+ @Override
+ public boolean isSatisfied() throws Exception {
+ return queues.stream().mapToLong(countFunc)
+ .sum() == count;
+ }
+ }));
+
+ verifySize(count, new MessageSizeCalculator() {
+ @Override
+ public long getMessageSize() throws Exception {
+ return queues.stream().mapToLong(sizeFunc)
+ .sum();
+ }
+ }, minimumSize);
+
+ }
+
+ protected void verifySize(final int count, final MessageSizeCalculator messageSizeCalculator,
final long minimumSize)
+ throws Exception {
+ if (count > 0) {
+ assertTrue(Wait.waitFor(new Condition() {
+ @Override
+ public boolean isSatisfied() throws Exception {
+ return messageSizeCalculator.getMessageSize() > minimumSize;
+ }
+ }));
+ } else {
+ assertTrue(Wait.waitFor(new Condition() {
+ @Override
+ public boolean isSatisfied() throws Exception {
+ return messageSizeCalculator.getMessageSize() == 0;
+ }
+ }));
+ }
+ }
+
+ protected interface MessageSizeCalculator {
+ long getMessageSize() throws Exception;
+ }
+
+ protected void consumeTestMessages(MessageConsumer consumer, int size) throws Exception
{
+ consumeTestMessages(consumer, size, defaultTopicName);
+ }
+
+ protected void consumeTestMessages(MessageConsumer consumer, int size, String topicName)
throws Exception {
+ for (int i = 0; i < size; i++) {
+ consumer.receive();
+ }
+ }
+
+ protected void consumeDurableTestMessages(Connection connection, String sub, int size,
+ AtomicLong publishedMessageSize) throws Exception {
+ consumeDurableTestMessages(connection, sub, size, defaultTopicName, publishedMessageSize);
+ }
+
+ protected void publishTestMessagesDurable(Connection connection, String[] subNames, int
publishSize,
+ AtomicLong publishedMessageSize, int deliveryMode, boolean shared) throws Exception
{
+
+ publishTestMessagesDurable(connection, subNames, defaultTopicName, publishSize, 0,
+ AbstractPersistentStatTestSupport.defaultMessageSize, publishedMessageSize, false,
deliveryMode, shared);
+ }
+
+ protected void publishTestTopicMessages(int publishSize, AtomicLong publishedMessageSize)
throws Exception {
+ publishTestTopicMessages(publishSize, DeliveryMode.PERSISTENT, publishedMessageSize);
+ }
+
+ protected void publishTestTopicMessages(int publishSize, int deliveryMode, AtomicLong
publishedMessageSize)
+ throws Exception {
+ // create a new queue
+ Connection connection = cf.createConnection();
+ connection.setClientID("clientId2");
+ connection.start();
+
+ // Start the connection
+ Session session = connection.createSession(false, TopicSession.AUTO_ACKNOWLEDGE);
+ Topic topic = session.createTopic(defaultTopicName);
+
+ try {
+ // publish a bunch of non-persistent messages to fill up the temp
+ // store
+ MessageProducer prod = session.createProducer(topic);
+ prod.setDeliveryMode(deliveryMode);
+ for (int i = 0; i < publishSize; i++) {
+ prod.send(createMessage(i, session, JournalPendingMessageTest.maxMessageSize,
publishedMessageSize));
+ }
+
+ } finally {
+ connection.close();
+ }
+ }
+
+ protected void publishTestQueueMessagesTx(int count, AtomicLong publishedMessageSize)
throws Exception {
+ publishTestQueueMessages(count, defaultQueueName, DeliveryMode.PERSISTENT,
+ JournalPendingMessageTest.maxMessageSize, publishedMessageSize, true);
+ }
+
+ protected void publishTestQueueMessages(int count, AtomicLong publishedMessageSize) throws
Exception {
+ publishTestQueueMessages(count, defaultQueueName, DeliveryMode.PERSISTENT,
+ JournalPendingMessageTest.maxMessageSize, publishedMessageSize, false);
+ }
+
+ protected void publishTestQueueMessages(int count, int deliveryMode, AtomicLong publishedMessageSize)
+ throws Exception {
+ publishTestQueueMessages(count, defaultQueueName, deliveryMode, JournalPendingMessageTest.maxMessageSize,
+ publishedMessageSize, false);
+ }
+
+ protected void consumeTestQueueMessages(int num) throws Exception {
+ consumeTestQueueMessages(defaultQueueName, num);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ea70af15/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java
----------------------------------------------------------------------
diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java
b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java
index 6e66057..25300d3c 100644
--- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java
+++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java
@@ -362,6 +362,21 @@ public class FakeQueue extends CriticalComponentImpl implements Queue
{
return messageCount;
}
+ @Override
+ public long getPersistentSize() {
+ return 0;
+ }
+
+ @Override
+ public long getDurableMessageCount() {
+ return 0;
+ }
+
+ @Override
+ public long getDurablePersistentSize() {
+ return 0;
+ }
+
public void setMessageCount(long messageCount) {
this.messageCount = messageCount;
}
@@ -453,6 +468,12 @@ public class FakeQueue extends CriticalComponentImpl implements Queue
{
}
@Override
+ public long getScheduledSize() {
+ // no-op
+ return 0;
+ }
+
+ @Override
public List<MessageReference> getScheduledMessages() {
// no-op
return null;
@@ -522,7 +543,7 @@ public class FakeQueue extends CriticalComponentImpl implements Queue
{
}
@Override
- public void referenceHandled() {
+ public void referenceHandled(MessageReference ref) {
// no-op
}
@@ -684,6 +705,28 @@ public class FakeQueue extends CriticalComponentImpl implements Queue
{
}
@Override
- public void decDelivering(int size) {
+ public long getDeliveringSize() {
+ return 0;
}
+
+ @Override
+ public int getDurableDeliveringCount() {
+ return 0;
+ }
+
+ @Override
+ public long getDurableDeliveringSize() {
+ return 0;
+ }
+
+ @Override
+ public int getDurableScheduledCount() {
+ return 0;
+ }
+
+ @Override
+ public long getDurableScheduledSize() {
+ return 0;
+ }
+
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ea70af15/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/fakes/FakeConsumer.java
----------------------------------------------------------------------
diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/fakes/FakeConsumer.java
b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/fakes/FakeConsumer.java
index 1db8347..2a5a330 100644
--- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/fakes/FakeConsumer.java
+++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/fakes/FakeConsumer.java
@@ -105,7 +105,7 @@ public class FakeConsumer implements Consumer {
if (filter != null) {
if (filter.match(reference.getMessage())) {
references.addLast(reference);
- reference.getQueue().referenceHandled();
+ reference.getQueue().referenceHandled(reference);
notify();
return HandleStatus.HANDLED;
@@ -125,7 +125,7 @@ public class FakeConsumer implements Consumer {
}
if (statusToReturn == HandleStatus.HANDLED) {
- reference.getQueue().referenceHandled();
+ reference.getQueue().referenceHandled(reference);
references.addLast(reference);
notify();
}
|