Return-Path: X-Original-To: apmail-activemq-commits-archive@www.apache.org Delivered-To: apmail-activemq-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id D2107195F8 for ; Wed, 16 Mar 2016 01:53:29 +0000 (UTC) Received: (qmail 68268 invoked by uid 500); 16 Mar 2016 01:53:27 -0000 Delivered-To: apmail-activemq-commits-archive@activemq.apache.org Received: (qmail 68221 invoked by uid 500); 16 Mar 2016 01:53:27 -0000 Mailing-List: contact commits-help@activemq.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@activemq.apache.org Delivered-To: mailing list commits@activemq.apache.org Received: (qmail 66393 invoked by uid 99); 16 Mar 2016 01:53:27 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 16 Mar 2016 01:53:27 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id E953FE00A1; Wed, 16 Mar 2016 01:53:26 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: clebertsuconic@apache.org To: commits@activemq.apache.org Date: Wed, 16 Mar 2016 01:54:21 -0000 Message-Id: <028b448b282b464d8a131a009545da28@git.apache.org> In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [57/60] [abbrv] activemq-artemis git commit: more refactorings on producers more refactorings on producers Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/dfb047b6 Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/dfb047b6 Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/dfb047b6 Branch: refs/heads/refactor-openwire Commit: dfb047b62dcb58de6466fb29da5444e0ba140de1 Parents: e7f743b Author: Clebert Suconic Authored: Thu Feb 25 18:10:18 2016 -0500 Committer: Clebert Suconic Committed: Tue Mar 15 20:45:29 2016 -0400 ---------------------------------------------------------------------- .../protocol/openwire/OpenWireConnection.java | 190 +++++++++---------- .../core/protocol/openwire/OpenWireUtil.java | 23 +-- .../artemis/core/server/ActiveMQServer.java | 4 + .../core/server/impl/ActiveMQServerImpl.java | 71 +++++++ .../core/server/impl/ServerSessionImpl.java | 56 +----- .../InvestigationOpenwireTest.java | 17 +- 6 files changed, 181 insertions(+), 180 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/dfb047b6/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java index dc2a8a6..6839259 100644 --- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java +++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java @@ -51,7 +51,9 @@ import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQSingleConsumerB import org.apache.activemq.artemis.core.remoting.FailureListener; import org.apache.activemq.artemis.core.security.CheckType; import org.apache.activemq.artemis.core.security.SecurityAuth; +import org.apache.activemq.artemis.core.server.ActiveMQMessageBundle; import org.apache.activemq.artemis.core.server.ActiveMQServerLogger; +import org.apache.activemq.artemis.core.server.BindingQueryResult; import org.apache.activemq.artemis.core.server.Queue; import org.apache.activemq.artemis.core.server.ServerConsumer; import org.apache.activemq.artemis.core.server.SlowConsumerDetectionListener; @@ -146,6 +148,7 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se private String defaultSocketURIString; + // TODO-NOW: check on why there are two connections created for every createConnection on the client. public OpenWireConnection(Connection connection, Executor executor, OpenWireProtocolManager openWireProtocolManager, @@ -267,13 +270,25 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se } } - catch (IOException e) { + catch (Exception e) { + ActiveMQServerLogger.LOGGER.debug(e); - // TODO-NOW: send errors - ActiveMQServerLogger.LOGGER.error("error decoding", e); - } - catch (Throwable t) { - ActiveMQServerLogger.LOGGER.error("error decoding", t); + Response resp; + if (e instanceof ActiveMQSecurityException) { + resp = new ExceptionResponse(new JMSSecurityException(e.getMessage())); + } + else if (e instanceof ActiveMQNonExistentQueueException) { + resp = new ExceptionResponse(new InvalidDestinationException(e.getMessage())); + } + else { + resp = new ExceptionResponse(e); + } + try { + dispatch(resp); + } + catch (IOException e2) { + ActiveMQServerLogger.LOGGER.warn(e.getMessage(), e2); + } } } @@ -861,6 +876,22 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se } } + /** + * Checks to see if this destination exists. If it does not throw an invalid destination exception. + * + * @param destination + */ + private void validateDestination(ActiveMQDestination destination) throws Exception { + if (destination.isQueue()) { + SimpleString physicalName = OpenWireUtil.toCoreAddress(destination); + BindingQueryResult result = protocolManager.getServer().bindingQuery(physicalName); + if (!result.isExists() && !result.isAutoCreateJmsQueues()) { + throw ActiveMQMessageBundle.BUNDLE.noSuchQueue(physicalName); + } + } + } + + // This will listen for commands throught the protocolmanager public class CommandProcessor implements CommandVisitor { @@ -892,69 +923,40 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se @Override public Response processAddProducer(ProducerInfo info) throws Exception { - Response resp = null; - try { - SessionId sessionId = info.getProducerId().getParentId(); - ConnectionId connectionId = sessionId.getParentId(); - ConnectionState cs = getState(); - if (cs == null) { - throw new IllegalStateException("Cannot add a producer to a connection that had not been registered: " + connectionId); - } - SessionState ss = cs.getSessionState(sessionId); - if (ss == null) { - throw new IllegalStateException("Cannot add a producer to a session that had not been registered: " + sessionId); - } - // Avoid replaying dup commands - if (!ss.getProducerIds().contains(info.getProducerId())) { + SessionId sessionId = info.getProducerId().getParentId(); + ConnectionState cs = getState(); - AMQSession amqSession = sessions.get(sessionId); - if (amqSession == null) { - throw new IllegalStateException("Session not exist! : " + sessionId); - } + if (cs == null) { + throw new IllegalStateException("Cannot add a producer to a connection that had not been registered: " + sessionId.getParentId()); + } - ActiveMQDestination destination = info.getDestination(); - if (destination != null && !AdvisorySupport.isAdvisoryTopic(destination)) { - if (destination.isQueue()) { - OpenWireUtil.validateDestination(destination, amqSession); - } - DestinationInfo destInfo = new DestinationInfo(getContext().getConnectionId(), DestinationInfo.ADD_OPERATION_TYPE, destination); - OpenWireConnection.this.addDestination(destInfo); - } + SessionState ss = cs.getSessionState(sessionId); + if (ss == null) { + throw new IllegalStateException("Cannot add a producer to a session that had not been registered: " + sessionId); + } - ss.addProducer(info); + // Avoid replaying dup commands + if (!ss.getProducerIds().contains(info.getProducerId())) { + ActiveMQDestination destination = info.getDestination(); + if (destination != null && !AdvisorySupport.isAdvisoryTopic(destination)) { + if (destination.isQueue()) { + OpenWireConnection.this.validateDestination(destination); + } + DestinationInfo destInfo = new DestinationInfo(getContext().getConnectionId(), DestinationInfo.ADD_OPERATION_TYPE, destination); + OpenWireConnection.this.addDestination(destInfo); } + + ss.addProducer(info); + } - catch (Exception e) { - if (e instanceof ActiveMQSecurityException) { - resp = new ExceptionResponse(new JMSSecurityException(e.getMessage())); - } - else if (e instanceof ActiveMQNonExistentQueueException) { - resp = new ExceptionResponse(new InvalidDestinationException(e.getMessage())); - } - else { - resp = new ExceptionResponse(e); - } - } - return resp; + return null; } @Override - public Response processAddConsumer(ConsumerInfo info) { - Response resp = null; - try { - addConsumer(info); - } - catch (Exception e) { - e.printStackTrace(); - if (e instanceof ActiveMQSecurityException) { - resp = new ExceptionResponse(new JMSSecurityException(e.getMessage())); - } - else { - resp = new ExceptionResponse(e); - } - } - return resp; + public Response processAddConsumer(ConsumerInfo info) throws Exception { + addConsumer(info); + return null; } @Override @@ -1146,50 +1148,40 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se } @Override - public Response processMessage(Message messageSend) { - Response resp = null; - try { - ProducerId producerId = messageSend.getProducerId(); - AMQProducerBrokerExchange producerExchange = getProducerBrokerExchange(producerId); - final AMQConnectionContext pcontext = producerExchange.getConnectionContext(); - final ProducerInfo producerInfo = producerExchange.getProducerState().getInfo(); - boolean sendProducerAck = !messageSend.isResponseRequired() && producerInfo.getWindowSize() > 0 && !pcontext.isInRecoveryMode(); - - AMQSession session = getSession(producerId.getParentId()); - - SendingResult result = session.send(producerExchange, messageSend, sendProducerAck); - if (result.isBlockNextSend()) { - if (!context.isNetworkConnection() && result.isSendFailIfNoSpace()) { - // TODO see logging - throw new ResourceAllocationException("Usage Manager Memory Limit reached. Stopping producer (" + producerId + ") to prevent flooding " + result.getBlockingAddress() + "." + " See http://activemq.apache.org/producer-flow-control.html for more info"); - } - - if (producerInfo.getWindowSize() > 0 || messageSend.isResponseRequired()) { - //in that case don't send the response - //this will force the client to wait until - //the response is got. - context.setDontSendReponse(true); - } - else { - //hang the connection until the space is available - session.blockingWaitForSpace(producerExchange, result); - } + public Response processMessage(Message messageSend) throws Exception { + ProducerId producerId = messageSend.getProducerId(); + AMQProducerBrokerExchange producerExchange = getProducerBrokerExchange(producerId); + final AMQConnectionContext pcontext = producerExchange.getConnectionContext(); + final ProducerInfo producerInfo = producerExchange.getProducerState().getInfo(); + boolean sendProducerAck = !messageSend.isResponseRequired() && producerInfo.getWindowSize() > 0 && !pcontext.isInRecoveryMode(); + + AMQSession session = getSession(producerId.getParentId()); + + SendingResult result = session.send(producerExchange, messageSend, sendProducerAck); + if (result.isBlockNextSend()) { + if (!context.isNetworkConnection() && result.isSendFailIfNoSpace()) { + // TODO see logging + throw new ResourceAllocationException("Usage Manager Memory Limit reached. Stopping producer (" + producerId + ") to prevent flooding " + result.getBlockingAddress() + "." + " See http://activemq.apache.org/producer-flow-control.html for more info"); } - else if (sendProducerAck) { - // TODO-now: send through OperationContext - ProducerAck ack = new ProducerAck(producerInfo.getProducerId(), messageSend.getSize()); - OpenWireConnection.this.dispatchAsync(ack); - } - } - catch (Throwable e) { - if (e instanceof ActiveMQSecurityException) { - resp = new ExceptionResponse(new JMSSecurityException(e.getMessage())); + + if (producerInfo.getWindowSize() > 0 || messageSend.isResponseRequired()) { + //in that case don't send the response + //this will force the client to wait until + //the response is got. + context.setDontSendReponse(true); } else { - resp = new ExceptionResponse(e); + //hang the connection until the space is available + session.blockingWaitForSpace(producerExchange, result); } } - return resp; + else if (sendProducerAck) { + // TODO-now: send through OperationContext + ProducerAck ack = new ProducerAck(producerInfo.getProducerId(), messageSend.getSize()); + OpenWireConnection.this.dispatchAsync(ack); + } + + return null; } @Override http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/dfb047b6/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireUtil.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireUtil.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireUtil.java index d684761..4513eb3 100644 --- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireUtil.java +++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireUtil.java @@ -18,16 +18,12 @@ package org.apache.activemq.artemis.core.protocol.openwire; import org.apache.activemq.artemis.api.core.ActiveMQBuffer; import org.apache.activemq.artemis.api.core.ActiveMQBuffers; +import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.core.server.ServerMessage; import org.apache.activemq.command.ActiveMQDestination; -import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQServerSession; -import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQSession; -import org.apache.activemq.artemis.core.server.ActiveMQMessageBundle; -import org.apache.activemq.artemis.core.server.BindingQueryResult; import org.apache.activemq.command.ActiveMQQueue; import org.apache.activemq.command.ActiveMQTopic; import org.apache.activemq.util.ByteSequence; -import org.apache.activemq.artemis.api.core.SimpleString; public class OpenWireUtil { @@ -64,23 +60,6 @@ public class OpenWireUtil { } } - /** - * Checks to see if this destination exists. If it does not throw an invalid destination exception. - * - * @param destination - * @param amqSession - */ - public static void validateDestination(ActiveMQDestination destination, AMQSession amqSession) throws Exception { - if (destination.isQueue()) { - AMQServerSession coreSession = amqSession.getCoreSession(); - SimpleString physicalName = OpenWireUtil.toCoreAddress(destination); - BindingQueryResult result = coreSession.executeBindingQuery(physicalName); - if (!result.isExists() && !result.isAutoCreateJmsQueues()) { - throw ActiveMQMessageBundle.BUNDLE.noSuchQueue(physicalName); - } - } - } - /* *This util converts amq wildcards to compatible core wildcards *The conversion is like this: http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/dfb047b6/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java index e3c1b2a..64633bb 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java @@ -243,6 +243,10 @@ public interface ActiveMQServer extends ActiveMQComponent { Queue locateQueue(SimpleString queueName); + BindingQueryResult bindingQuery(SimpleString address) throws Exception; + + QueueQueryResult queueQuery(SimpleString name) throws Exception; + void destroyQueue(SimpleString queueName) throws Exception; void destroyQueue(SimpleString queueName, SecurityAuth session) throws Exception; http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/dfb047b6/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java index 7554127..13a1283 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java @@ -45,6 +45,7 @@ import java.util.concurrent.TimeUnit; import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration; import org.apache.activemq.artemis.api.core.Pair; import org.apache.activemq.artemis.api.core.SimpleString; +import org.apache.activemq.artemis.api.core.management.ResourceNames; import org.apache.activemq.artemis.core.client.impl.ClientSessionFactoryImpl; import org.apache.activemq.artemis.core.config.BridgeConfiguration; import org.apache.activemq.artemis.core.config.Configuration; @@ -76,6 +77,8 @@ import org.apache.activemq.artemis.core.persistence.impl.journal.JournalStorageM import org.apache.activemq.artemis.core.persistence.impl.journal.OperationContextImpl; import org.apache.activemq.artemis.core.persistence.impl.nullpm.NullStorageManager; import org.apache.activemq.artemis.core.postoffice.Binding; +import org.apache.activemq.artemis.core.postoffice.BindingType; +import org.apache.activemq.artemis.core.postoffice.Bindings; import org.apache.activemq.artemis.core.postoffice.PostOffice; import org.apache.activemq.artemis.core.postoffice.QueueBinding; import org.apache.activemq.artemis.core.postoffice.impl.DivertBinding; @@ -97,6 +100,7 @@ import org.apache.activemq.artemis.core.server.ActiveMQMessageBundle; import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.core.server.ActiveMQServerLogger; import org.apache.activemq.artemis.core.server.Bindable; +import org.apache.activemq.artemis.core.server.BindingQueryResult; import org.apache.activemq.artemis.core.server.Divert; import org.apache.activemq.artemis.core.server.JournalType; import org.apache.activemq.artemis.core.server.LargeServerMessage; @@ -105,6 +109,7 @@ import org.apache.activemq.artemis.core.server.NodeManager; import org.apache.activemq.artemis.core.server.Queue; import org.apache.activemq.artemis.core.server.QueueCreator; import org.apache.activemq.artemis.core.server.QueueFactory; +import org.apache.activemq.artemis.core.server.QueueQueryResult; import org.apache.activemq.artemis.core.server.SecuritySettingPlugin; import org.apache.activemq.artemis.core.server.ServerSession; import org.apache.activemq.artemis.core.server.ServerSessionFactory; @@ -545,6 +550,72 @@ public class ActiveMQServerImpl implements ActiveMQServer { } @Override + public BindingQueryResult bindingQuery(SimpleString address) throws Exception { + if (address == null) { + throw ActiveMQMessageBundle.BUNDLE.addressIsNull(); + } + + boolean autoCreateJmsQueues = address.toString().startsWith(ResourceNames.JMS_QUEUE) && getAddressSettingsRepository().getMatch(address.toString()).isAutoCreateJmsQueues(); + + List names = new ArrayList<>(); + + // make an exception for the management address (see HORNETQ-29) + ManagementService managementService = getManagementService(); + if (managementService != null) { + if (address.equals(managementService.getManagementAddress())) { + return new BindingQueryResult(true, names, autoCreateJmsQueues); + } + } + + Bindings bindings = getPostOffice().getMatchingBindings(address); + + for (Binding binding : bindings.getBindings()) { + if (binding.getType() == BindingType.LOCAL_QUEUE || binding.getType() == BindingType.REMOTE_QUEUE) { + names.add(binding.getUniqueName()); + } + } + + return new BindingQueryResult(!names.isEmpty(), names, autoCreateJmsQueues); + } + + @Override + public QueueQueryResult queueQuery(SimpleString name) { + if (name == null) { + throw ActiveMQMessageBundle.BUNDLE.queueNameIsNull(); + } + + boolean autoCreateJmsQueues = name.toString().startsWith(ResourceNames.JMS_QUEUE) && getAddressSettingsRepository().getMatch(name.toString()).isAutoCreateJmsQueues(); + + QueueQueryResult response; + + Binding binding = getPostOffice().getBinding(name); + + SimpleString managementAddress = getManagementService() != null ? getManagementService().getManagementAddress() : null; + + if (binding != null && binding.getType() == BindingType.LOCAL_QUEUE) { + Queue queue = (Queue) binding.getBindable(); + + Filter filter = queue.getFilter(); + + SimpleString filterString = filter == null ? null : filter.getFilterString(); + + response = new QueueQueryResult(name, binding.getAddress(), queue.isDurable(), queue.isTemporary(), filterString, queue.getConsumerCount(), queue.getMessageCount(), autoCreateJmsQueues); + } + // make an exception for the management address (see HORNETQ-29) + else if (name.equals(managementAddress)) { + response = new QueueQueryResult(name, managementAddress, true, false, null, -1, -1, autoCreateJmsQueues); + } + else if (autoCreateJmsQueues) { + response = new QueueQueryResult(name, name, true, false, null, 0, 0, true, false); + } + else { + response = new QueueQueryResult(null, null, false, false, null, 0, 0, false, false); + } + + return response; + } + + @Override public void threadDump() { StringWriter str = new StringWriter(); PrintWriter out = new PrintWriter(str); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/dfb047b6/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java index d628bde..77705fa 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java @@ -50,7 +50,6 @@ import org.apache.activemq.artemis.core.persistence.OperationContext; import org.apache.activemq.artemis.core.persistence.StorageManager; import org.apache.activemq.artemis.core.postoffice.Binding; import org.apache.activemq.artemis.core.postoffice.BindingType; -import org.apache.activemq.artemis.core.postoffice.Bindings; import org.apache.activemq.artemis.core.postoffice.PostOffice; import org.apache.activemq.artemis.core.postoffice.QueueBinding; import org.apache.activemq.artemis.core.remoting.CloseListener; @@ -623,63 +622,12 @@ public class ServerSessionImpl implements ServerSession, FailureListener { @Override public QueueQueryResult executeQueueQuery(final SimpleString name) throws Exception { - if (name == null) { - throw ActiveMQMessageBundle.BUNDLE.queueNameIsNull(); - } - - boolean autoCreateJmsQueues = name.toString().startsWith(ResourceNames.JMS_QUEUE) && server.getAddressSettingsRepository().getMatch(name.toString()).isAutoCreateJmsQueues(); - - QueueQueryResult response; - - Binding binding = postOffice.getBinding(name); - - if (binding != null && binding.getType() == BindingType.LOCAL_QUEUE) { - Queue queue = (Queue) binding.getBindable(); - - Filter filter = queue.getFilter(); - - SimpleString filterString = filter == null ? null : filter.getFilterString(); - - response = new QueueQueryResult(name, binding.getAddress(), queue.isDurable(), queue.isTemporary(), filterString, queue.getConsumerCount(), queue.getMessageCount(), autoCreateJmsQueues); - } - // make an exception for the management address (see HORNETQ-29) - else if (name.equals(managementAddress)) { - response = new QueueQueryResult(name, managementAddress, true, false, null, -1, -1, autoCreateJmsQueues); - } - else if (autoCreateJmsQueues) { - response = new QueueQueryResult(name, name, true, false, null, 0, 0, true, false); - } - else { - response = new QueueQueryResult(null, null, false, false, null, 0, 0, false, false); - } - - return response; + return server.queueQuery(name); } @Override public BindingQueryResult executeBindingQuery(final SimpleString address) throws Exception { - if (address == null) { - throw ActiveMQMessageBundle.BUNDLE.addressIsNull(); - } - - boolean autoCreateJmsQueues = address.toString().startsWith(ResourceNames.JMS_QUEUE) && server.getAddressSettingsRepository().getMatch(address.toString()).isAutoCreateJmsQueues(); - - List names = new ArrayList<>(); - - // make an exception for the management address (see HORNETQ-29) - if (address.equals(managementAddress)) { - return new BindingQueryResult(true, names, autoCreateJmsQueues); - } - - Bindings bindings = postOffice.getMatchingBindings(address); - - for (Binding binding : bindings.getBindings()) { - if (binding.getType() == BindingType.LOCAL_QUEUE || binding.getType() == BindingType.REMOTE_QUEUE) { - names.add(binding.getUniqueName()); - } - } - - return new BindingQueryResult(!names.isEmpty(), names, autoCreateJmsQueues); + return server.bindingQuery(address); } @Override http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/dfb047b6/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/investigations/InvestigationOpenwireTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/investigations/InvestigationOpenwireTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/investigations/InvestigationOpenwireTest.java index 3614b9a..914a8e1 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/investigations/InvestigationOpenwireTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/investigations/InvestigationOpenwireTest.java @@ -17,15 +17,22 @@ package org.apache.activemq.artemis.tests.integration.openwire.investigations; -import org.apache.activemq.artemis.tests.integration.openwire.BasicOpenWireTest; -import org.junit.Assert; -import org.junit.Test; - -import javax.jms.*; +import javax.jms.Connection; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Queue; +import javax.jms.Session; +import javax.jms.TextMessage; +import javax.jms.XAConnection; +import javax.jms.XASession; import javax.transaction.xa.XAResource; import java.util.Collection; import java.util.LinkedList; +import org.apache.activemq.artemis.tests.integration.openwire.BasicOpenWireTest; +import org.junit.Assert; +import org.junit.Test; + public class InvestigationOpenwireTest extends BasicOpenWireTest { @Test