Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 08B81200BB9 for ; Mon, 7 Nov 2016 17:36:29 +0100 (CET) Received: by cust-asf.ponee.io (Postfix) id 07509160AEC; Mon, 7 Nov 2016 16:36:29 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 9E53F160B15 for ; Mon, 7 Nov 2016 17:36:26 +0100 (CET) Received: (qmail 27369 invoked by uid 500); 7 Nov 2016 16:36:25 -0000 Mailing-List: contact commits-help@activemq.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@activemq.apache.org Delivered-To: mailing list commits@activemq.apache.org Received: (qmail 24894 invoked by uid 99); 7 Nov 2016 16:36:22 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 07 Nov 2016 16:36:22 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 615CFEEE7B; Mon, 7 Nov 2016 16:36:22 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: clebertsuconic@apache.org To: commits@activemq.apache.org Date: Mon, 07 Nov 2016 16:37:06 -0000 Message-Id: In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [46/50] [abbrv] activemq-artemis git commit: Remove JMS prefixes archived-at: Mon, 07 Nov 2016 16:36:29 -0000 http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3ac7a0c5/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/server/impl/JMSServerManagerImpl.java ---------------------------------------------------------------------- diff --git a/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/server/impl/JMSServerManagerImpl.java b/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/server/impl/JMSServerManagerImpl.java index 456bb58..f10962e 100644 --- a/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/server/impl/JMSServerManagerImpl.java +++ b/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/server/impl/JMSServerManagerImpl.java @@ -28,7 +28,6 @@ import java.net.InetAddress; import java.net.URL; import java.net.UnknownHostException; import java.util.ArrayList; -import java.util.Collection; import java.util.Collections; import java.util.Comparator; import java.util.HashMap; @@ -57,12 +56,9 @@ import org.apache.activemq.artemis.core.security.Role; import org.apache.activemq.artemis.core.server.ActivateCallback; import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.core.server.ActiveMQServerLogger; -import org.apache.activemq.artemis.core.server.PostQueueCreationCallback; -import org.apache.activemq.artemis.core.server.PostQueueDeletionCallback; import org.apache.activemq.artemis.core.server.Queue; -import org.apache.activemq.artemis.core.server.QueueCreator; -import org.apache.activemq.artemis.core.server.QueueDeleter; import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl; +import org.apache.activemq.artemis.core.server.impl.AddressInfo; import org.apache.activemq.artemis.core.server.management.Notification; import org.apache.activemq.artemis.core.server.reload.ReloadCallback; import org.apache.activemq.artemis.core.server.reload.ReloadManager; @@ -389,15 +385,15 @@ public class JMSServerManagerImpl implements JMSServerManager, ActivateCallback return; } - server.setJMSQueueCreator(new JMSDestinationCreator()); - - server.setJMSQueueDeleter(new JMSQueueDeleter()); +// server.setJMSQueueCreator(new JMSDestinationCreator()); +// +// server.setJMSQueueDeleter(new JMSQueueDeleter()); server.registerActivateCallback(this); - server.registerPostQueueCreationCallback(new JMSPostQueueCreationCallback()); - - server.registerPostQueueDeletionCallback(new JMSPostQueueDeletionCallback()); +// server.registerPostQueueCreationCallback(new JMSPostQueueCreationCallback()); +// +// server.registerPostQueueDeletionCallback(new JMSPostQueueDeletionCallback()); /** * See this method's javadoc. *

