From commits-return-50465-archive-asf-public=cust-asf.ponee.io@activemq.apache.org Wed Feb 7 16:27:41 2018 Return-Path: X-Original-To: archive-asf-public@eu.ponee.io Delivered-To: archive-asf-public@eu.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by mx-eu-01.ponee.io (Postfix) with ESMTP id 643A918065B for ; Wed, 7 Feb 2018 16:27:41 +0100 (CET) Received: by cust-asf.ponee.io (Postfix) id 5405A160C3C; Wed, 7 Feb 2018 15:27:41 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 786C6160C5C for ; Wed, 7 Feb 2018 16:27:39 +0100 (CET) Received: (qmail 31772 invoked by uid 500); 7 Feb 2018 15:27:38 -0000 Mailing-List: contact commits-help@activemq.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@activemq.apache.org Delivered-To: mailing list commits@activemq.apache.org Received: (qmail 31756 invoked by uid 99); 7 Feb 2018 15:27:38 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 07 Feb 2018 15:27:38 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id F3169E08A1; Wed, 7 Feb 2018 15:27:37 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: michaelpearce@apache.org To: commits@activemq.apache.org Date: Wed, 07 Feb 2018 15:27:39 -0000 Message-Id: <3f9e1f603f73421ca3aa2ce0b6afb22e@git.apache.org> In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [3/5] activemq-artemis git commit: ARTEMIS-853 Support for exclusive consumers http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/dc41f3ca/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java index a3e817b..55125bd 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java @@ -559,7 +559,7 @@ public class ServerSessionImpl implements ServerSession, FailureListener { @Override public Queue createQueue(AddressInfo addressInfo, SimpleString name, SimpleString filterString, boolean temporary, boolean durable) throws Exception { AddressSettings as = server.getAddressSettingsRepository().getMatch(addressInfo.getName().toString()); - return createQueue(addressInfo, name, filterString, temporary, durable, as.getDefaultMaxConsumers(), as.isDefaultPurgeOnNoConsumers(), false); + return createQueue(addressInfo, name, filterString, temporary, durable, as.getDefaultMaxConsumers(), as.isDefaultPurgeOnNoConsumers(), as.isDefaultExclusiveQueue(), as.isDefaultLastValueQueue(), false); } public Queue createQueue(final AddressInfo addressInfo, @@ -569,6 +569,8 @@ public class ServerSessionImpl implements ServerSession, FailureListener { final boolean durable, final int maxConsumers, final boolean purgeOnNoConsumers, + final boolean exclusive, + final boolean lastValue, final boolean autoCreated) throws Exception { final SimpleString unPrefixedName = removePrefix(name); @@ -583,7 +585,7 @@ public class ServerSessionImpl implements ServerSession, FailureListener { server.checkQueueCreationLimit(getUsername()); - Queue queue = server.createQueue(art, unPrefixedName, filterString, SimpleString.toSimpleString(getUsername()), durable, temporary, autoCreated, maxConsumers, purgeOnNoConsumers, server.getAddressSettingsRepository().getMatch(art.getName().toString()).isAutoCreateAddresses()); + Queue queue = server.createQueue(art, unPrefixedName, filterString, SimpleString.toSimpleString(getUsername()), durable, temporary, autoCreated, maxConsumers, purgeOnNoConsumers, exclusive, lastValue, server.getAddressSettingsRepository().getMatch(art.getName().toString()).isAutoCreateAddresses()); if (temporary) { // Temporary queue in core simply means the queue will be deleted if @@ -622,7 +624,30 @@ public class ServerSessionImpl implements ServerSession, FailureListener { final int maxConsumers, final boolean purgeOnNoConsumers, final boolean autoCreated) throws Exception { - return createQueue(new AddressInfo(address, routingType), name, filterString, temporary, durable, maxConsumers, purgeOnNoConsumers, autoCreated); + AddressSettings as = server.getAddressSettingsRepository().getMatch(address.toString()); + return createQueue(new AddressInfo(address, routingType), name, filterString, temporary, durable, maxConsumers, purgeOnNoConsumers, as.isDefaultExclusiveQueue(), as.isDefaultLastValueQueue(), autoCreated); + } + + @Override + public Queue createQueue(final SimpleString address, + final SimpleString name, + final RoutingType routingType, + final SimpleString filterString, + final boolean temporary, + final boolean durable, + final int maxConsumers, + final boolean purgeOnNoConsumers, + final Boolean exclusive, + final Boolean lastValue, + final boolean autoCreated) throws Exception { + if (exclusive == null || lastValue == null) { + AddressSettings as = server.getAddressSettingsRepository().getMatch(address.toString()); + return createQueue(new AddressInfo(address, routingType), name, filterString, temporary, durable, maxConsumers, purgeOnNoConsumers, + exclusive == null ? as.isDefaultExclusiveQueue() : exclusive, lastValue == null ? as.isDefaultLastValueQueue() : lastValue, autoCreated); + } else { + return createQueue(new AddressInfo(address, routingType), name, filterString, temporary, durable, maxConsumers, purgeOnNoConsumers, + exclusive, lastValue, autoCreated); + } } @Override @@ -640,7 +665,14 @@ public class ServerSessionImpl implements ServerSession, FailureListener { @Override public Queue createQueue(AddressInfo addressInfo, SimpleString name, SimpleString filterString, boolean temporary, boolean durable, boolean autoCreated) throws Exception { AddressSettings as = server.getAddressSettingsRepository().getMatch(addressInfo.getName().toString()); - return createQueue(addressInfo, name, filterString, temporary, durable, as.getDefaultMaxConsumers(), as.isDefaultPurgeOnNoConsumers(), autoCreated); + return createQueue(addressInfo, name, filterString, temporary, durable, as.getDefaultMaxConsumers(), as.isDefaultPurgeOnNoConsumers(), as.isDefaultExclusiveQueue(), as.isDefaultLastValueQueue(), autoCreated); + } + + @Override + public Queue createQueue(AddressInfo addressInfo, SimpleString name, SimpleString filterString, boolean temporary, boolean durable, Boolean exclusive, Boolean lastValue, boolean autoCreated) throws Exception { + AddressSettings as = server.getAddressSettingsRepository().getMatch(addressInfo.getName().toString()); + return createQueue(addressInfo, name, filterString, temporary, durable, as.getDefaultMaxConsumers(), as.isDefaultPurgeOnNoConsumers(), + exclusive == null ? as.isDefaultExclusiveQueue() : exclusive, lastValue == null ? as.isDefaultLastValueQueue() : lastValue, autoCreated); } @Override @@ -670,18 +702,37 @@ public class ServerSessionImpl implements ServerSession, FailureListener { @Override public void createSharedQueue(SimpleString address, - final SimpleString name, - final RoutingType routingType, + SimpleString name, + RoutingType routingType, + SimpleString filterString, boolean durable, - final SimpleString filterString) throws Exception { - + Integer maxConsumers, + Boolean purgeOnNoConsumers, + Boolean exclusive, + Boolean lastValue) throws Exception { address = removePrefix(address); - securityCheck(address, name, CheckType.CREATE_NON_DURABLE_QUEUE, this); + securityCheck(address, name, durable ? CheckType.CREATE_DURABLE_QUEUE : CheckType.CREATE_NON_DURABLE_QUEUE, this); server.checkQueueCreationLimit(getUsername()); - server.createSharedQueue(address, routingType, name, filterString, SimpleString.toSimpleString(getUsername()), durable); + AddressSettings as = server.getAddressSettingsRepository().getMatch(address.toString()); + + server.createSharedQueue(address, routingType, name, filterString, SimpleString.toSimpleString(getUsername()), durable, + maxConsumers == null ? as.getDefaultMaxConsumers() : maxConsumers, + purgeOnNoConsumers == null ? as.isDefaultPurgeOnNoConsumers() : purgeOnNoConsumers, + exclusive == null ? as.isDefaultExclusiveQueue() : exclusive, + lastValue == null ? as.isDefaultLastValueQueue() : lastValue); + } + + @Override + public void createSharedQueue(SimpleString address, + final SimpleString name, + final RoutingType routingType, + boolean durable, + final SimpleString filterString) throws Exception { + + createSharedQueue(address, name, routingType, filterString, durable, null, null, null, null); } @Override http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/dc41f3ca/artemis-server/src/main/java/org/apache/activemq/artemis/core/settings/impl/AddressSettings.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/settings/impl/AddressSettings.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/settings/impl/AddressSettings.java index ca9c550..c7021fd 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/settings/impl/AddressSettings.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/settings/impl/AddressSettings.java @@ -124,7 +124,9 @@ public class AddressSettings implements Mergeable, Serializable private Long expiryDelay = AddressSettings.DEFAULT_EXPIRY_DELAY; - private Boolean lastValueQueue = null; + private Boolean defaultLastValueQueue = null; + + private Boolean defaultExclusiveQueue = null; private Long redistributionDelay = null; @@ -190,7 +192,8 @@ public class AddressSettings implements Mergeable, Serializable this.deadLetterAddress = other.deadLetterAddress; this.expiryAddress = other.expiryAddress; this.expiryDelay = other.expiryDelay; - this.lastValueQueue = other.lastValueQueue; + this.defaultLastValueQueue = other.defaultLastValueQueue; + this.defaultExclusiveQueue = other.defaultExclusiveQueue; this.redistributionDelay = other.redistributionDelay; this.sendToDLAOnNoRoute = other.sendToDLAOnNoRoute; this.slowConsumerThreshold = other.slowConsumerThreshold; @@ -352,12 +355,21 @@ public class AddressSettings implements Mergeable, Serializable return this; } - public boolean isLastValueQueue() { - return lastValueQueue != null ? lastValueQueue : AddressSettings.DEFAULT_LAST_VALUE_QUEUE; + public boolean isDefaultLastValueQueue() { + return defaultLastValueQueue != null ? defaultLastValueQueue : AddressSettings.DEFAULT_LAST_VALUE_QUEUE; } - public AddressSettings setLastValueQueue(final boolean lastValueQueue) { - this.lastValueQueue = lastValueQueue; + public AddressSettings setDefaultLastValueQueue(final boolean defaultLastValueQueue) { + this.defaultLastValueQueue = defaultLastValueQueue; + return this; + } + + public boolean isDefaultExclusiveQueue() { + return defaultExclusiveQueue != null ? defaultExclusiveQueue : ActiveMQDefaultConfiguration.getDefaultExclusive(); + } + + public AddressSettings setDefaultExclusiveQueue(Boolean defaultExclusiveQueue) { + this.defaultExclusiveQueue = defaultExclusiveQueue; return this; } @@ -691,7 +703,7 @@ public class AddressSettings implements Mergeable, Serializable expiryDelay = BufferHelper.readNullableLong(buffer); - lastValueQueue = BufferHelper.readNullableBoolean(buffer); + defaultLastValueQueue = BufferHelper.readNullableBoolean(buffer); redistributionDelay = BufferHelper.readNullableLong(buffer); @@ -751,6 +763,10 @@ public class AddressSettings implements Mergeable, Serializable defaultQueueRoutingType = RoutingType.getType(buffer.readByte()); defaultAddressRoutingType = RoutingType.getType(buffer.readByte()); + + if (buffer.readableBytes() > 0) { + defaultExclusiveQueue = BufferHelper.readNullableBoolean(buffer); + } } @Override @@ -769,7 +785,7 @@ public class AddressSettings implements Mergeable, Serializable SimpleString.sizeofNullableString(deadLetterAddress) + SimpleString.sizeofNullableString(expiryAddress) + BufferHelper.sizeOfNullableLong(expiryDelay) + - BufferHelper.sizeOfNullableBoolean(lastValueQueue) + + BufferHelper.sizeOfNullableBoolean(defaultLastValueQueue) + BufferHelper.sizeOfNullableLong(redistributionDelay) + BufferHelper.sizeOfNullableBoolean(sendToDLAOnNoRoute) + BufferHelper.sizeOfNullableLong(slowConsumerCheckPeriod) + @@ -788,7 +804,8 @@ public class AddressSettings implements Mergeable, Serializable BufferHelper.sizeOfNullableInteger(defaultMaxConsumers) + BufferHelper.sizeOfNullableBoolean(defaultPurgeOnNoConsumers) + DataConstants.SIZE_BYTE + - DataConstants.SIZE_BYTE; + DataConstants.SIZE_BYTE + + BufferHelper.sizeOfNullableBoolean(defaultExclusiveQueue); } @Override @@ -819,7 +836,7 @@ public class AddressSettings implements Mergeable, Serializable BufferHelper.writeNullableLong(buffer, expiryDelay); - BufferHelper.writeNullableBoolean(buffer, lastValueQueue); + BufferHelper.writeNullableBoolean(buffer, defaultLastValueQueue); BufferHelper.writeNullableLong(buffer, redistributionDelay); @@ -862,6 +879,9 @@ public class AddressSettings implements Mergeable, Serializable buffer.writeByte(defaultQueueRoutingType == null ? -1 : defaultQueueRoutingType.getType()); buffer.writeByte(defaultAddressRoutingType == null ? -1 : defaultAddressRoutingType.getType()); + + BufferHelper.writeNullableBoolean(buffer, defaultExclusiveQueue); + } /* (non-Javadoc) @@ -876,7 +896,8 @@ public class AddressSettings implements Mergeable, Serializable result = prime * result + ((dropMessagesWhenFull == null) ? 0 : dropMessagesWhenFull.hashCode()); result = prime * result + ((expiryAddress == null) ? 0 : expiryAddress.hashCode()); result = prime * result + ((expiryDelay == null) ? 0 : expiryDelay.hashCode()); - result = prime * result + ((lastValueQueue == null) ? 0 : lastValueQueue.hashCode()); + result = prime * result + ((defaultLastValueQueue == null) ? 0 : defaultLastValueQueue.hashCode()); + result = prime * result + ((defaultExclusiveQueue == null) ? 0 : defaultExclusiveQueue.hashCode()); result = prime * result + ((maxDeliveryAttempts == null) ? 0 : maxDeliveryAttempts.hashCode()); result = prime * result + ((maxSizeBytes == null) ? 0 : maxSizeBytes.hashCode()); result = prime * result + ((messageCounterHistoryDayLimit == null) ? 0 : messageCounterHistoryDayLimit.hashCode()); @@ -947,10 +968,15 @@ public class AddressSettings implements Mergeable, Serializable return false; } else if (!expiryDelay.equals(other.expiryDelay)) return false; - if (lastValueQueue == null) { - if (other.lastValueQueue != null) + if (defaultLastValueQueue == null) { + if (other.defaultLastValueQueue != null) + return false; + } else if (!defaultLastValueQueue.equals(other.defaultLastValueQueue)) + return false; + if (defaultExclusiveQueue == null) { + if (other.defaultExclusiveQueue != null) return false; - } else if (!lastValueQueue.equals(other.lastValueQueue)) + } else if (!defaultExclusiveQueue.equals(other.defaultExclusiveQueue)) return false; if (maxDeliveryAttempts == null) { if (other.maxDeliveryAttempts != null) @@ -1124,8 +1150,10 @@ public class AddressSettings implements Mergeable, Serializable expiryAddress + ", expiryDelay=" + expiryDelay + - ", lastValueQueue=" + - lastValueQueue + + ", defaultLastValueQueue=" + + defaultLastValueQueue + + ", defaultExclusiveQueue=" + + defaultExclusiveQueue + ", maxDeliveryAttempts=" + maxDeliveryAttempts + ", maxSizeBytes=" + http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/dc41f3ca/artemis-server/src/main/resources/schema/artemis-configuration.xsd ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/resources/schema/artemis-configuration.xsd b/artemis-server/src/main/resources/schema/artemis-configuration.xsd index 58abd67..f67cd48 100644 --- a/artemis-server/src/main/resources/schema/artemis-configuration.xsd +++ b/artemis-server/src/main/resources/schema/artemis-configuration.xsd @@ -502,6 +502,8 @@ + + @@ -2693,7 +2695,23 @@ - whether to treat the queue as a last value queue + This is deprecated please use default-last-value-queue instead. + + + + + + + + whether to treat the queues under the address as a last value queues by default + + + + + + + + whether to treat the queues under the address as exclusive queues by default @@ -3009,6 +3027,8 @@ + + http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/dc41f3ca/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java index 256a670..4fda8b3 100644 --- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java +++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java @@ -1312,6 +1312,21 @@ public class ScheduledDeliveryHandlerTest extends Assert { } @Override + public boolean isLastValue() { + return false; + } + + @Override + public boolean isExclusive() { + return false; + } + + @Override + public void setExclusive(boolean exclusive) { + + } + + @Override public void decDelivering(int size) { } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/dc41f3ca/artemis-tools/src/test/resources/artemis-configuration.xsd ---------------------------------------------------------------------- diff --git a/artemis-tools/src/test/resources/artemis-configuration.xsd b/artemis-tools/src/test/resources/artemis-configuration.xsd index c0b88f9..cb579a8 100644 --- a/artemis-tools/src/test/resources/artemis-configuration.xsd +++ b/artemis-tools/src/test/resources/artemis-configuration.xsd @@ -484,6 +484,8 @@ + + @@ -2422,7 +2424,23 @@ - whether to treat the queue as a last value queue + This is deprecated please use default-last-value-queue instead. + + + + + + + + whether to treat the queues under the address as a last value queues by default + + + + + + + + whether to treat the queues under the address as exclusive queues by default @@ -2679,6 +2697,7 @@ + @@ -2691,6 +2710,8 @@ + + http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/dc41f3ca/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSLVQTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSLVQTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSLVQTest.java index 0ffa9c5..1c50412 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSLVQTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSLVQTest.java @@ -43,7 +43,7 @@ public class JMSLVQTest extends JMSClientTestSupport { @Override protected void addConfiguration(ActiveMQServer server) { - server.getAddressSettingsRepository().addMatch(LVQ_QUEUE_NAME, new AddressSettings().setLastValueQueue(true)); + server.getAddressSettingsRepository().addMatch(LVQ_QUEUE_NAME, new AddressSettings().setDefaultLastValueQueue(true)); } @Override protected void createAddressAndQueues(ActiveMQServer server) throws Exception { http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/dc41f3ca/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/MultipleProducersTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/MultipleProducersTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/MultipleProducersTest.java index f412426..0c9c566 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/MultipleProducersTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/MultipleProducersTest.java @@ -74,7 +74,7 @@ public class MultipleProducersTest extends JMSTestBase { addressSettings.setDeadLetterAddress(expiryQueue); addressSettings.setRedeliveryDelay(0); addressSettings.setMessageCounterHistoryDayLimit(2); - addressSettings.setLastValueQueue(false); + addressSettings.setDefaultLastValueQueue(false); addressSettings.setMaxDeliveryAttempts(10); addressSettings.setMaxSizeBytes(1048576); addressSettings.setPageCacheMaxSize(5); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/dc41f3ca/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/SessionCreateAndDeleteQueueTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/SessionCreateAndDeleteQueueTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/SessionCreateAndDeleteQueueTest.java index 4d6dde3..af4ab8d 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/SessionCreateAndDeleteQueueTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/SessionCreateAndDeleteQueueTest.java @@ -99,7 +99,7 @@ public class SessionCreateAndDeleteQueueTest extends ActiveMQTestBase { @Test public void testAddressSettingUSed() throws Exception { - server.getAddressSettingsRepository().addMatch(address.toString(), new AddressSettings().setLastValueQueue(true)); + server.getAddressSettingsRepository().addMatch(address.toString(), new AddressSettings().setDefaultLastValueQueue(true)); ClientSession session = createSessionFactory(locator).createSession(false, true, true); SimpleString filterString = new SimpleString("x=y"); session.createQueue(address, queueName, filterString, false); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/dc41f3ca/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/UpdateQueueTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/UpdateQueueTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/UpdateQueueTest.java index 5d1763f..f5b6a00 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/UpdateQueueTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/UpdateQueueTest.java @@ -59,7 +59,7 @@ public class UpdateQueueTest extends ActiveMQTestBase { prod.send(session.createTextMessage("message " + i)); } - server.updateQueue(ADDRESS.toString(), RoutingType.ANYCAST, 1, false); + server.updateQueue(ADDRESS.toString(), RoutingType.ANYCAST, 1, false, false); conn.close(); factory.close(); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/dc41f3ca/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/client/ExclusiveTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/client/ExclusiveTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/client/ExclusiveTest.java new file mode 100644 index 0000000..1331615 --- /dev/null +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/client/ExclusiveTest.java @@ -0,0 +1,319 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.tests.integration.jms.client; + +import javax.jms.Connection; +import javax.jms.ConnectionFactory; +import javax.jms.Destination; +import javax.jms.JMSConsumer; +import javax.jms.JMSContext; +import javax.jms.JMSProducer; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Queue; +import javax.jms.Session; +import javax.jms.TextMessage; +import javax.jms.Topic; + +import org.apache.activemq.artemis.api.core.RoutingType; +import org.apache.activemq.artemis.api.core.SimpleString; +import org.apache.activemq.artemis.jms.client.ActiveMQDestination; +import org.apache.activemq.artemis.tests.util.JMSTestBase; +import org.junit.Before; +import org.junit.Test; + +/** + * Exclusive Test + */ +public class ExclusiveTest extends JMSTestBase { + + private SimpleString queueName = SimpleString.toSimpleString("jms.exclusive.queue"); + + @Override + @Before + public void setUp() throws Exception { + super.setUp(); + server.createQueue(queueName, RoutingType.ANYCAST, queueName, null, null, true, false, false, false, false, -1, false, true, false,true); + } + + + protected ConnectionFactory getCF() throws Exception { + return cf; + } + + @Test + public void testExclusiveQueueConsumer() throws Exception { + ConnectionFactory fact = getCF(); + Connection connection = fact.createConnection(); + + try { + + Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); + + Destination queue = session.createQueue(queueName.toString()); + MessageProducer producer = session.createProducer(queue); + + MessageConsumer consumer1 = session.createConsumer(queue); + MessageConsumer consumer2 = session.createConsumer(queue); + MessageConsumer consumer3 = session.createConsumer(queue); + + connection.start(); + + for (int j = 0; j < 100; j++) { + TextMessage message = session.createTextMessage(); + + message.setText("Message" + j); + + producer.send(message); + } + + + //All msgs should go to the first consumer + for (int j = 0; j < 100; j++) { + TextMessage tm = (TextMessage) consumer1.receive(10000); + + assertNotNull(tm); + + assertEquals("Message" + j, tm.getText()); + + tm = (TextMessage) consumer2.receiveNoWait(); + assertNull(tm); + tm = (TextMessage) consumer3.receiveNoWait(); + assertNull(tm); + } + } finally { + connection.close(); + } + } + + @Test + public void testExclusiveWithJMS2Producer() throws Exception { + ConnectionFactory fact = getCF(); + JMSContext ctx = addContext(getCF().createContext(JMSContext.SESSION_TRANSACTED)); + + try { + JMSProducer producer = ctx.createProducer(); + Destination queue = ctx.createQueue(queueName.toString()); + + JMSConsumer consumer1 = ctx.createConsumer(queue); + JMSConsumer consumer2 = ctx.createConsumer(queue); + JMSConsumer consumer3 = ctx.createConsumer(queue); + + ctx.start(); + + for (int j = 0; j < 100; j++) { + TextMessage message = ctx.createTextMessage("Message" + j); + + producer.send(queue, message); + } + + ctx.commit(); + + //All msgs should go to the first consumer + for (int j = 0; j < 100; j++) { + TextMessage tm = (TextMessage) consumer1.receive(10000); + + assertNotNull(tm); + + tm.acknowledge(); + + assertEquals("Message" + j, tm.getText()); + + tm = (TextMessage) consumer2.receiveNoWait(); + assertNull(tm); + tm = (TextMessage) consumer3.receiveNoWait(); + assertNull(tm); + } + + ctx.commit(); + } finally { + ctx.close(); + } + } + + @Test + public void testExclusiveQueueConsumerSettingUsingAddressQueueParameters() throws Exception { + ConnectionFactory fact = getCF(); + Connection connection = fact.createConnection(); + + try { + + Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); + + Queue queue = session.createQueue("random?exclusive=true"); + assertEquals("random", queue.getQueueName()); + + ActiveMQDestination a = (ActiveMQDestination) queue; + assertTrue(a.getQueueAttributes().getExclusive()); + + MessageProducer producer = session.createProducer(queue); + + MessageConsumer consumer1 = session.createConsumer(queue); + MessageConsumer consumer2 = session.createConsumer(queue); + MessageConsumer consumer3 = session.createConsumer(queue); + + connection.start(); + + for (int j = 0; j < 100; j++) { + TextMessage message = session.createTextMessage(); + + message.setText("Message" + j); + + producer.send(message); + } + + + //All msgs should go to the first consumer + for (int j = 0; j < 100; j++) { + TextMessage tm = (TextMessage) consumer1.receive(10000); + + assertNotNull(tm); + + assertEquals("Message" + j, tm.getText()); + + tm = (TextMessage) consumer2.receiveNoWait(); + assertNull(tm); + tm = (TextMessage) consumer3.receiveNoWait(); + assertNull(tm); + } + } finally { + connection.close(); + } + } + + @Test + public void testExclusiveQueueConsumerFailover() throws Exception { + ConnectionFactory fact = getCF(); + Connection connection = fact.createConnection(); + + try { + + Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); + + Destination queue = session.createQueue(queueName.toString()); + MessageProducer producer = session.createProducer(queue); + + MessageConsumer consumer1 = session.createConsumer(queue); + MessageConsumer consumer2 = session.createConsumer(queue); + MessageConsumer consumer3 = session.createConsumer(queue); + + connection.start(); + + for (int j = 0; j < 100; j++) { + TextMessage message = session.createTextMessage(); + + message.setText("Message" + j); + + producer.send(message); + } + + + //All msgs should go to the first consumer + for (int j = 0; j < 50; j++) { + TextMessage tm = (TextMessage) consumer1.receive(10000); + + assertNotNull(tm); + + assertEquals("Message" + j, tm.getText()); + + tm = (TextMessage) consumer2.receiveNoWait(); + assertNull(tm); + tm = (TextMessage) consumer3.receiveNoWait(); + assertNull(tm); + } + consumer1.close(); + + //All msgs should now go to the next consumer only, without any errors or exceptions + for (int j = 50; j < 100; j++) { + TextMessage tm = (TextMessage) consumer2.receive(10000); + + assertNotNull(tm); + + assertEquals("Message" + j, tm.getText()); + + tm = (TextMessage) consumer3.receiveNoWait(); + assertNull(tm); + } + } finally { + connection.close(); + } + } + + + @Test + public void testExclusiveTopicSharedConsumerFailover() throws Exception { + ConnectionFactory fact = getCF(); + Connection connection = fact.createConnection(); + + try { + + Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); + + Destination topic = session.createTopic("mytopic"); + MessageProducer producer = session.createProducer(topic); + + String subscriptionName = "sharedsub"; + Topic topicConsumer = session.createTopic("mytopic?exclusive=true"); + MessageConsumer consumer1 = session.createSharedDurableConsumer(topicConsumer, subscriptionName); + MessageConsumer consumer2 = session.createSharedDurableConsumer(topicConsumer, subscriptionName); + MessageConsumer consumer3 = session.createSharedDurableConsumer(topicConsumer, subscriptionName); + + connection.start(); + + for (int j = 0; j < 100; j++) { + TextMessage message = session.createTextMessage(); + + message.setText("Message" + j); + + producer.send(message); + } + + + //All msgs should go to the first consumer + for (int j = 0; j < 50; j++) { + TextMessage tm = (TextMessage) consumer1.receive(10000); + + assertNotNull(tm); + + assertEquals("Message" + j, tm.getText()); + + tm = (TextMessage) consumer2.receiveNoWait(); + assertNull(tm); + tm = (TextMessage) consumer3.receiveNoWait(); + assertNull(tm); + } + consumer1.close(); + + //All msgs should now go to the next consumer only, without any errors or exceptions + for (int j = 50; j < 100; j++) { + TextMessage tm = (TextMessage) consumer2.receive(10000); + + assertNotNull(tm); + + assertEquals("Message" + j, tm.getText()); + + tm = (TextMessage) consumer3.receiveNoWait(); + assertNull(tm); + } + + + } finally { + connection.close(); + } + } + +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/dc41f3ca/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/client/LVQTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/client/LVQTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/client/LVQTest.java new file mode 100644 index 0000000..af4a20a --- /dev/null +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/client/LVQTest.java @@ -0,0 +1,145 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.tests.integration.jms.client; + +import javax.jms.Connection; +import javax.jms.ConnectionFactory; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Queue; +import javax.jms.Session; +import javax.jms.TextMessage; +import javax.jms.Topic; + +import org.apache.activemq.artemis.api.core.Message; +import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory; +import org.apache.activemq.artemis.jms.client.ActiveMQDestination; +import org.apache.activemq.artemis.tests.util.JMSTestBase; +import org.junit.Before; +import org.junit.Test; + +/** + * LVQ Test + */ +public class LVQTest extends JMSTestBase { + + @Override + @Before + public void setUp() throws Exception { + super.setUp(); + } + + + protected ConnectionFactory getCF() throws Exception { + return cf; + } + + @Test + public void testLastValueQueueUsingAddressQueueParameters() throws Exception { + ActiveMQConnectionFactory fact = (ActiveMQConnectionFactory) getCF(); + + //Set the consumer window size to 0 to not buffer any messages client side. + fact.setConsumerWindowSize(0); + Connection connection = fact.createConnection(); + + try { + + Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); + + Queue queue = session.createQueue("random?last-value=true"); + assertEquals("random", queue.getQueueName()); + + ActiveMQDestination a = (ActiveMQDestination) queue; + assertTrue(a.getQueueAttributes().getLastValue()); + + MessageProducer producer = session.createProducer(queue); + MessageConsumer consumer1 = session.createConsumer(queue); + + connection.start(); + for (int j = 0; j < 100; j++) { + TextMessage message = session.createTextMessage(); + + message.setText("Message" + j); + message.setStringProperty(Message.HDR_LAST_VALUE_NAME.toString(), "key"); + producer.send(message); + } + + //Last message only should go to the consumer + TextMessage tm = (TextMessage) consumer1.receive(10000); + + assertNotNull(tm); + + assertEquals("Message99", tm.getText()); + + } finally { + connection.close(); + } + } + + @Test + public void testLastValueQueueTopicConsumerUsingAddressQueueParameters() throws Exception { + ActiveMQConnectionFactory fact = (ActiveMQConnectionFactory) getCF(); + + //Set the consumer window size to 0 to not buffer any messages client side. + fact.setConsumerWindowSize(0); + Connection connection = fact.createConnection(); + + try { + + Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); + + Topic topic = session.createTopic("topic?last-value=true"); + assertEquals("topic", topic.getTopicName()); + + ActiveMQDestination a = (ActiveMQDestination) topic; + assertTrue(a.getQueueAttributes().getLastValue()); + + MessageProducer producer = session.createProducer(topic); + MessageConsumer consumer1 = session.createConsumer(topic); + MessageConsumer consumer2 = session.createConsumer(topic); + + connection.start(); + for (int j = 0; j < 100; j++) { + TextMessage message = session.createTextMessage(); + + message.setText("Message" + j); + message.setStringProperty(Message.HDR_LAST_VALUE_NAME.toString(), "key"); + producer.send(message); + } + + + + //Last message only should go to the consumer. + TextMessage tm = (TextMessage) consumer1.receive(10000); + + assertNotNull(tm); + + assertEquals("Message99", tm.getText()); + + //Last message only should go to the other consumer as well. + TextMessage tm2 = (TextMessage) consumer2.receive(10000); + + assertNotNull(tm2); + + assertEquals("Message99", tm2.getText()); + + } finally { + connection.close(); + } + } + +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/dc41f3ca/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ActiveMQServerControlUsingCoreTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ActiveMQServerControlUsingCoreTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ActiveMQServerControlUsingCoreTest.java index 9351fee..16d64fd 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ActiveMQServerControlUsingCoreTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ActiveMQServerControlUsingCoreTest.java @@ -136,6 +136,16 @@ public class ActiveMQServerControlUsingCoreTest extends ActiveMQServerControlTes } @Override + public String updateQueue(@Parameter(name = "name", desc = "Name of the queue") String name, + @Parameter(name = "routingType", desc = "The routing type used for this address, MULTICAST or ANYCAST") String routingType, + @Parameter(name = "maxConsumers", desc = "The maximum number of consumers allowed on this queue at any one time") Integer maxConsumers, + @Parameter(name = "purgeOnNoConsumers", desc = "Delete this queue when the last consumer disconnects") Boolean purgeOnNoConsumers, + @Parameter(name = "exclusive", desc = "If the queue should route exclusively to one consumer") Boolean exclusive) + throws Exception { + return (String) proxy.invokeOperation("updateQueue", name, routingType, maxConsumers, purgeOnNoConsumers, exclusive); + } + + @Override public void deleteAddress(@Parameter(name = "name", desc = "The name of the address") String name) throws Exception { proxy.invokeOperation("deleteAddress", name); } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/dc41f3ca/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlUsingCoreTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlUsingCoreTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlUsingCoreTest.java index 82c7a47..09621af 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlUsingCoreTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlUsingCoreTest.java @@ -102,6 +102,16 @@ public class QueueControlUsingCoreTest extends QueueControlTest { } @Override + public boolean isExclusive() { + return (Boolean) proxy.retrieveAttributeValue("exclusive"); + } + + @Override + public boolean isLastValue() { + return (Boolean) proxy.retrieveAttributeValue("lastValue"); + } + + @Override public int getDeliveringCount() { return (Integer) proxy.retrieveAttributeValue("deliveringCount", Integer.class); } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/dc41f3ca/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/LVQRecoveryTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/LVQRecoveryTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/LVQRecoveryTest.java index f121cbe..96c1fd2 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/LVQRecoveryTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/LVQRecoveryTest.java @@ -170,7 +170,7 @@ public class LVQRecoveryTest extends ActiveMQTestBase { server = createServer(true, configuration); server.start(); - qs = new AddressSettings().setLastValueQueue(true); + qs = new AddressSettings().setDefaultLastValueQueue(true); server.getAddressSettingsRepository().addMatch(address.toString(), qs); // then we create a client as normal locator = createInVMNonHALocator().setBlockOnAcknowledge(true).setAckBatchSize(0); @@ -188,7 +188,7 @@ public class LVQRecoveryTest extends ActiveMQTestBase { // start the server server.start(); - server.getAddressSettingsRepository().addMatch(address.toString(), new AddressSettings().setLastValueQueue(true)); + server.getAddressSettingsRepository().addMatch(address.toString(), new AddressSettings().setDefaultLastValueQueue(true)); // then we create a client as normal locator.close(); locator = createInVMNonHALocator().setBlockOnAcknowledge(true).setAckBatchSize(0); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/dc41f3ca/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/LVQTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/LVQTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/LVQTest.java index 0706bc0..f262aed 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/LVQTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/LVQTest.java @@ -648,7 +648,7 @@ public class LVQTest extends ActiveMQTestBase { // start the server server.start(); - server.getAddressSettingsRepository().addMatch(address.toString(), new AddressSettings().setLastValueQueue(true)); + server.getAddressSettingsRepository().addMatch(address.toString(), new AddressSettings().setDefaultLastValueQueue(true)); // then we create a client as normalServer ServerLocator locator = createNettyNonHALocator().setBlockOnAcknowledge(true).setAckBatchSize(0); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/dc41f3ca/tests/jms-tests/src/test/java/org/apache/activemq/artemis/jms/tests/message/MessageHeaderTest.java ---------------------------------------------------------------------- diff --git a/tests/jms-tests/src/test/java/org/apache/activemq/artemis/jms/tests/message/MessageHeaderTest.java b/tests/jms-tests/src/test/java/org/apache/activemq/artemis/jms/tests/message/MessageHeaderTest.java index 6e1fff2..f249383 100644 --- a/tests/jms-tests/src/test/java/org/apache/activemq/artemis/jms/tests/message/MessageHeaderTest.java +++ b/tests/jms-tests/src/test/java/org/apache/activemq/artemis/jms/tests/message/MessageHeaderTest.java @@ -917,6 +917,12 @@ public class MessageHeaderTest extends MessageHeaderTestBase { } + @Override + public void createSharedQueue(SimpleString address, RoutingType routingType, SimpleString queueName, SimpleString filter, + boolean durable, Integer maxConsumers, Boolean purgeOnNoConsumers, Boolean exclusive, Boolean lastValue) throws ActiveMQException { + + } + /** * Creates a non-temporary queue. * @@ -1018,6 +1024,13 @@ public class MessageHeaderTest extends MessageHeaderTestBase { } + @Override + public void createQueue(SimpleString address, RoutingType routingType, SimpleString queueName, SimpleString filter, boolean durable, + boolean autoCreated, int maxConsumers, boolean purgeOnNoConsumers, Boolean exclusive, Boolean lastValue) + throws ActiveMQException { + + } + /** * Creates a non-temporaryqueue. * @@ -1048,6 +1061,13 @@ public class MessageHeaderTest extends MessageHeaderTestBase { } + @Override + public void createQueue(String address, RoutingType routingType, String queueName, String filter, boolean durable, + boolean autoCreated, + int maxConsumers, boolean purgeOnNoConsumers, Boolean exclusive, Boolean lastValue) throws ActiveMQException { + + } + /** * Creates a temporary queue. * @@ -1074,6 +1094,13 @@ public class MessageHeaderTest extends MessageHeaderTestBase { } + @Override + public void createTemporaryQueue(SimpleString address, RoutingType routingType, SimpleString queueName, SimpleString filter, + int maxConsumers, boolean purgeOnNoConsumers, Boolean exclusive, Boolean lastValue) + throws ActiveMQException { + + } + /** * Creates a temporary queue with a filter. * http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/dc41f3ca/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java ---------------------------------------------------------------------- diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java index f0afd9e..6e66057 100644 --- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java +++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java @@ -48,6 +48,23 @@ public class FakeQueue extends CriticalComponentImpl implements Queue { } @Override + public boolean isExclusive() { + // no-op + return false; + } + + @Override + public void setExclusive(boolean value) { + // no-op + } + + @Override + public boolean isLastValue() { + // no-op + return false; + } + + @Override public void setMaxConsumer(int maxConsumers) { } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/dc41f3ca/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/fakes/FakePostOffice.java ---------------------------------------------------------------------- diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/fakes/FakePostOffice.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/fakes/FakePostOffice.java index 98a7359..96c7451 100644 --- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/fakes/FakePostOffice.java +++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/fakes/FakePostOffice.java @@ -46,7 +46,8 @@ public class FakePostOffice implements PostOffice { public QueueBinding updateQueue(SimpleString name, RoutingType routingType, Integer maxConsumers, - Boolean purgeOnNoConsumers) throws Exception { + Boolean purgeOnNoConsumers, + Boolean exclusive) throws Exception { return null; }