Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 79004200CAF for ; Thu, 22 Jun 2017 18:13:25 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 77E37160BE7; Thu, 22 Jun 2017 16:13:25 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id B0D6B160BE5 for ; Thu, 22 Jun 2017 18:13:22 +0200 (CEST) Received: (qmail 5204 invoked by uid 500); 22 Jun 2017 16:13:21 -0000 Mailing-List: contact commits-help@activemq.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@activemq.apache.org Delivered-To: mailing list commits@activemq.apache.org Received: (qmail 5195 invoked by uid 99); 22 Jun 2017 16:13:21 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 22 Jun 2017 16:13:21 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 4B57FDFE22; Thu, 22 Jun 2017 16:13:19 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: clebertsuconic@apache.org To: commits@activemq.apache.org Date: Thu, 22 Jun 2017 16:13:19 -0000 Message-Id: <0a11201116a946a9a1e00f9e9e95e6dc@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [1/2] activemq-artemis git commit: ARTEMIS-1235 delete queues when broker.xml changes archived-at: Thu, 22 Jun 2017 16:13:25 -0000 Repository: activemq-artemis Updated Branches: refs/heads/master be8eb3ec9 -> a970b41eb ARTEMIS-1235 delete queues when broker.xml changes Add extra configuration to address-settings to be able to control / enable address/queue deletion by pattern, rather than a global toggle. Add support in the reload logic to remove address and/or queues if the address matches an address setting, where it is enabled. Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/f63f1304 Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/f63f1304 Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/f63f1304 Branch: refs/heads/master Commit: f63f130407c6ce967b6f05962fcd42cd11049fea Parents: be8eb3e Author: Michael Andre Pearce Authored: Sat Jun 17 06:43:02 2017 +0100 Committer: Clebert Suconic Committed: Thu Jun 22 12:12:12 2017 -0400 ---------------------------------------------------------------------- .../artemis/core/config/impl/Validators.java | 11 ++ .../deployers/impl/FileConfigurationParser.java | 15 ++ .../core/server/ActiveMQMessageBundle.java | 2 + .../core/server/ActiveMQServerLogger.java | 8 ++ .../core/server/impl/ActiveMQServerImpl.java | 53 +++++++ .../core/settings/impl/AddressSettings.java | 73 +++++++++- .../core/settings/impl/DeletionPolicy.java | 21 +++ .../resources/schema/artemis-configuration.xsd | 33 ++++- .../test/resources/artemis-configuration.xsd | 34 ++++- docs/user-manual/en/address-model.md | 11 ++ docs/user-manual/en/config-reload.md | 17 ++- .../tests/integration/jms/RedeployTest.java | 72 ++++++++++ .../resources/reload-address-queues-updated.xml | 128 +++++++++++++++++ .../test/resources/reload-address-queues.xml | 143 +++++++++++++++++++ 14 files changed, 616 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/f63f1304/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/impl/Validators.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/impl/Validators.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/impl/Validators.java index 56688dc..bef4d1b 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/impl/Validators.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/impl/Validators.java @@ -24,6 +24,7 @@ import org.apache.activemq.artemis.core.server.JournalType; import org.apache.activemq.artemis.api.core.RoutingType; import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType; import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy; +import org.apache.activemq.artemis.core.settings.impl.DeletionPolicy; import org.apache.activemq.artemis.core.settings.impl.SlowConsumerPolicy; /** @@ -156,6 +157,16 @@ public final class Validators { } }; + public static final Validator DELETION_POLICY_TYPE = new Validator() { + @Override + public void validate(final String name, final Object value) { + String val = (String) value; + if (val == null || !val.equals(DeletionPolicy.OFF.toString()) && !val.equals(DeletionPolicy.FORCE.toString())) { + throw ActiveMQMessageBundle.BUNDLE.invalidDeletionPolicyType(val); + } + } + }; + public static final Validator MESSAGE_LOAD_BALANCING_TYPE = new Validator() { @Override public void validate(final String name, final Object value) { http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/f63f1304/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java index e06397d..a4a3487 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java @@ -72,6 +72,7 @@ import org.apache.activemq.artemis.core.server.group.impl.GroupingHandlerConfigu import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerPlugin; import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy; import org.apache.activemq.artemis.core.settings.impl.AddressSettings; +import org.apache.activemq.artemis.core.settings.impl.DeletionPolicy; import org.apache.activemq.artemis.core.settings.impl.ResourceLimitSettings; import org.apache.activemq.artemis.core.settings.impl.SlowConsumerPolicy; import org.apache.activemq.artemis.utils.ByteUtil; @@ -193,10 +194,14 @@ public final class FileConfigurationParser extends XMLConfigurationUtil { private static final String AUTO_DELETE_QUEUES = "auto-delete-queues"; + private static final String CONFIG_DELETE_QUEUES = "config-delete-queues"; + private static final String AUTO_CREATE_ADDRESSES = "auto-create-addresses"; private static final String AUTO_DELETE_ADDRESSES = "auto-delete-addresses"; + private static final String CONFIG_DELETE_ADDRESSES = "config-delete-addresses"; + private static final String DEFAULT_PURGE_ON_NO_CONSUMERS = "default-purge-on-no-consumers"; private static final String DEFAULT_MAX_CONSUMERS = "default-max-consumers"; @@ -985,10 +990,20 @@ public final class FileConfigurationParser extends XMLConfigurationUtil { addressSettings.setAutoCreateQueues(XMLUtil.parseBoolean(child)); } else if (AUTO_DELETE_QUEUES.equalsIgnoreCase(name)) { addressSettings.setAutoDeleteQueues(XMLUtil.parseBoolean(child)); + } else if (CONFIG_DELETE_QUEUES.equalsIgnoreCase(name)) { + String value = getTrimmedTextContent(child); + Validators.DELETION_POLICY_TYPE.validate(CONFIG_DELETE_QUEUES, value); + DeletionPolicy policy = Enum.valueOf(DeletionPolicy.class, value); + addressSettings.setConfigDeleteQueues(policy); } else if (AUTO_CREATE_ADDRESSES.equalsIgnoreCase(name)) { addressSettings.setAutoCreateAddresses(XMLUtil.parseBoolean(child)); } else if (AUTO_DELETE_ADDRESSES.equalsIgnoreCase(name)) { addressSettings.setAutoDeleteAddresses(XMLUtil.parseBoolean(child)); + } else if (CONFIG_DELETE_ADDRESSES.equalsIgnoreCase(name)) { + String value = getTrimmedTextContent(child); + Validators.DELETION_POLICY_TYPE.validate(CONFIG_DELETE_ADDRESSES, value); + DeletionPolicy policy = Enum.valueOf(DeletionPolicy.class, value); + addressSettings.setConfigDeleteAddresses(policy); } else if (MANAGEMENT_BROWSE_PAGE_SIZE.equalsIgnoreCase(name)) { addressSettings.setManagementBrowsePageSize(XMLUtil.parseInt(child)); } else if (DEFAULT_PURGE_ON_NO_CONSUMERS.equalsIgnoreCase(name)) { http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/f63f1304/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQMessageBundle.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQMessageBundle.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQMessageBundle.java index b0aa021..8c9eb66 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQMessageBundle.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQMessageBundle.java @@ -427,5 +427,7 @@ public interface ActiveMQMessageBundle { String address, Set supportedRoutingTypes); + @Message(id = 119212, value = "Invalid deletion policy type {0}", format = Message.Format.MESSAGE_FORMAT) + IllegalArgumentException invalidDeletionPolicyType(String val); } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/f63f1304/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java index ed2c862..461c662 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java @@ -1582,4 +1582,12 @@ public interface ActiveMQServerLogger extends BasicLogger { @LogMessage(level = Logger.Level.ERROR) @Message(id = 224075, value = "Cannot find pageTX id = {0}", format = Message.Format.MESSAGE_FORMAT) void journalCannotFindPageTX(Long id); + + @LogMessage(level = Logger.Level.INFO) + @Message(id = 224076, value = "UnDeploying address {0}", format = Message.Format.MESSAGE_FORMAT) + void undeployAddress(SimpleString addressName); + + @LogMessage(level = Logger.Level.INFO) + @Message(id = 224077, value = "UnDeploying queue {0}", format = Message.Format.MESSAGE_FORMAT) + void undeployQueue(SimpleString queueName); } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/f63f1304/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java index 71c7c71..c0e0c9e 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java @@ -47,6 +47,7 @@ import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.Collectors; import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration; import org.apache.activemq.artemis.api.core.ActiveMQDeleteAddressException; @@ -151,6 +152,7 @@ import org.apache.activemq.artemis.core.server.reload.ReloadManager; import org.apache.activemq.artemis.core.server.reload.ReloadManagerImpl; import org.apache.activemq.artemis.core.settings.HierarchicalRepository; import org.apache.activemq.artemis.core.settings.impl.AddressSettings; +import org.apache.activemq.artemis.core.settings.impl.DeletionPolicy; import org.apache.activemq.artemis.core.settings.impl.HierarchicalObjectRepository; import org.apache.activemq.artemis.core.settings.impl.ResourceLimitSettings; import org.apache.activemq.artemis.core.transaction.ResourceManager; @@ -2238,6 +2240,9 @@ public class ActiveMQServerImpl implements ActiveMQServer { // Deploy any predefined queues deployQueuesFromConfiguration(); + // Undeploy any addresses and queues not in config + undeployAddressesAndQueueNotInConfiguration(); + // We need to call this here, this gives any dependent server a chance to deploy its own addresses // this needs to be done before clustering is fully activated callActivateCallbacks(); @@ -2313,6 +2318,53 @@ public class ActiveMQServerImpl implements ActiveMQServer { } } + private void undeployAddressesAndQueueNotInConfiguration() throws Exception { + undeployAddressesAndQueueNotInConfiguration(configuration); + } + + private void undeployAddressesAndQueueNotInConfiguration(Configuration configuration) throws Exception { + Set addressesInConfig = configuration.getAddressConfigurations().stream() + .map(CoreAddressConfiguration::getName) + .collect(Collectors.toSet()); + + Set queuesInConfig = configuration.getAddressConfigurations().stream() + .map(CoreAddressConfiguration::getQueueConfigurations) + .flatMap(List::stream).map(CoreQueueConfiguration::getName) + .collect(Collectors.toSet()); + + for (SimpleString addressName : listAddressNames()) { + AddressSettings addressSettings = getAddressSettingsRepository().getMatch(addressName.toString()); + + if (!addressesInConfig.contains(addressName.toString()) && addressSettings.getConfigDeleteAddresses() == DeletionPolicy.FORCE) { + for (Queue queue : listQueues(addressName)) { + ActiveMQServerLogger.LOGGER.undeployQueue(queue.getName()); + queue.deleteQueue(true); + } + ActiveMQServerLogger.LOGGER.undeployAddress(addressName); + removeAddressInfo(addressName, null); + } else if (addressSettings.getConfigDeleteQueues() == DeletionPolicy.FORCE) { + for (Queue queue : listConfiguredQueues(addressName)) { + if (!queuesInConfig.contains(queue.getName().toString())) { + ActiveMQServerLogger.LOGGER.undeployQueue(queue.getName()); + queue.deleteQueue(true); + } + } + } + } + } + + private Set listAddressNames() { + return postOffice.getAddresses(); + } + + private List listConfiguredQueues(SimpleString address) throws Exception { + return listQueues(address).stream().filter(queue -> !queue.isAutoCreated() && !queue.isInternalQueue()).collect(Collectors.toList()); + } + + private List listQueues(SimpleString address) throws Exception { + return postOffice.listQueuesForAddress(address); + } + private void deployAddressesFromConfiguration() throws Exception { deployAddressesFromConfiguration(configuration); } @@ -2818,6 +2870,7 @@ public class ActiveMQServerImpl implements ActiveMQServer { } ActiveMQServerLogger.LOGGER.reloadingConfiguration("addresses"); deployAddressesFromConfiguration(config); + undeployAddressesAndQueueNotInConfiguration(config); } } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/f63f1304/artemis-server/src/main/java/org/apache/activemq/artemis/core/settings/impl/AddressSettings.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/settings/impl/AddressSettings.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/settings/impl/AddressSettings.java index 6e7c28c..ca9c550 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/settings/impl/AddressSettings.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/settings/impl/AddressSettings.java @@ -71,10 +71,14 @@ public class AddressSettings implements Mergeable, Serializable public static final boolean DEFAULT_AUTO_DELETE_QUEUES = true; + public static final DeletionPolicy DEFAULT_CONFIG_DELETE_QUEUES = DeletionPolicy.OFF; + public static final boolean DEFAULT_AUTO_CREATE_ADDRESSES = true; public static final boolean DEFAULT_AUTO_DELETE_ADDRESSES = true; + public static final DeletionPolicy DEFAULT_CONFIG_DELETE_ADDRESSES = DeletionPolicy.OFF; + public static final long DEFAULT_REDISTRIBUTION_DELAY = -1; public static final long DEFAULT_EXPIRY_DELAY = -1; @@ -148,10 +152,14 @@ public class AddressSettings implements Mergeable, Serializable private Boolean autoDeleteQueues = null; + private DeletionPolicy configDeleteQueues = null; + private Boolean autoCreateAddresses = null; private Boolean autoDeleteAddresses = null; + private DeletionPolicy configDeleteAddresses = null; + private Integer managementBrowsePageSize = AddressSettings.MANAGEMENT_BROWSE_PAGE_SIZE; private Long maxSizeBytesRejectThreshold = null; @@ -194,8 +202,10 @@ public class AddressSettings implements Mergeable, Serializable this.autoDeleteJmsTopics = other.autoDeleteJmsTopics; this.autoCreateQueues = other.autoCreateQueues; this.autoDeleteQueues = other.autoDeleteQueues; + this.configDeleteQueues = other.configDeleteQueues; this.autoCreateAddresses = other.autoCreateAddresses; this.autoDeleteAddresses = other.autoDeleteAddresses; + this.configDeleteAddresses = other.configDeleteAddresses; this.managementBrowsePageSize = other.managementBrowsePageSize; this.queuePrefetch = other.queuePrefetch; this.maxSizeBytesRejectThreshold = other.maxSizeBytesRejectThreshold; @@ -270,6 +280,15 @@ public class AddressSettings implements Mergeable, Serializable return this; } + public DeletionPolicy getConfigDeleteQueues() { + return configDeleteQueues != null ? configDeleteQueues : AddressSettings.DEFAULT_CONFIG_DELETE_QUEUES; + } + + public AddressSettings setConfigDeleteQueues(DeletionPolicy configDeleteQueues) { + this.configDeleteQueues = configDeleteQueues; + return this; + } + public boolean isAutoCreateAddresses() { return autoCreateAddresses != null ? autoCreateAddresses : AddressSettings.DEFAULT_AUTO_CREATE_ADDRESSES; } @@ -288,6 +307,15 @@ public class AddressSettings implements Mergeable, Serializable return this; } + public DeletionPolicy getConfigDeleteAddresses() { + return configDeleteAddresses != null ? configDeleteAddresses : AddressSettings.DEFAULT_CONFIG_DELETE_ADDRESSES; + } + + public AddressSettings setConfigDeleteAddresses(DeletionPolicy configDeleteAddresses) { + this.configDeleteAddresses = configDeleteAddresses; + return this; + } + public int getDefaultMaxConsumers() { return defaultMaxConsumers != null ? defaultMaxConsumers : ActiveMQDefaultConfiguration.getDefaultMaxQueueConsumers(); } @@ -594,12 +622,18 @@ public class AddressSettings implements Mergeable, Serializable if (autoDeleteQueues == null) { autoDeleteQueues = merged.autoDeleteQueues; } + if (configDeleteQueues == null) { + configDeleteQueues = merged.configDeleteQueues; + } if (autoCreateAddresses == null) { autoCreateAddresses = merged.autoCreateAddresses; } if (autoDeleteAddresses == null) { autoDeleteAddresses = merged.autoDeleteAddresses; } + if (configDeleteAddresses == null) { + configDeleteAddresses = merged.configDeleteAddresses; + } if (managementBrowsePageSize == null) { managementBrowsePageSize = merged.managementBrowsePageSize; } @@ -687,10 +721,25 @@ public class AddressSettings implements Mergeable, Serializable autoDeleteQueues = BufferHelper.readNullableBoolean(buffer); + policyStr = buffer.readNullableSimpleString(); + + if (policyStr != null) { + configDeleteQueues = DeletionPolicy.valueOf(policyStr.toString()); + } else { + configDeleteQueues = null; + } + autoCreateAddresses = BufferHelper.readNullableBoolean(buffer); autoDeleteAddresses = BufferHelper.readNullableBoolean(buffer); + policyStr = buffer.readNullableSimpleString(); + + if (policyStr != null) { + configDeleteAddresses = DeletionPolicy.valueOf(policyStr.toString()); + } else { + configDeleteAddresses = null; + } managementBrowsePageSize = BufferHelper.readNullableInteger(buffer); maxSizeBytesRejectThreshold = BufferHelper.readNullableLong(buffer); @@ -731,9 +780,9 @@ public class AddressSettings implements Mergeable, Serializable BufferHelper.sizeOfNullableBoolean(autoCreateJmsTopics) + BufferHelper.sizeOfNullableBoolean(autoDeleteJmsTopics) + BufferHelper.sizeOfNullableBoolean(autoCreateQueues) + - BufferHelper.sizeOfNullableBoolean(autoDeleteQueues) + + BufferHelper.sizeOfNullableBoolean(autoDeleteQueues) + BufferHelper.sizeOfNullableSimpleString(configDeleteQueues != null ? configDeleteQueues.toString() : null) + BufferHelper.sizeOfNullableBoolean(autoCreateAddresses) + - BufferHelper.sizeOfNullableBoolean(autoDeleteAddresses) + + BufferHelper.sizeOfNullableBoolean(autoDeleteAddresses) + BufferHelper.sizeOfNullableSimpleString(configDeleteAddresses != null ? configDeleteAddresses.toString() : null) + BufferHelper.sizeOfNullableInteger(managementBrowsePageSize) + BufferHelper.sizeOfNullableLong(maxSizeBytesRejectThreshold) + BufferHelper.sizeOfNullableInteger(defaultMaxConsumers) + @@ -794,10 +843,14 @@ public class AddressSettings implements Mergeable, Serializable BufferHelper.writeNullableBoolean(buffer, autoDeleteQueues); + buffer.writeNullableSimpleString(configDeleteQueues != null ? new SimpleString(configDeleteQueues.toString()) : null); + BufferHelper.writeNullableBoolean(buffer, autoCreateAddresses); BufferHelper.writeNullableBoolean(buffer, autoDeleteAddresses); + buffer.writeNullableSimpleString(configDeleteAddresses != null ? new SimpleString(configDeleteAddresses.toString()) : null); + BufferHelper.writeNullableInteger(buffer, managementBrowsePageSize); BufferHelper.writeNullableLong(buffer, maxSizeBytesRejectThreshold); @@ -843,8 +896,10 @@ public class AddressSettings implements Mergeable, Serializable result = prime * result + ((autoDeleteJmsTopics == null) ? 0 : autoDeleteJmsTopics.hashCode()); result = prime * result + ((autoCreateQueues == null) ? 0 : autoCreateQueues.hashCode()); result = prime * result + ((autoDeleteQueues == null) ? 0 : autoDeleteQueues.hashCode()); + result = prime * result + ((configDeleteQueues == null) ? 0 : configDeleteQueues.hashCode()); result = prime * result + ((autoCreateAddresses == null) ? 0 : autoCreateAddresses.hashCode()); result = prime * result + ((autoDeleteAddresses == null) ? 0 : autoDeleteAddresses.hashCode()); + result = prime * result + ((configDeleteAddresses == null) ? 0 : configDeleteAddresses.hashCode()); result = prime * result + ((managementBrowsePageSize == null) ? 0 : managementBrowsePageSize.hashCode()); result = prime * result + ((queuePrefetch == null) ? 0 : queuePrefetch.hashCode()); result = prime * result + ((maxSizeBytesRejectThreshold == null) ? 0 : maxSizeBytesRejectThreshold.hashCode()); @@ -992,6 +1047,11 @@ public class AddressSettings implements Mergeable, Serializable return false; } else if (!autoDeleteQueues.equals(other.autoDeleteQueues)) return false; + if (configDeleteQueues == null) { + if (other.configDeleteQueues != null) + return false; + } else if (!configDeleteQueues.equals(other.configDeleteQueues)) + return false; if (autoCreateAddresses == null) { if (other.autoCreateAddresses != null) return false; @@ -1002,6 +1062,11 @@ public class AddressSettings implements Mergeable, Serializable return false; } else if (!autoDeleteAddresses.equals(other.autoDeleteAddresses)) return false; + if (configDeleteAddresses == null) { + if (other.configDeleteAddresses != null) + return false; + } else if (!configDeleteAddresses.equals(other.configDeleteAddresses)) + return false; if (managementBrowsePageSize == null) { if (other.managementBrowsePageSize != null) return false; @@ -1101,10 +1166,14 @@ public class AddressSettings implements Mergeable, Serializable autoCreateQueues + ", autoDeleteQueues=" + autoDeleteQueues + + ", configDeleteQueues=" + + configDeleteQueues + ", autoCreateAddresses=" + autoCreateAddresses + ", autoDeleteAddresses=" + autoDeleteAddresses + + ", configDeleteAddresses=" + + configDeleteAddresses + ", managementBrowsePageSize=" + managementBrowsePageSize + ", defaultMaxConsumers=" + http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/f63f1304/artemis-server/src/main/java/org/apache/activemq/artemis/core/settings/impl/DeletionPolicy.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/settings/impl/DeletionPolicy.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/settings/impl/DeletionPolicy.java new file mode 100644 index 0000000..3ff1041 --- /dev/null +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/settings/impl/DeletionPolicy.java @@ -0,0 +1,21 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.core.settings.impl; + +public enum DeletionPolicy { + OFF, FORCE; +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/f63f1304/artemis-server/src/main/resources/schema/artemis-configuration.xsd ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/resources/schema/artemis-configuration.xsd b/artemis-server/src/main/resources/schema/artemis-configuration.xsd index 9021b70..255fe17 100644 --- a/artemis-server/src/main/resources/schema/artemis-configuration.xsd +++ b/artemis-server/src/main/resources/schema/artemis-configuration.xsd @@ -2635,6 +2635,22 @@ + + + + What to do when a queue is no longer in broker.xml. + OFF = will do nothing queues will remain, + FORCE = delete queues even if messages remaining. + + + + + + + + + + @@ -2653,6 +2669,22 @@ + + + + What to do when an address is no longer in broker.xml. + OFF = will do nothing addresses will remain, + FORCE = delete address and its queues even if messages remaining. + + + + + + + + + + @@ -2787,7 +2819,6 @@ - http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/f63f1304/artemis-tools/src/test/resources/artemis-configuration.xsd ---------------------------------------------------------------------- diff --git a/artemis-tools/src/test/resources/artemis-configuration.xsd b/artemis-tools/src/test/resources/artemis-configuration.xsd index eda0932..3fb01e3 100644 --- a/artemis-tools/src/test/resources/artemis-configuration.xsd +++ b/artemis-tools/src/test/resources/artemis-configuration.xsd @@ -2455,6 +2455,22 @@ + + + + What to do when a queue is no longer in broker.xml. + OFF = will do nothing queues will remain, + FORCE = delete queues even if messages remaining. + + + + + + + + + + @@ -2468,11 +2484,27 @@ - whether or not to delete auto-created addresses when it no longer has any queues + What to do when an address is no longer in broker.xml. + OFF = will do nothing addresses will remain, + FORCE = delete address and its queues even if messages remaining. + + + + What to do when an address is no longer in broker.xml. OFF = will do nothing addresses will remain, FORCE = delete queues, and address even if messages remaining. + + + + + + + + + + http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/f63f1304/docs/user-manual/en/address-model.md ---------------------------------------------------------------------- diff --git a/docs/user-manual/en/address-model.md b/docs/user-manual/en/address-model.md index 0dbf320..0a6c9f0 100644 --- a/docs/user-manual/en/address-model.md +++ b/docs/user-manual/en/address-model.md @@ -577,6 +577,11 @@ are durable, non-temporary, and non-transient. Default is `true`. delete auto-created queues when they have both 0 consumers and 0 messages. Default is `true`. +`config-delete-queues`. How the broker should handle queues deleted +on config reload, by delete policy: `OFF` or `FORCE`. +See [config-reload](config-reload.md) for more details. +Default is `OFF`. + `auto-create-addresses`. Whether or not the broker should automatically create an address when a message is sent to or a consumer tries to consume from a queue which is mapped to an address whose name fits the address `match`. @@ -585,3 +590,9 @@ Default is `true`. `auto-delete-addresses`. Whether or not the broker should automatically delete auto-created addresses once the address no longer has any queues. Default is `true`. + +`config-delete-addresses`. How the broker should handle addresses deleted +on config reload, by delete policy: `OFF` or `FORCE`. +See [config-reload](config-reload.md) for more details. +Default is `OFF`. + http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/f63f1304/docs/user-manual/en/config-reload.md ---------------------------------------------------------------------- diff --git a/docs/user-manual/en/config-reload.md b/docs/user-manual/en/config-reload.md index 1fba9d6..5c414cc 100644 --- a/docs/user-manual/en/config-reload.md +++ b/docs/user-manual/en/config-reload.md @@ -10,4 +10,19 @@ Once the configuration file is changed (broker.xml) the following modules will b - Addresses & queues -Notice: Address & queues won't be removed upon reload, given the risk of losing messages. You may execute explicit CLI or Management operations to remove destinations. \ No newline at end of file +Notice: + +Deletion of Address's and Queue's, not auto created is controlled by Address Settings + +* config-delete-addresses + * OFF (DEFAULT) - will not remove upon config reload. + * FORCE - will remove the address and its queues upon config reload, even if messages remains, losing the messages in the address & queues. + +* config-delete-queues + * OFF (DEFAULT) - will not remove upon config reload. + * FORCE - will remove the queue upon config reload, even if messages remains, losing the messages in the queue. + + +By default both settings are OFF as such address & queues won't be removed upon reload, given the risk of losing messages. + +When OFF You may execute explicit CLI or Management operations to remove address & queues. \ No newline at end of file http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/f63f1304/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/RedeployTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/RedeployTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/RedeployTest.java index 590a997..4260287 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/RedeployTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/RedeployTest.java @@ -27,8 +27,12 @@ import java.net.URL; import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.StandardCopyOption; +import java.util.List; import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; +import org.apache.activemq.artemis.api.core.SimpleString; +import org.apache.activemq.artemis.core.server.impl.AddressInfo; import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory; import org.apache.activemq.artemis.jms.server.embedded.EmbeddedJMS; import org.apache.activemq.artemis.tests.util.ActiveMQTestBase; @@ -104,4 +108,72 @@ public class RedeployTest extends ActiveMQTestBase { } } + + + + + @Test + public void testRedeployAddressQueue() throws Exception { + Path brokerXML = getTestDirfile().toPath().resolve("broker.xml"); + URL url1 = RedeployTest.class.getClassLoader().getResource("reload-address-queues.xml"); + URL url2 = RedeployTest.class.getClassLoader().getResource("reload-address-queues-updated.xml"); + Files.copy(url1.openStream(), brokerXML); + + EmbeddedJMS embeddedJMS = new EmbeddedJMS(); + embeddedJMS.setConfigResourcePath(brokerXML.toUri().toString()); + embeddedJMS.start(); + + final ReusableLatch latch = new ReusableLatch(1); + + Runnable tick = new Runnable() { + @Override + public void run() { + latch.countDown(); + } + }; + + embeddedJMS.getActiveMQServer().getReloadManager().setTick(tick); + + try { + latch.await(10, TimeUnit.SECONDS); + Assert.assertNotNull(getAddressInfo(embeddedJMS, "config_test_address_removal")); + Assert.assertNotNull(getAddressInfo(embeddedJMS, "config_test_queue_removal")); + Assert.assertTrue(listQueuesNamesForAddress(embeddedJMS, "config_test_queue_removal").contains("config_test_queue_removal_queue_1")); + Assert.assertTrue(listQueuesNamesForAddress(embeddedJMS, "config_test_queue_removal").contains("config_test_queue_removal_queue_2")); + + Assert.assertNotNull(getAddressInfo(embeddedJMS, "permanent_test_address_removal")); + Assert.assertNotNull(getAddressInfo(embeddedJMS, "permanent_test_queue_removal")); + Assert.assertTrue(listQueuesNamesForAddress(embeddedJMS, "permanent_test_queue_removal").contains("permanent_test_queue_removal_queue_1")); + Assert.assertTrue(listQueuesNamesForAddress(embeddedJMS, "permanent_test_queue_removal").contains("permanent_test_queue_removal_queue_2")); + + + Files.copy(url2.openStream(), brokerXML, StandardCopyOption.REPLACE_EXISTING); + brokerXML.toFile().setLastModified(System.currentTimeMillis() + 1000); + latch.setCount(1); + embeddedJMS.getActiveMQServer().getReloadManager().setTick(tick); + latch.await(10, TimeUnit.SECONDS); + + Assert.assertNull(getAddressInfo(embeddedJMS, "config_test_address_removal")); + Assert.assertNotNull(getAddressInfo(embeddedJMS, "config_test_queue_removal")); + Assert.assertTrue(listQueuesNamesForAddress(embeddedJMS, "config_test_queue_removal").contains("config_test_queue_removal_queue_1")); + Assert.assertFalse(listQueuesNamesForAddress(embeddedJMS, "config_test_queue_removal").contains("config_test_queue_removal_queue_2")); + + Assert.assertNotNull(getAddressInfo(embeddedJMS, "permanent_test_address_removal")); + Assert.assertNotNull(getAddressInfo(embeddedJMS, "permanent_test_queue_removal")); + Assert.assertTrue(listQueuesNamesForAddress(embeddedJMS, "permanent_test_queue_removal").contains("permanent_test_queue_removal_queue_1")); + Assert.assertTrue(listQueuesNamesForAddress(embeddedJMS, "permanent_test_queue_removal").contains("permanent_test_queue_removal_queue_2")); + } finally { + embeddedJMS.stop(); + } + } + + private AddressInfo getAddressInfo(EmbeddedJMS embeddedJMS, String address) { + return embeddedJMS.getActiveMQServer().getPostOffice().getAddressInfo(SimpleString.toSimpleString(address)); + } + + private List listQueuesNamesForAddress(EmbeddedJMS embeddedJMS, String address) throws Exception { + return embeddedJMS.getActiveMQServer().getPostOffice().listQueuesForAddress(SimpleString.toSimpleString(address)).stream().map( + org.apache.activemq.artemis.core.server.Queue::getName).map(SimpleString::toString).collect(Collectors.toList()); + } + } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/f63f1304/tests/integration-tests/src/test/resources/reload-address-queues-updated.xml ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/resources/reload-address-queues-updated.xml b/tests/integration-tests/src/test/resources/reload-address-queues-updated.xml new file mode 100644 index 0000000..17a035e --- /dev/null +++ b/tests/integration-tests/src/test/resources/reload-address-queues-updated.xml @@ -0,0 +1,128 @@ + + + + + + + + 0.0.0.0 + + 100 + + false + + false + + + NIO + + ./data/paging + + ./data/bindings + + ./data/journal + + ./data/large-messages + + 2 + + -1 + + + 40000 + + + + + + tcp://0.0.0.0:61616?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576 + + + tcp://0.0.0.0:5672?protocols=AMQP + + + tcp://0.0.0.0:61613?protocols=STOMP + + + tcp://0.0.0.0:5445?protocols=HORNETQ,STOMP + + + tcp://0.0.0.0:1883?protocols=MQTT + + + + + + + + + + + + + + + + + + + + + 0 + 10Mb + 10 + BLOCK + + + + false + DLQ + ExpiryQueue + 0 + 10Mb + 10 + BLOCK + FORCE + FORCE + + + + +
+ + + +
+
+ + + +
+
+
+
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/f63f1304/tests/integration-tests/src/test/resources/reload-address-queues.xml ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/resources/reload-address-queues.xml b/tests/integration-tests/src/test/resources/reload-address-queues.xml new file mode 100644 index 0000000..e9b42be --- /dev/null +++ b/tests/integration-tests/src/test/resources/reload-address-queues.xml @@ -0,0 +1,143 @@ + + + + + + + + 0.0.0.0 + + 100 + + false + + false + + + NIO + + ./data/paging + + ./data/bindings + + ./data/journal + + ./data/large-messages + + 2 + + -1 + + + 40000 + + + + + + tcp://0.0.0.0:61616?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576 + + + tcp://0.0.0.0:5672?protocols=AMQP + + + tcp://0.0.0.0:61613?protocols=STOMP + + + tcp://0.0.0.0:5445?protocols=HORNETQ,STOMP + + + tcp://0.0.0.0:1883?protocols=MQTT + + + + + + + + + + + + + + + + + + + + + false + DLQ + ExpiryQueue + 0 + 10Mb + 10 + BLOCK + + + + false + DLQ + ExpiryQueue + 0 + 10Mb + 10 + BLOCK + FORCE + FORCE + + + + +
+ + + + +
+
+ + + +
+
+ + + + +
+
+ + + +
+
+
+