@@ -794,11 +790,11 @@ public class JMSServerManagerImpl implements JMSServerManager, ActivateCallback public synchronized boolean destroyQueue(final String name, final boolean removeConsumers) throws Exception { checkInitialised(); - server.destroyQueue(ActiveMQDestination.createQueueAddressFromName(name), null, !removeConsumers, removeConsumers); + server.destroyQueue(SimpleString.toSimpleString(name), null, !removeConsumers, removeConsumers); // if the queue has consumers and 'removeConsumers' is false then the queue won't actually be removed // therefore only remove the queue from Bindings, etc. if the queue is actually removed - if (this.server.getPostOffice().getBinding(ActiveMQDestination.createQueueAddressFromName(name)) == null) { + if (this.server.getPostOffice().getBinding(SimpleString.toSimpleString(name)) == null) { removeFromBindings(queues, queueBindings, name); queues.remove(name); @@ -823,7 +819,7 @@ public class JMSServerManagerImpl implements JMSServerManager, ActivateCallback @Override public synchronized boolean destroyTopic(final String name, final boolean removeConsumers) throws Exception { checkInitialised(); - AddressControl addressControl = (AddressControl) server.getManagementService().getResource(ResourceNames.CORE_ADDRESS + ActiveMQDestination.createTopicAddressFromName(name)); + AddressControl addressControl = (AddressControl) server.getManagementService().getResource(ResourceNames.CORE_ADDRESS + name); if (addressControl != null) { for (String queueName : addressControl.getQueueNames()) { Binding binding = server.getPostOffice().getBinding(new SimpleString(queueName)); @@ -1093,6 +1089,8 @@ public class JMSServerManagerImpl implements JMSServerManager, ActivateCallback coreFilterString = SelectorTranslator.convertToActiveMQFilterString(selectorString); } + server.createOrUpdateAddressInfo(new AddressInfo(SimpleString.toSimpleString(activeMQQueue.getName())).setRoutingType(AddressInfo.RoutingType.ANYCAST).setDefaultMaxQueueConsumers(-1)); + Queue queue = server.deployQueue(SimpleString.toSimpleString(activeMQQueue.getAddress()), SimpleString.toSimpleString(activeMQQueue.getAddress()), SimpleString.toSimpleString(coreFilterString), durable, false, autoCreated); queues.put(queueName, activeMQQueue); @@ -1128,7 +1126,8 @@ public class JMSServerManagerImpl implements JMSServerManager, ActivateCallback // checks when routing messages to a topic that // does not exist - otherwise we would not be able to distinguish from a non existent topic and one with no // subscriptions - core has no notion of a topic - server.deployQueue(SimpleString.toSimpleString(activeMQTopic.getAddress()), SimpleString.toSimpleString(activeMQTopic.getAddress()), SimpleString.toSimpleString(JMSServerManagerImpl.REJECT_FILTER), true, false, autoCreated); +// server.deployQueue(SimpleString.toSimpleString(activeMQTopic.getAddress()), SimpleString.toSimpleString(activeMQTopic.getAddress()), SimpleString.toSimpleString(JMSServerManagerImpl.REJECT_FILTER), true, false, autoCreated); + server.createOrUpdateAddressInfo(new AddressInfo(SimpleString.toSimpleString(activeMQTopic.getAddress()))); topics.put(topicName, activeMQTopic); @@ -1640,95 +1639,95 @@ public class JMSServerManagerImpl implements JMSServerManager, ActivateCallback * This class is responsible for auto-creating the JMS (and underlying core) resources when a client sends a message * to a non-existent JMS queue or topic */ - class JMSDestinationCreator implements QueueCreator { - - @Override - public boolean create(SimpleString address) throws Exception { - AddressSettings settings = server.getAddressSettingsRepository().getMatch(address.toString()); - if (address.toString().startsWith(ActiveMQDestination.JMS_QUEUE_ADDRESS_PREFIX) && settings.isAutoCreateJmsQueues()) { - return internalCreateJMSQueue(false, address.toString().substring(ActiveMQDestination.JMS_QUEUE_ADDRESS_PREFIX.length()), null, true, true); - } else if (address.toString().startsWith(ActiveMQDestination.JMS_TOPIC_ADDRESS_PREFIX) && settings.isAutoCreateJmsTopics()) { - return createTopic(false, address.toString().substring(ActiveMQDestination.JMS_TOPIC_ADDRESS_PREFIX.length()), true); - } else { - return false; - } - } - } - - class JMSQueueDeleter implements QueueDeleter { - - @Override - public boolean delete(SimpleString queueName) throws Exception { - Queue queue = server.locateQueue(queueName); - SimpleString address = queue.getAddress(); - AddressSettings settings = server.getAddressSettingsRepository().getMatch(address.toString()); - long consumerCount = queue.getConsumerCount(); - long messageCount = queue.getMessageCount(); - - if (address.toString().startsWith(ActiveMQDestination.JMS_QUEUE_ADDRESS_PREFIX) && settings.isAutoDeleteJmsQueues() && queue.getMessageCount() == 0) { - if (ActiveMQJMSServerLogger.LOGGER.isDebugEnabled()) { - ActiveMQJMSServerLogger.LOGGER.debug("deleting auto-created queue \"" + queueName + ".\" consumerCount = " + consumerCount + "; messageCount = " + messageCount + "; isAutoDeleteJmsQueues = " + settings.isAutoDeleteJmsQueues()); - } - - return destroyQueue(queueName.toString().substring(ActiveMQDestination.JMS_QUEUE_ADDRESS_PREFIX.length()), false); - } else { - return false; - } - } - } +// class JMSDestinationCreator implements QueueCreator { +// +// @Override +// public boolean create(SimpleString address) throws Exception { +// AddressSettings settings = server.getAddressSettingsRepository().getMatch(address.toString()); +// if (address.toString().startsWith(ActiveMQDestination.JMS_QUEUE_ADDRESS_PREFIX) && settings.isAutoCreateJmsQueues()) { +// return internalCreateJMSQueue(false, address.toString().substring(ActiveMQDestination.JMS_QUEUE_ADDRESS_PREFIX.length()), null, true, true); +// } else if (address.toString().startsWith(ActiveMQDestination.JMS_TOPIC_ADDRESS_PREFIX) && settings.isAutoCreateJmsTopics()) { +// return createTopic(false, address.toString().substring(ActiveMQDestination.JMS_TOPIC_ADDRESS_PREFIX.length()), true); +// } else { +// return false; +// } +// } +// } + +// class JMSQueueDeleter implements QueueDeleter { +// +// @Override +// public boolean delete(SimpleString queueName) throws Exception { +// Queue queue = server.locateQueue(queueName); +// SimpleString address = queue.getAddress(); +// AddressSettings settings = server.getAddressSettingsRepository().getMatch(address.toString()); +// long consumerCount = queue.getConsumerCount(); +// long messageCount = queue.getMessageCount(); +// +// if (address.toString().startsWith(ActiveMQDestination.JMS_QUEUE_ADDRESS_PREFIX) && settings.getAutoDeleteJmsQueues() && queue.getMessageCount() == 0) { +// if (ActiveMQJMSServerLogger.LOGGER.isDebugEnabled()) { +// ActiveMQJMSServerLogger.LOGGER.debug("deleting auto-created queue \"" + queueName + ".\" consumerCount = " + consumerCount + "; messageCount = " + messageCount + "; getAutoDeleteJmsQueues = " + settings.getAutoDeleteJmsQueues()); +// } +// +// return destroyQueue(queueName.toString().substring(ActiveMQDestination.JMS_QUEUE_ADDRESS_PREFIX.length()), false); +// } else { +// return false; +// } +// } +// } /** * When a core queue is created with a jms.topic prefix this class will create the associated JMS resources * retroactively. This would happen if, for example, a client created a subscription a non-existent JMS topic and * autoCreateJmsTopics = true. */ - class JMSPostQueueCreationCallback implements PostQueueCreationCallback { - - @Override - public void callback(SimpleString queueName) throws Exception { - Queue queue = server.locateQueue(queueName); - String address = queue.getAddress().toString(); - - AddressSettings settings = server.getAddressSettingsRepository().getMatch(address.toString()); - /* When a topic is created a dummy subscription is created which never receives any messages; when the queue - * for that dummy subscription is created we don't want to call createTopic again. Therefore we make sure the - * queue name doesn't start with the topic prefix. - */ - if (address.toString().startsWith(ActiveMQDestination.JMS_TOPIC_ADDRESS_PREFIX) && settings.isAutoCreateJmsTopics() && !queueName.toString().startsWith(ActiveMQDestination.JMS_TOPIC_ADDRESS_PREFIX)) { - createTopic(false, address.toString().substring(ActiveMQDestination.JMS_TOPIC_ADDRESS_PREFIX.length()), true); - } - } - } +// class JMSPostQueueCreationCallback implements PostQueueCreationCallback { +// +// @Override +// public void callback(SimpleString queueName) throws Exception { +// Queue queue = server.locateQueue(queueName); +// String address = queue.getAddress().toString(); +// +// AddressSettings settings = server.getAddressSettingsRepository().getMatch(address.toString()); +// /* When a topic is created a dummy subscription is created which never receives any messages; when the queue +// * for that dummy subscription is created we don't want to call createTopic again. Therefore we make sure the +// * queue name doesn't start with the topic prefix. +// */ +// if (address.toString().startsWith(ActiveMQDestination.JMS_TOPIC_ADDRESS_PREFIX) && settings.isAutoCreateJmsTopics() && !queueName.toString().startsWith(ActiveMQDestination.JMS_TOPIC_ADDRESS_PREFIX)) { +// createTopic(false, address.toString().substring(ActiveMQDestination.JMS_TOPIC_ADDRESS_PREFIX.length()), true); +// } +// } +// } /** * When a core queue representing a JMS topic subscription is deleted this class will check to see if that was the * last subscription on the topic and if so and autoDeleteJmsTopics = true then it will delete the JMS resources * for that topic. */ - class JMSPostQueueDeletionCallback implements PostQueueDeletionCallback { - - @Override - public void callback(SimpleString address, SimpleString queueName) throws Exception { - Queue queue = server.locateQueue(address); - Collection bindings = server.getPostOffice().getBindingsForAddress(address).getBindings(); - - AddressSettings settings = server.getAddressSettingsRepository().getMatch(address.toString()); - - if (address.toString().startsWith(ActiveMQDestination.JMS_TOPIC_ADDRESS_PREFIX) && settings.isAutoDeleteJmsTopics() && bindings.size() == 1 && queue != null && queue.isAutoCreated()) { - try { - destroyTopic(address.toString().substring(ActiveMQDestination.JMS_TOPIC_ADDRESS_PREFIX.length())); - } catch (IllegalStateException e) { - /* - * During shutdown the callback can be invoked after the JMSServerManager is already shut down so we just - * ignore the exception in that case - */ - if (ActiveMQJMSServerLogger.LOGGER.isDebugEnabled()) { - ActiveMQJMSServerLogger.LOGGER.debug("Failed to destroy topic", e); - } - } - } - } - } +// class JMSPostQueueDeletionCallback implements PostQueueDeletionCallback { +// +// @Override +// public void callback(SimpleString address, SimpleString queueName) throws Exception { +// Queue queue = server.locateQueue(address); +// Collection bindings = server.getPostOffice().getBindingsForAddress(address).getBindings(); +// +// AddressSettings settings = server.getAddressSettingsRepository().getMatch(address.toString()); +// +// if (address.toString().startsWith(ActiveMQDestination.JMS_TOPIC_ADDRESS_PREFIX) && settings.isAutoDeleteJmsTopics() && bindings.size() == 1 && queue != null && queue.isAutoCreated()) { +// try { +// destroyTopic(address.toString().substring(ActiveMQDestination.JMS_TOPIC_ADDRESS_PREFIX.length())); +// } catch (IllegalStateException e) { +// /* +// * During shutdown the callback can be invoked after the JMSServerManager is already shut down so we just +// * ignore the exception in that case +// */ +// if (ActiveMQJMSServerLogger.LOGGER.isDebugEnabled()) { +// ActiveMQJMSServerLogger.LOGGER.debug("Failed to destroy topic", e); +// } +// } +// } +// } +// } private final class JMSReloader implements ReloadCallback { http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3ac7a0c5/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/server/management/impl/JMSManagementServiceImpl.java ---------------------------------------------------------------------- diff --git a/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/server/management/impl/JMSManagementServiceImpl.java b/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/server/management/impl/JMSManagementServiceImpl.java index 0c56e24..2b3f7a2 100644 --- a/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/server/management/impl/JMSManagementServiceImpl.java +++ b/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/server/management/impl/JMSManagementServiceImpl.java @@ -89,14 +89,14 @@ public class JMSManagementServiceImpl implements JMSManagementService { ObjectName objectName = managementService.getObjectNameBuilder().getJMSQueueObjectName(queue.getQueueName()); JMSQueueControlImpl control = new JMSQueueControlImpl(queue, coreQueueControl, jmsServerManager, counter); managementService.registerInJMX(objectName, control); - managementService.registerInRegistry(ResourceNames.JMS_QUEUE + queue.getQueueName(), control); + managementService.registerInRegistry(queue.getQueueName(), control); } @Override public synchronized void unregisterQueue(final String name) throws Exception { ObjectName objectName = managementService.getObjectNameBuilder().getJMSQueueObjectName(name); managementService.unregisterFromJMX(objectName); - managementService.unregisterFromRegistry(ResourceNames.JMS_QUEUE + name); + managementService.unregisterFromRegistry(name); } @Override @@ -105,14 +105,14 @@ public class JMSManagementServiceImpl implements JMSManagementService { AddressControl addressControl = (AddressControl) managementService.getResource(ResourceNames.CORE_ADDRESS + topic.getAddress()); JMSTopicControlImpl control = new JMSTopicControlImpl(topic, jmsServerManager, addressControl, managementService); managementService.registerInJMX(objectName, control); - managementService.registerInRegistry(ResourceNames.JMS_TOPIC + topic.getTopicName(), control); + managementService.registerInRegistry(topic.getTopicName(), control); } @Override public synchronized void unregisterTopic(final String name) throws Exception { ObjectName objectName = managementService.getObjectNameBuilder().getJMSTopicObjectName(name); managementService.unregisterFromJMX(objectName); - managementService.unregisterFromRegistry(ResourceNames.JMS_TOPIC + name); + managementService.unregisterFromRegistry(name); } @Override http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3ac7a0c5/artemis-junit/src/test/java/org/apache/activemq/artemis/junit/EmbeddedJMSResourceMultipleFileConfigurationTest.java ---------------------------------------------------------------------- diff --git a/artemis-junit/src/test/java/org/apache/activemq/artemis/junit/EmbeddedJMSResourceMultipleFileConfigurationTest.java b/artemis-junit/src/test/java/org/apache/activemq/artemis/junit/EmbeddedJMSResourceMultipleFileConfigurationTest.java index ceb06e8..7e3b313 100644 --- a/artemis-junit/src/test/java/org/apache/activemq/artemis/junit/EmbeddedJMSResourceMultipleFileConfigurationTest.java +++ b/artemis-junit/src/test/java/org/apache/activemq/artemis/junit/EmbeddedJMSResourceMultipleFileConfigurationTest.java @@ -76,7 +76,7 @@ public class EmbeddedJMSResourceMultipleFileConfigurationTest { List boundQueues = jmsServer.getTopicQueues(TEST_TOPIC); assertNotNull("List should never be null", boundQueues); - assertEquals("Should have two queues bound to topic " + TEST_TOPIC, 2, boundQueues.size()); + assertEquals("Should have two queues bound to topic " + TEST_TOPIC, 1, boundQueues.size()); } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3ac7a0c5/artemis-junit/src/test/java/org/apache/activemq/artemis/junit/EmbeddedJMSResourceSingleFileConfigurationTest.java ---------------------------------------------------------------------- diff --git a/artemis-junit/src/test/java/org/apache/activemq/artemis/junit/EmbeddedJMSResourceSingleFileConfigurationTest.java b/artemis-junit/src/test/java/org/apache/activemq/artemis/junit/EmbeddedJMSResourceSingleFileConfigurationTest.java index 5ca3560..9651a7a 100644 --- a/artemis-junit/src/test/java/org/apache/activemq/artemis/junit/EmbeddedJMSResourceSingleFileConfigurationTest.java +++ b/artemis-junit/src/test/java/org/apache/activemq/artemis/junit/EmbeddedJMSResourceSingleFileConfigurationTest.java @@ -76,7 +76,7 @@ public class EmbeddedJMSResourceSingleFileConfigurationTest { List boundQueues = jmsServer.getTopicQueues(TEST_TOPIC); assertNotNull("List should never be null", boundQueues); - assertEquals("Should have two queues bound to topic " + TEST_TOPIC, 2, boundQueues.size()); + assertEquals("Should have two queues bound to topic " + TEST_TOPIC, 1, boundQueues.size()); } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3ac7a0c5/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ProtonProtocolManager.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ProtonProtocolManager.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ProtonProtocolManager.java index f5b6c78..9b84dc1 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ProtonProtocolManager.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ProtonProtocolManager.java @@ -58,7 +58,8 @@ public class ProtonProtocolManager implements ProtocolManager, Noti * used when you want to treat senders as a subscription on an address rather than consuming from the actual queue for * the address. This can be changed on the acceptor. * */ - private String pubSubPrefix = ActiveMQDestination.JMS_TOPIC_ADDRESS_PREFIX; + // TODO fix this + private String pubSubPrefix = ActiveMQDestination.TOPIC_QUALIFIED_PREFIX; private int maxFrameSize = AMQPConstants.Connection.DEFAULT_MAX_FRAME_SIZE; http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3ac7a0c5/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/TestConversions.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/TestConversions.java b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/TestConversions.java index 96ce90e..6beee36 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/TestConversions.java +++ b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/TestConversions.java @@ -248,7 +248,7 @@ public class TestConversions extends Assert { } private void simulatePersistence(ServerMessage serverMessage) { - serverMessage.setAddress(new SimpleString("jms.queue.SomeAddress")); + serverMessage.setAddress(new SimpleString("SomeAddress")); // This is just to simulate what would happen during the persistence of the message // We need to still be able to recover the message when we read it back ((EncodingSupport) serverMessage).encode(new EmptyBuffer()); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3ac7a0c5/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java index e53b962..5603cb8 100644 --- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java +++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java @@ -25,6 +25,7 @@ import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; +import org.apache.activemq.artemis.api.core.ActiveMQQueueExistsException; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.core.client.impl.ClientConsumerImpl; import org.apache.activemq.artemis.core.protocol.openwire.OpenWireMessageConverter; @@ -84,9 +85,9 @@ public class AMQConsumer { if (openwireDestination.isTopic()) { if (openwireDestination.isTemporary()) { - address = new SimpleString("jms.temptopic." + physicalName); + address = new SimpleString(physicalName); } else { - address = new SimpleString("jms.topic." + physicalName); + address = new SimpleString(physicalName); } SimpleString queueName = createTopicSubscription(info.isDurable(), info.getClientId(), physicalName, info.getSubscriptionName(), selector, address); @@ -95,7 +96,11 @@ public class AMQConsumer { serverConsumer.setlowConsumerDetection(slowConsumerDetectionListener); } else { SimpleString queueName = OpenWireUtil.toCoreAddress(openwireDestination); - session.getCoreServer().getJMSDestinationCreator().create(queueName); + try { + session.getCoreServer().createQueue(queueName, queueName, null, true, false); + } catch (ActiveMQQueueExistsException e) { + // ignore + } serverConsumer = session.getCoreSession().createConsumer(nativeId, queueName, selector, info.isBrowser(), false, -1); serverConsumer.setlowConsumerDetection(slowConsumerDetectionListener); AddressSettings addrSettings = session.getCoreServer().getAddressSettingsRepository().getMatch(queueName.toString()); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3ac7a0c5/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java index 426f4e6..5cab686 100644 --- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java +++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java @@ -23,6 +23,7 @@ import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; +import org.apache.activemq.artemis.api.core.ActiveMQQueueExistsException; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.core.paging.PagingStore; import org.apache.activemq.artemis.core.postoffice.RoutingStatus; @@ -145,7 +146,11 @@ public class AMQSession implements SessionCallback { for (ActiveMQDestination openWireDest : dests) { if (openWireDest.isQueue()) { SimpleString queueName = OpenWireUtil.toCoreAddress(openWireDest); - getCoreServer().getJMSDestinationCreator().create(queueName); + try { + getCoreServer().createQueue(queueName, queueName, null, true, false); + } catch (ActiveMQQueueExistsException e) { + // ignore + } } AMQConsumer consumer = new AMQConsumer(this, openWireDest, info, scheduledPool); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3ac7a0c5/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/util/OpenWireUtil.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/util/OpenWireUtil.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/util/OpenWireUtil.java index 05e1e34..a6e7292 100644 --- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/util/OpenWireUtil.java +++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/util/OpenWireUtil.java @@ -28,11 +28,6 @@ import org.apache.activemq.command.TransactionId; import org.apache.activemq.command.XATransactionId; import org.apache.activemq.util.ByteSequence; -import static org.apache.activemq.artemis.jms.client.ActiveMQDestination.JMS_QUEUE_ADDRESS_PREFIX; -import static org.apache.activemq.artemis.jms.client.ActiveMQDestination.JMS_TEMP_QUEUE_ADDRESS_PREFIX; -import static org.apache.activemq.artemis.jms.client.ActiveMQDestination.JMS_TEMP_TOPIC_ADDRESS_PREFIX; -import static org.apache.activemq.artemis.jms.client.ActiveMQDestination.JMS_TOPIC_ADDRESS_PREFIX; - public class OpenWireUtil { public static ActiveMQBuffer toActiveMQBuffer(ByteSequence bytes) { @@ -45,15 +40,15 @@ public class OpenWireUtil { public static SimpleString toCoreAddress(ActiveMQDestination dest) { if (dest.isQueue()) { if (dest.isTemporary()) { - return new SimpleString(JMS_TEMP_QUEUE_ADDRESS_PREFIX + dest.getPhysicalName()); + return new SimpleString(dest.getPhysicalName()); } else { - return new SimpleString(JMS_QUEUE_ADDRESS_PREFIX + dest.getPhysicalName()); + return new SimpleString(dest.getPhysicalName()); } } else { if (dest.isTemporary()) { - return new SimpleString(JMS_TEMP_TOPIC_ADDRESS_PREFIX + dest.getPhysicalName()); + return new SimpleString(dest.getPhysicalName()); } else { - return new SimpleString(JMS_TOPIC_ADDRESS_PREFIX + dest.getPhysicalName()); + return new SimpleString(dest.getPhysicalName()); } } } @@ -66,7 +61,7 @@ public class OpenWireUtil { */ public static ActiveMQDestination toAMQAddress(ServerMessage message, ActiveMQDestination actualDestination) { String address = message.getAddress().toString(); - String strippedAddress = address.replace(JMS_QUEUE_ADDRESS_PREFIX, "").replace(JMS_TEMP_QUEUE_ADDRESS_PREFIX, "").replace(JMS_TOPIC_ADDRESS_PREFIX, "").replace(JMS_TEMP_TOPIC_ADDRESS_PREFIX, ""); + String strippedAddress = address;//.replace(JMS_QUEUE_ADDRESS_PREFIX, "").replace(JMS_TEMP_QUEUE_ADDRESS_PREFIX, "").replace(JMS_TOPIC_ADDRESS_PREFIX, "").replace(JMS_TEMP_TOPIC_ADDRESS_PREFIX, ""); if (actualDestination.isQueue()) { return new ActiveMQQueue(strippedAddress); } else { http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3ac7a0c5/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompConnection.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompConnection.java b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompConnection.java index a6ddf68..74d03d1 100644 --- a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompConnection.java +++ b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompConnection.java @@ -29,6 +29,7 @@ import java.util.concurrent.ScheduledExecutorService; import org.apache.activemq.artemis.api.core.ActiveMQBuffer; import org.apache.activemq.artemis.api.core.ActiveMQBuffers; import org.apache.activemq.artemis.api.core.ActiveMQException; +import org.apache.activemq.artemis.api.core.ActiveMQQueueExistsException; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.api.core.client.ActiveMQClient; import org.apache.activemq.artemis.core.protocol.stomp.v10.StompFrameHandlerV10; @@ -37,8 +38,8 @@ import org.apache.activemq.artemis.core.remoting.CloseListener; import org.apache.activemq.artemis.core.remoting.FailureListener; import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants; import org.apache.activemq.artemis.core.server.ActiveMQServerLogger; -import org.apache.activemq.artemis.core.server.QueueCreator; import org.apache.activemq.artemis.core.server.ServerMessage; +import org.apache.activemq.artemis.core.server.impl.AddressInfo; import org.apache.activemq.artemis.core.server.impl.ServerMessageImpl; import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection; import org.apache.activemq.artemis.spi.core.remoting.Acceptor; @@ -252,11 +253,12 @@ public final class StompConnection implements RemotingConnection { } public void autoCreateDestinationIfPossible(String queue) throws ActiveMQStompException { + // TODO: STOMP clients will have to prefix their destination with queue:// or topic:// so we can determine what to do here try { - QueueCreator queueCreator = manager.getServer().getJMSDestinationCreator(); - if (queueCreator != null) { - queueCreator.create(SimpleString.toSimpleString(queue)); - } + manager.getServer().createOrUpdateAddressInfo(new AddressInfo(SimpleString.toSimpleString(queue)).setRoutingType(AddressInfo.RoutingType.ANYCAST)); + manager.getServer().createQueue(SimpleString.toSimpleString(queue), SimpleString.toSimpleString(queue), null, true, false); + } catch (ActiveMQQueueExistsException e) { + // ignore } catch (Exception e) { throw new ActiveMQStompException(e.getMessage(), e).setHandler(frameHandler); } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3ac7a0c5/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompSession.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompSession.java b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompSession.java index f86dd92..d207544 100644 --- a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompSession.java +++ b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompSession.java @@ -39,6 +39,7 @@ import org.apache.activemq.artemis.core.server.MessageReference; import org.apache.activemq.artemis.core.server.ServerConsumer; import org.apache.activemq.artemis.core.server.ServerMessage; import org.apache.activemq.artemis.core.server.ServerSession; +import org.apache.activemq.artemis.core.server.impl.AddressInfo; import org.apache.activemq.artemis.core.server.impl.ServerMessageImpl; import org.apache.activemq.artemis.core.server.impl.ServerSessionImpl; import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection; @@ -285,7 +286,7 @@ public class StompSession implements SessionCallback { receiveCredits = -1; } - if (destination.startsWith("jms.topic")) { + if (manager.getServer().getAddressInfo(SimpleString.toSimpleString(destination)).getRoutingType().equals(AddressInfo.RoutingType.MULTICAST)) { // subscribes to a topic pubSub = true; if (durableSubscriptionName != null) { http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3ac7a0c5/artemis-rest/src/test/java/org/apache/activemq/artemis/rest/integration/EmbeddedRestActiveMQJMSTest.java ---------------------------------------------------------------------- diff --git a/artemis-rest/src/test/java/org/apache/activemq/artemis/rest/integration/EmbeddedRestActiveMQJMSTest.java b/artemis-rest/src/test/java/org/apache/activemq/artemis/rest/integration/EmbeddedRestActiveMQJMSTest.java index 9e10ef7..f012020 100644 --- a/artemis-rest/src/test/java/org/apache/activemq/artemis/rest/integration/EmbeddedRestActiveMQJMSTest.java +++ b/artemis-rest/src/test/java/org/apache/activemq/artemis/rest/integration/EmbeddedRestActiveMQJMSTest.java @@ -65,7 +65,7 @@ public class EmbeddedRestActiveMQJMSTest { List connectors = createInVmConnector(); server.getEmbeddedJMS().getJMSServerManager().createConnectionFactory("ConnectionFactory", false, JMSFactoryType.CF, connectors, "ConnectionFactory"); - ClientRequest request = new ClientRequest(TestPortProvider.generateURL("/queues/jms.queue.exampleQueue")); + ClientRequest request = new ClientRequest(TestPortProvider.generateURL("/queues/exampleQueue")); ClientResponse response = request.head(); response.releaseConnection(); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3ac7a0c5/artemis-rest/src/test/java/org/apache/activemq/artemis/rest/test/EmbeddedTest.java ---------------------------------------------------------------------- diff --git a/artemis-rest/src/test/java/org/apache/activemq/artemis/rest/test/EmbeddedTest.java b/artemis-rest/src/test/java/org/apache/activemq/artemis/rest/test/EmbeddedTest.java index dea9c0e..dc0ea0f 100644 --- a/artemis-rest/src/test/java/org/apache/activemq/artemis/rest/test/EmbeddedTest.java +++ b/artemis-rest/src/test/java/org/apache/activemq/artemis/rest/test/EmbeddedTest.java @@ -94,7 +94,7 @@ public class EmbeddedTest { @Test public void testTransform() throws Exception { - ClientRequest request = new ClientRequest(TestPortProvider.generateURL("/queues/jms.queue.exampleQueue")); + ClientRequest request = new ClientRequest(TestPortProvider.generateURL("/queues/exampleQueue")); ClientResponse response = request.head(); response.releaseConnection(); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3ac7a0c5/artemis-rest/src/test/java/org/apache/activemq/artemis/rest/test/JMSTest.java ---------------------------------------------------------------------- diff --git a/artemis-rest/src/test/java/org/apache/activemq/artemis/rest/test/JMSTest.java b/artemis-rest/src/test/java/org/apache/activemq/artemis/rest/test/JMSTest.java index c3228ad..77d88d1 100644 --- a/artemis-rest/src/test/java/org/apache/activemq/artemis/rest/test/JMSTest.java +++ b/artemis-rest/src/test/java/org/apache/activemq/artemis/rest/test/JMSTest.java @@ -106,7 +106,7 @@ public class JMSTest extends MessageTestBase { } public static Destination createDestination(String dest) { - ActiveMQDestination destination = (ActiveMQDestination) ActiveMQDestination.fromAddress(dest); + ActiveMQDestination destination = (ActiveMQDestination) ActiveMQDestination.fromPrefixedName(dest); System.out.println("SimpleAddress: " + destination.getSimpleAddress()); return destination; } @@ -150,8 +150,9 @@ public class JMSTest extends MessageTestBase { @Test public void testJmsConsumer() throws Exception { - String queueName = ActiveMQDestination.createQueueAddressFromName("testQueue2").toString(); - System.out.println("Queue name: " + queueName); + String queueName = "testQueue2"; + String prefixedQueueName = ActiveMQDestination.createQueueAddressFromName(queueName).toString(); + System.out.println("Queue name: " + prefixedQueueName); QueueDeployment deployment = new QueueDeployment(); deployment.setDuplicatesAllowed(true); deployment.setDurableSend(false); @@ -160,7 +161,7 @@ public class JMSTest extends MessageTestBase { Connection conn = connectionFactory.createConnection(); try { Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); - Destination destination = createDestination(queueName); + Destination destination = createDestination(prefixedQueueName); MessageConsumer consumer = session.createConsumer(destination); consumer.setMessageListener(new Listener()); conn.start(); @@ -196,8 +197,9 @@ public class JMSTest extends MessageTestBase { @Test public void testJmsProducer() throws Exception { - String queueName = ActiveMQDestination.createQueueAddressFromName("testQueue").toString(); - System.out.println("Queue name: " + queueName); + String queueName = "testQueue"; + String prefixedQueueName = ActiveMQDestination.createQueueAddressFromName(queueName).toString(); + System.out.println("Queue name: " + prefixedQueueName); QueueDeployment deployment = new QueueDeployment(); deployment.setDuplicatesAllowed(true); deployment.setDurableSend(false); @@ -221,7 +223,7 @@ public class JMSTest extends MessageTestBase { Order order = new Order(); order.setName("1"); order.setAmount("$5.00"); - publish(queueName, order, null); + publish(prefixedQueueName, order, null); ClientResponse res = consumeNext.request().header("Accept-Wait", "2").accept("application/xml").post(String.class); Assert.assertEquals(200, res.getStatus()); @@ -238,7 +240,7 @@ public class JMSTest extends MessageTestBase { Order order = new Order(); order.setName("1"); order.setAmount("$5.00"); - publish(queueName, order, null); + publish(prefixedQueueName, order, null); ClientResponse res = consumeNext.request().header("Accept-Wait", "2").accept("application/json").post(String.class); Assert.assertEquals(200, res.getStatus()); @@ -255,7 +257,7 @@ public class JMSTest extends MessageTestBase { Order order = new Order(); order.setName("2"); order.setAmount("$15.00"); - publish(queueName, order, "application/xml"); + publish(prefixedQueueName, order, "application/xml"); ClientResponse res = consumeNext.request().header("Accept-Wait", "2").post(String.class); Assert.assertEquals(200, res.getStatus()); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3ac7a0c5/artemis-rest/src/test/java/org/apache/activemq/artemis/rest/test/SelectorTest.java ---------------------------------------------------------------------- diff --git a/artemis-rest/src/test/java/org/apache/activemq/artemis/rest/test/SelectorTest.java b/artemis-rest/src/test/java/org/apache/activemq/artemis/rest/test/SelectorTest.java index 176d61e..1491f51 100644 --- a/artemis-rest/src/test/java/org/apache/activemq/artemis/rest/test/SelectorTest.java +++ b/artemis-rest/src/test/java/org/apache/activemq/artemis/rest/test/SelectorTest.java @@ -45,12 +45,13 @@ import static org.jboss.resteasy.test.TestPortProvider.generateURL; public class SelectorTest extends MessageTestBase { public static ConnectionFactory connectionFactory; - public static String topicName = ActiveMQDestination.createQueueAddressFromName("testTopic").toString(); + public static String topicName = "testTopic"; + public static String prefixedTopicName = ActiveMQDestination.createQueueAddressFromName(topicName).toString(); @BeforeClass public static void setup() throws Exception { connectionFactory = new ActiveMQJMSConnectionFactory(manager.getQueueManager().getServerLocator()); - System.out.println("Queue name: " + topicName); + System.out.println("Queue name: " + prefixedTopicName); TopicDeployment deployment = new TopicDeployment(); deployment.setDuplicatesAllowed(true); deployment.setDurableSend(false); @@ -118,7 +119,7 @@ public class SelectorTest extends MessageTestBase { } public static Destination createDestination(String dest) { - ActiveMQDestination destination = (ActiveMQDestination) ActiveMQDestination.fromAddress(dest); + ActiveMQDestination destination = (ActiveMQDestination) ActiveMQDestination.fromPrefixedName(dest); System.out.println("SimpleAddress: " + destination.getSimpleAddress()); return destination; } @@ -203,32 +204,32 @@ public class SelectorTest extends MessageTestBase { Order order = new Order(); order.setName("1"); order.setAmount("$5.00"); - publish(topicName, order, null, "1"); + publish(prefixedTopicName, order, null, "1"); Thread.sleep(200); Assert.assertEquals(order, PushReceiver.oneOrder); order.setName("2"); - publish(topicName, order, null, "2"); + publish(prefixedTopicName, order, null, "2"); Thread.sleep(200); Assert.assertEquals(order, PushReceiver.twoOrder); order.setName("3"); - publish(topicName, order, null, "2"); + publish(prefixedTopicName, order, null, "2"); Thread.sleep(200); Assert.assertEquals(order, PushReceiver.twoOrder); order.setName("4"); - publish(topicName, order, null, "1"); + publish(prefixedTopicName, order, null, "1"); Thread.sleep(200); Assert.assertEquals(order, PushReceiver.oneOrder); order.setName("5"); - publish(topicName, order, null, "1"); + publish(prefixedTopicName, order, null, "1"); Thread.sleep(200); Assert.assertEquals(order, PushReceiver.oneOrder); order.setName("6"); - publish(topicName, order, null, "1"); + publish(prefixedTopicName, order, null, "1"); Thread.sleep(200); Assert.assertEquals(order, PushReceiver.oneOrder); @@ -262,17 +263,17 @@ public class SelectorTest extends MessageTestBase { Order order = new Order(); order.setName("1"); order.setAmount("$5.00"); - publish(topicName, order, null, "1"); + publish(prefixedTopicName, order, null, "1"); order.setName("2"); - publish(topicName, order, null, "2"); + publish(prefixedTopicName, order, null, "2"); order.setName("3"); - publish(topicName, order, null, "2"); + publish(prefixedTopicName, order, null, "2"); order.setName("4"); - publish(topicName, order, null, "1"); + publish(prefixedTopicName, order, null, "1"); order.setName("5"); - publish(topicName, order, null, "1"); + publish(prefixedTopicName, order, null, "1"); order.setName("6"); - publish(topicName, order, null, "1"); + publish(prefixedTopicName, order, null, "1"); { order.setName("1"); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3ac7a0c5/artemis-rest/src/test/java/org/apache/activemq/artemis/rest/test/XmlTest.java ---------------------------------------------------------------------- diff --git a/artemis-rest/src/test/java/org/apache/activemq/artemis/rest/test/XmlTest.java b/artemis-rest/src/test/java/org/apache/activemq/artemis/rest/test/XmlTest.java index 2c0bd9d..726e16e 100644 --- a/artemis-rest/src/test/java/org/apache/activemq/artemis/rest/test/XmlTest.java +++ b/artemis-rest/src/test/java/org/apache/activemq/artemis/rest/test/XmlTest.java @@ -27,7 +27,7 @@ public class XmlTest { @Test public void testPush() throws Exception { String xml = "\n" + - " jms.queue.bar\n" + + " bar\n" + " true\n" + " 10\n" + " \n" + http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3ac7a0c5/artemis-rest/src/test/resources/broker.xml ---------------------------------------------------------------------- diff --git a/artemis-rest/src/test/resources/broker.xml b/artemis-rest/src/test/resources/broker.xml index 2993d98..4d76412 100644 --- a/artemis-rest/src/test/resources/broker.xml +++ b/artemis-rest/src/test/resources/broker.xml @@ -39,7 +39,7 @@ - + http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3ac7a0c5/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ActiveMQServerControlImpl.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ActiveMQServerControlImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ActiveMQServerControlImpl.java index fcbf15c..9140fe4 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ActiveMQServerControlImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ActiveMQServerControlImpl.java @@ -1505,15 +1505,29 @@ public class ActiveMQServerControlImpl extends AbstractControl implements Active final String deleteNonDurableQueueRoles, final String manageRoles, final String browseRoles) throws Exception { + addSecuritySettings(addressMatch, sendRoles, consumeRoles, createDurableQueueRoles, deleteDurableQueueRoles, createNonDurableQueueRoles, deleteNonDurableQueueRoles, manageRoles, browseRoles, ""); + } + + @Override + public void addSecuritySettings(final String addressMatch, + final String sendRoles, + final String consumeRoles, + final String createDurableQueueRoles, + final String deleteDurableQueueRoles, + final String createNonDurableQueueRoles, + final String deleteNonDurableQueueRoles, + final String manageRoles, + final String browseRoles, + final String createAddressRoles) throws Exception { checkStarted(); clearIO(); try { - Set roles = SecurityFormatter.createSecurity(sendRoles, consumeRoles, createDurableQueueRoles, deleteDurableQueueRoles, createNonDurableQueueRoles, deleteNonDurableQueueRoles, manageRoles, browseRoles); + Set roles = SecurityFormatter.createSecurity(sendRoles, consumeRoles, createDurableQueueRoles, deleteDurableQueueRoles, createNonDurableQueueRoles, deleteNonDurableQueueRoles, manageRoles, browseRoles, createAddressRoles); server.getSecurityRepository().addMatch(addressMatch, roles); - PersistedRoles persistedRoles = new PersistedRoles(addressMatch, sendRoles, consumeRoles, createDurableQueueRoles, deleteDurableQueueRoles, createNonDurableQueueRoles, deleteNonDurableQueueRoles, manageRoles, browseRoles); + PersistedRoles persistedRoles = new PersistedRoles(addressMatch, sendRoles, consumeRoles, createDurableQueueRoles, deleteDurableQueueRoles, createNonDurableQueueRoles, deleteNonDurableQueueRoles, manageRoles, browseRoles, createAddressRoles); storageManager.storeSecurityRoles(persistedRoles); } finally { @@ -1588,7 +1602,7 @@ public class ActiveMQServerControlImpl extends AbstractControl implements Active if (addressSettings.getExpiryAddress() != null) { settings.add("expiryAddress", addressSettings.getExpiryAddress().toString()); } - return settings.add("expiryDelay", addressSettings.getExpiryDelay()).add("maxDeliveryAttempts", addressSettings.getMaxDeliveryAttempts()).add("pageCacheMaxSize", addressSettings.getPageCacheMaxSize()).add("maxSizeBytes", addressSettings.getMaxSizeBytes()).add("pageSizeBytes", addressSettings.getPageSizeBytes()).add("redeliveryDelay", addressSettings.getRedeliveryDelay()).add("redeliveryMultiplier", addressSettings.getRedeliveryMultiplier()).add("maxRedeliveryDelay", addressSettings.getMaxRedeliveryDelay()).add("redistributionDelay", addressSettings.getRedistributionDelay()).add("lastValueQueue", addressSettings.isLastValueQueue()).add("sendToDLAOnNoRoute", addressSettings.isSendToDLAOnNoRoute()).add("addressFullMessagePolicy", policy).add("slowConsumerThreshold", addressSettings.getSlowConsumerThreshold()).add("slowConsumerCheckPeriod", addressSettings.getSlowConsumerCheckPeriod()).add("slowConsumerPolicy", consumerPolicy).add("autoCreateJmsQueues", addressSettings.isAutoCreat eJmsQueues()).add("autoDeleteJmsQueues", addressSettings.isAutoDeleteJmsQueues()).add("autoCreateJmsTopics", addressSettings.isAutoCreateJmsTopics()).add("autoDeleteJmsTopics", addressSettings.isAutoDeleteJmsTopics()).build().toString(); + return settings.add("expiryDelay", addressSettings.getExpiryDelay()).add("maxDeliveryAttempts", addressSettings.getMaxDeliveryAttempts()).add("pageCacheMaxSize", addressSettings.getPageCacheMaxSize()).add("maxSizeBytes", addressSettings.getMaxSizeBytes()).add("pageSizeBytes", addressSettings.getPageSizeBytes()).add("redeliveryDelay", addressSettings.getRedeliveryDelay()).add("redeliveryMultiplier", addressSettings.getRedeliveryMultiplier()).add("maxRedeliveryDelay", addressSettings.getMaxRedeliveryDelay()).add("redistributionDelay", addressSettings.getRedistributionDelay()).add("lastValueQueue", addressSettings.isLastValueQueue()).add("sendToDLAOnNoRoute", addressSettings.isSendToDLAOnNoRoute()).add("addressFullMessagePolicy", policy).add("slowConsumerThreshold", addressSettings.getSlowConsumerThreshold()).add("slowConsumerCheckPeriod", addressSettings.getSlowConsumerCheckPeriod()).add("slowConsumerPolicy", consumerPolicy).add("autoCreateJmsQueues", addressSettings.isAutoCreat eJmsQueues()).add("autoCreateJmsTopics", addressSettings.isAutoCreateJmsTopics()).add("autoDeleteJmsQueues", addressSettings.getAutoDeleteJmsQueues()).add("autoDeleteJmsTopics", addressSettings.getAutoDeleteJmsQueues()).build().toString(); } @Override @@ -1661,8 +1675,6 @@ public class ActiveMQServerControlImpl extends AbstractControl implements Active } addressSettings.setAutoCreateJmsQueues(autoCreateJmsQueues); addressSettings.setAutoDeleteJmsQueues(autoDeleteJmsQueues); - addressSettings.setAutoCreateJmsTopics(autoCreateJmsTopics); - addressSettings.setAutoDeleteJmsTopics(autoDeleteJmsTopics); server.getAddressSettingsRepository().addMatch(address, addressSettings); storageManager.storeAddressSetting(new PersistedAddressSetting(new SimpleString(address), addressSettings)); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3ac7a0c5/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/AddressBindingInfo.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/AddressBindingInfo.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/AddressBindingInfo.java index 838be12..2240ccd 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/AddressBindingInfo.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/AddressBindingInfo.java @@ -27,4 +27,6 @@ public interface AddressBindingInfo { AddressInfo.RoutingType getRoutingType(); + int getDefaultMaxConsumers(); + } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3ac7a0c5/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/QueueBindingInfo.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/QueueBindingInfo.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/QueueBindingInfo.java index 4d435c6..3a0c240 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/QueueBindingInfo.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/QueueBindingInfo.java @@ -52,5 +52,5 @@ public interface QueueBindingInfo { boolean isDeleteOnNoConsumers(); - void setDeleteOnNoConsumers(); + void setDeleteOnNoConsumers(boolean deleteOnNoConsumers); } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3ac7a0c5/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/config/PersistedRoles.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/config/PersistedRoles.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/config/PersistedRoles.java index 383a75f..ffa0dbb 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/config/PersistedRoles.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/config/PersistedRoles.java @@ -46,6 +46,8 @@ public class PersistedRoles implements EncodingSupport { private SimpleString browseRoles; + private SimpleString createAddressRoles; + // Static -------------------------------------------------------- // Constructors -------------------------------------------------- @@ -72,7 +74,8 @@ public class PersistedRoles implements EncodingSupport { final String createNonDurableQueueRoles, final String deleteNonDurableQueueRoles, final String manageRoles, - final String browseRoles) { + final String browseRoles, + final String createAddressRoles) { super(); this.addressMatch = SimpleString.toSimpleString(addressMatch); this.sendRoles = SimpleString.toSimpleString(sendRoles); @@ -83,6 +86,7 @@ public class PersistedRoles implements EncodingSupport { this.deleteNonDurableQueueRoles = SimpleString.toSimpleString(deleteNonDurableQueueRoles); this.manageRoles = SimpleString.toSimpleString(manageRoles); this.browseRoles = SimpleString.toSimpleString(browseRoles); + this.createAddressRoles = SimpleString.toSimpleString(createAddressRoles); } // Public -------------------------------------------------------- @@ -158,6 +162,13 @@ public class PersistedRoles implements EncodingSupport { return browseRoles.toString(); } + /** + * @return the createAddressRoles + */ + public String getCreateAddressRoles() { + return createAddressRoles.toString(); + } + @Override public void encode(final ActiveMQBuffer buffer) { buffer.writeSimpleString(addressMatch); @@ -169,6 +180,7 @@ public class PersistedRoles implements EncodingSupport { buffer.writeNullableSimpleString(deleteNonDurableQueueRoles); buffer.writeNullableSimpleString(manageRoles); buffer.writeNullableSimpleString(browseRoles); + buffer.writeNullableSimpleString(createAddressRoles); } @Override @@ -180,7 +192,8 @@ public class PersistedRoles implements EncodingSupport { SimpleString.sizeofNullableString(createNonDurableQueueRoles) + SimpleString.sizeofNullableString(deleteNonDurableQueueRoles) + SimpleString.sizeofNullableString(manageRoles) + - SimpleString.sizeofNullableString(browseRoles); + SimpleString.sizeofNullableString(browseRoles) + + SimpleString.sizeofNullableString(createAddressRoles); } @@ -195,6 +208,7 @@ public class PersistedRoles implements EncodingSupport { deleteNonDurableQueueRoles = buffer.readNullableSimpleString(); manageRoles = buffer.readNullableSimpleString(); browseRoles = buffer.readNullableSimpleString(); + createAddressRoles = buffer.readNullableSimpleString(); } /* (non-Javadoc) @@ -212,6 +226,7 @@ public class PersistedRoles implements EncodingSupport { result = prime * result + ((deleteNonDurableQueueRoles == null) ? 0 : deleteNonDurableQueueRoles.hashCode()); result = prime * result + ((manageRoles == null) ? 0 : manageRoles.hashCode()); result = prime * result + ((browseRoles == null) ? 0 : browseRoles.hashCode()); + result = prime * result + ((createAddressRoles == null) ? 0 : createAddressRoles.hashCode()); result = prime * result + ((sendRoles == null) ? 0 : sendRoles.hashCode()); result = prime * result + (int) (storeId ^ (storeId >>> 32)); return result; @@ -269,6 +284,11 @@ public class PersistedRoles implements EncodingSupport { return false; } else if (!browseRoles.equals(other.browseRoles)) return false; + if (createAddressRoles == null) { + if (other.createAddressRoles != null) + return false; + } else if (!createAddressRoles.equals(other.createAddressRoles)) + return false; if (sendRoles == null) { if (other.sendRoles != null) return false; @@ -303,6 +323,8 @@ public class PersistedRoles implements EncodingSupport { manageRoles + ", browseRoles=" + browseRoles + + ", createAddressRoles=" + + createAddressRoles + "]"; } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3ac7a0c5/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java index b9e91ec..16ecdf3 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java @@ -1221,7 +1221,7 @@ public abstract class AbstractJournalStorageManager implements StorageManager { SimpleString filterString = filter == null ? null : filter.getFilterString(); - PersistentQueueBindingEncoding bindingEncoding = new PersistentQueueBindingEncoding(queue.getName(), binding.getAddress(), filterString, queue.getUser(), queue.isAutoCreated()); + PersistentQueueBindingEncoding bindingEncoding = new PersistentQueueBindingEncoding(queue.getName(), binding.getAddress(), filterString, queue.getUser(), queue.isAutoCreated(), queue.getMaxConsumers(), queue.isDeleteOnNoConsumers()); readLock(); try { @@ -1268,7 +1268,7 @@ public abstract class AbstractJournalStorageManager implements StorageManager { @Override public void addAddressBinding(final long tx, final AddressInfo addressInfo) throws Exception { - PersistentAddressBindingEncoding bindingEncoding = new PersistentAddressBindingEncoding(addressInfo.getName(), addressInfo.getRoutingType()); + PersistentAddressBindingEncoding bindingEncoding = new PersistentAddressBindingEncoding(addressInfo.getName(), addressInfo.getRoutingType(), addressInfo.getDefaultMaxQueueConsumers()); readLock(); try { http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3ac7a0c5/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/codec/PersistentAddressBindingEncoding.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/codec/PersistentAddressBindingEncoding.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/codec/PersistentAddressBindingEncoding.java index 7ef7e4d..3821b34 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/codec/PersistentAddressBindingEncoding.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/codec/PersistentAddressBindingEncoding.java @@ -29,6 +29,8 @@ public class PersistentAddressBindingEncoding implements EncodingSupport, Addres public SimpleString name; + public int defaultMaxConsumers; + public AddressInfo.RoutingType routingType; public PersistentAddressBindingEncoding() { @@ -41,13 +43,17 @@ public class PersistentAddressBindingEncoding implements EncodingSupport, Addres name + ", routingType=" + routingType + + ", defaultMaxConsumers=" + + defaultMaxConsumers + "]"; } public PersistentAddressBindingEncoding(final SimpleString name, - final AddressInfo.RoutingType routingType) { + final AddressInfo.RoutingType routingType, + final int defaultMaxConsumers) { this.name = name; this.routingType = routingType; + this.defaultMaxConsumers = defaultMaxConsumers; } @Override @@ -70,19 +76,26 @@ public class PersistentAddressBindingEncoding implements EncodingSupport, Addres } @Override + public int getDefaultMaxConsumers() { + return defaultMaxConsumers; + } + + @Override public void decode(final ActiveMQBuffer buffer) { name = buffer.readSimpleString(); routingType = AddressInfo.RoutingType.getType(buffer.readByte()); + defaultMaxConsumers = buffer.readInt(); } @Override public void encode(final ActiveMQBuffer buffer) { buffer.writeSimpleString(name); buffer.writeByte(routingType.getType()); + buffer.writeInt(defaultMaxConsumers); } @Override public int getEncodeSize() { - return SimpleString.sizeofString(name) + DataConstants.SIZE_BYTE; + return SimpleString.sizeofString(name) + DataConstants.SIZE_BYTE + DataConstants.SIZE_INT; } } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3ac7a0c5/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/codec/PersistentQueueBindingEncoding.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/codec/PersistentQueueBindingEncoding.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/codec/PersistentQueueBindingEncoding.java index 169cd7d..88bc1cf 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/codec/PersistentQueueBindingEncoding.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/codec/PersistentQueueBindingEncoding.java @@ -72,12 +72,16 @@ public class PersistentQueueBindingEncoding implements EncodingSupport, QueueBin final SimpleString address, final SimpleString filterString, final SimpleString user, - final boolean autoCreated) { + final boolean autoCreated, + final int maxConsumers, + final boolean deleteOnNoConsumers) { this.name = name; this.address = address; this.filterString = filterString; this.user = user; this.autoCreated = autoCreated; + this.maxConsumers = maxConsumers; + this.deleteOnNoConsumers = deleteOnNoConsumers; } @Override @@ -134,12 +138,12 @@ public class PersistentQueueBindingEncoding implements EncodingSupport, QueueBin @Override public int getMaxConsumers() { - return 0; + return maxConsumers; } @Override public void setMaxConsumers(int maxConsumers) { - + this.maxConsumers = maxConsumers; } @Override @@ -148,8 +152,8 @@ public class PersistentQueueBindingEncoding implements EncodingSupport, QueueBin } @Override - public void setDeleteOnNoConsumers() { - + public void setDeleteOnNoConsumers(boolean deleteOnNoConsumers) { + this.deleteOnNoConsumers = deleteOnNoConsumers; } @Override http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3ac7a0c5/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java index 6c654bf..4c51373 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java @@ -421,11 +421,21 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding @Override public AddressInfo addAddressInfo(AddressInfo addressInfo) { + try { + getServer().getManagementService().registerAddress(addressInfo.getName()); + } catch (Exception e) { + e.printStackTrace(); + } return addressManager.addAddressInfo(addressInfo); } @Override public AddressInfo addOrUpdateAddressInfo(AddressInfo addressInfo) { + try { + getServer().getManagementService().registerAddress(addressInfo.getName()); + } catch (Exception e) { + e.printStackTrace(); + } return addressManager.addOrUpdateAddressInfo(addressInfo); } @@ -490,6 +500,7 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding throw new ActiveMQNonExistentQueueException(); } + // TODO: see whether we still want to do this or not if (deleteData && addressManager.getBindingsForRoutingAddress(binding.getAddress()) == null) { pagingManager.deletePageStore(binding.getAddress()); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3ac7a0c5/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/SimpleAddressManager.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/SimpleAddressManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/SimpleAddressManager.java index 969a1a9..6ed2564 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/SimpleAddressManager.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/SimpleAddressManager.java @@ -23,7 +23,6 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import org.apache.activemq.artemis.api.core.SimpleString; -import org.apache.activemq.artemis.core.paging.impl.Page; import org.apache.activemq.artemis.core.postoffice.Address; import org.apache.activemq.artemis.core.postoffice.AddressManager; import org.apache.activemq.artemis.core.postoffice.Binding; @@ -39,7 +38,7 @@ import org.jboss.logging.Logger; */ public class SimpleAddressManager implements AddressManager { - private static final Logger logger = Logger.getLogger(Page.class); + private static final Logger logger = Logger.getLogger(SimpleAddressManager.class); private final ConcurrentMap addressInfoMap = new ConcurrentHashMap<>(); @@ -196,7 +195,7 @@ public class SimpleAddressManager implements AddressManager { private AddressInfo updateAddressInfo(AddressInfo from, AddressInfo to) { synchronized (from) { from.setRoutingType(to.getRoutingType()); - from.setDefaultMaxConsumers(to.getDefaultMaxConsumers()); + from.setDefaultMaxQueueConsumers(to.getDefaultMaxQueueConsumers()); from.setDefaultDeleteOnNoConsumers(to.isDefaultDeleteOnNoConsumers()); return from; } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3ac7a0c5/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java index 2a45f29..be71a92 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java @@ -30,6 +30,7 @@ import org.apache.activemq.artemis.core.io.IOCallback; import org.apache.activemq.artemis.core.persistence.StorageManager; import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ActiveMQExceptionMessage; +import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.CreateAddressMessage; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.CreateQueueMessage; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.CreateSharedQueueMessage; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.NullResponseMessage; @@ -82,6 +83,7 @@ import org.apache.activemq.artemis.core.server.ServerSession; import org.apache.activemq.artemis.spi.core.remoting.Connection; import org.jboss.logging.Logger; +import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.CREATE_ADDRESS; import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.CREATE_QUEUE; import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.CREATE_SHARED_QUEUE; import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.DELETE_QUEUE; @@ -220,6 +222,15 @@ public class ServerSessionPacketHandler implements ChannelHandler { break; } + case CREATE_ADDRESS: { + CreateAddressMessage request = (CreateAddressMessage) packet; + requiresResponse = request.isRequiresResponse(); + session.createAddress(request.getAddress(), request.isMulticast()); + if (requiresResponse) { + response = new NullResponseMessage(); + } + break; + } case CREATE_QUEUE: { CreateQueueMessage request = (CreateQueueMessage) packet; requiresResponse = request.isRequiresResponse(); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3ac7a0c5/artemis-server/src/main/java/org/apache/activemq/artemis/core/security/CheckType.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/security/CheckType.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/security/CheckType.java index 7d4cc00..abea943 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/security/CheckType.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/security/CheckType.java @@ -29,6 +29,12 @@ public enum CheckType { return role.isConsume(); } }, + CREATE_ADDRESS { + @Override + public boolean hasRole(final Role role) { + return role.isCreateAddress(); + } + }, CREATE_DURABLE_QUEUE { @Override public boolean hasRole(final Role role) { http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3ac7a0c5/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java index 9b5578c..51e1830 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java @@ -465,7 +465,7 @@ public interface ActiveMQServer extends ActiveMQComponent { void removeClientConnection(String clientId); - AddressInfo createOrUpdateAddressInfo(AddressInfo addressInfo); + AddressInfo createOrUpdateAddressInfo(AddressInfo addressInfo) throws Exception; - AddressInfo removeAddressInfo(SimpleString address); + AddressInfo removeAddressInfo(SimpleString address) throws Exception; } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3ac7a0c5/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/QueueConfig.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/QueueConfig.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/QueueConfig.java index 3b7ed71..81834be 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/QueueConfig.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/QueueConfig.java @@ -33,7 +33,7 @@ public final class QueueConfig { private final boolean durable; private final boolean temporary; private final boolean autoCreated; - private final int maxConsumers; + private final Integer maxConsumers; private final boolean deleteOnNoConsumers; public static final class Builder { @@ -47,7 +47,7 @@ public final class QueueConfig { private boolean durable; private boolean temporary; private boolean autoCreated; - private int maxConsumers; + private Integer maxConsumers; private boolean deleteOnNoConsumers; private Builder(final long id, final SimpleString name) { @@ -112,7 +112,7 @@ public final class QueueConfig { return this; } - public Builder maxConsumers(final int maxConsumers) { + public Builder maxConsumers(final Integer maxConsumers) { this.maxConsumers = maxConsumers; return this; } @@ -185,7 +185,7 @@ public final class QueueConfig { final boolean durable, final boolean temporary, final boolean autoCreated, - final int maxConsumers, + final Integer maxConsumers, final boolean deleteOnNoConsumers) { this.id = id; this.address = address; @@ -240,7 +240,7 @@ public final class QueueConfig { return deleteOnNoConsumers; } - public int maxConsumers() { + public Integer maxConsumers() { return maxConsumers; } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3ac7a0c5/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerSession.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerSession.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerSession.java index ab3898c..910eb22 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerSession.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerSession.java @@ -26,6 +26,7 @@ import org.apache.activemq.artemis.core.message.impl.MessageInternal; import org.apache.activemq.artemis.core.persistence.OperationContext; import org.apache.activemq.artemis.core.postoffice.RoutingStatus; import org.apache.activemq.artemis.core.security.SecurityAuth; +import org.apache.activemq.artemis.core.server.impl.AddressInfo; import org.apache.activemq.artemis.core.transaction.Transaction; import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection; @@ -109,6 +110,8 @@ public interface ServerSession extends SecurityAuth { boolean temporary, boolean durable) throws Exception; + AddressInfo createAddress(final SimpleString address, final boolean multicast) throws Exception; + void deleteQueue(SimpleString name) throws Exception; ServerConsumer createConsumer(long consumerID, http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3ac7a0c5/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BridgeImpl.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BridgeImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BridgeImpl.java index ac30c53..423127a 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BridgeImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BridgeImpl.java @@ -78,10 +78,6 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled // Attributes ---------------------------------------------------- - private static final SimpleString JMS_QUEUE_ADDRESS_PREFIX = new SimpleString("jms.queue."); - - private static final SimpleString JMS_TOPIC_ADDRESS_PREFIX = new SimpleString("jms.topic."); - protected final ServerLocatorInternal serverLocator; protected final Executor executor; @@ -879,16 +875,10 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled return; } - if (forwardingAddress.startsWith(BridgeImpl.JMS_QUEUE_ADDRESS_PREFIX) || forwardingAddress.startsWith(BridgeImpl.JMS_TOPIC_ADDRESS_PREFIX)) { - if (!query.isExists()) { - ActiveMQServerLogger.LOGGER.errorQueryingBridge(forwardingAddress, retryCount); - scheduleRetryConnect(); - return; - } - } else { - if (!query.isExists()) { - ActiveMQServerLogger.LOGGER.bridgeNoBindings(getName(), getForwardingAddress(), getForwardingAddress()); - } + if (!query.isExists()) { + ActiveMQServerLogger.LOGGER.errorQueryingBridge(forwardingAddress, retryCount); + scheduleRetryConnect(); + return; } }