Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id CB7B7200C4C for ; Tue, 4 Apr 2017 11:19:18 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id CA163160BA1; Tue, 4 Apr 2017 09:19:18 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id CA86E160B81 for ; Tue, 4 Apr 2017 11:19:17 +0200 (CEST) Received: (qmail 11400 invoked by uid 500); 4 Apr 2017 09:19:17 -0000 Mailing-List: contact commits-help@activemq.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@activemq.apache.org Delivered-To: mailing list commits@activemq.apache.org Received: (qmail 11389 invoked by uid 99); 4 Apr 2017 09:19:16 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 04 Apr 2017 09:19:16 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 06ADBDFBA9; Tue, 4 Apr 2017 09:19:16 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: gtully@apache.org To: commits@activemq.apache.org Message-Id: <5a9ad340e4b7436fa86ef5a65b71a1ae@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: activemq git commit: [AMQ-6643] ensure a wildcard virtual topic subscriber is restricted to the wildcard destination - avoid duplicate and spurious dispatch. fix and test Date: Tue, 4 Apr 2017 09:19:16 +0000 (UTC) archived-at: Tue, 04 Apr 2017 09:19:19 -0000 Repository: activemq Updated Branches: refs/heads/master 2731f04f1 -> 679db08db [AMQ-6643] ensure a wildcard virtual topic subscriber is restricted to the wildcard destination - avoid duplicate and spurious dispatch. fix and test Project: http://git-wip-us.apache.org/repos/asf/activemq/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/679db08d Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/679db08d Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/679db08d Branch: refs/heads/master Commit: 679db08db3dba27475b9e82c20d3dafeb155631f Parents: 2731f04 Author: gtully Authored: Tue Apr 4 10:16:00 2017 +0100 Committer: gtully Committed: Tue Apr 4 10:16:48 2017 +0100 ---------------------------------------------------------------------- .../region/virtual/MappedQueueFilter.java | 54 +++---- .../activemq/command/ActiveMQDestination.java | 6 + .../mqtt/PahoVirtualTopicMQTTTest.java | 11 +- .../virtual/VirtualTopicWildcardTest.java | 149 +++++++++++++++++++ 4 files changed, 184 insertions(+), 36 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq/blob/679db08d/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/MappedQueueFilter.java ---------------------------------------------------------------------- diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/MappedQueueFilter.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/MappedQueueFilter.java index e8de910..db02490 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/MappedQueueFilter.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/MappedQueueFilter.java @@ -48,34 +48,34 @@ public class MappedQueueFilter extends DestinationFilter { // recover messages for first consumer only boolean noSubs = getConsumers().isEmpty(); - if (!sub.getActiveMQDestination().isPattern() || sub.getActiveMQDestination().equals(next.getActiveMQDestination())) { + // for virtual consumer wildcard dests, only subscribe to exact match to ensure no duplicates + if (sub.getActiveMQDestination().compareTo(next.getActiveMQDestination()) == 0) { super.addSubscription(context, sub); - - if (noSubs && !getConsumers().isEmpty()) { - // new subscription added, recover retroactive messages - final RegionBroker regionBroker = (RegionBroker) context.getBroker().getAdaptor(RegionBroker.class); - final Set virtualDests = regionBroker.getDestinations(virtualDestination); - - final ActiveMQDestination newDestination = sub.getActiveMQDestination(); - final BaseDestination regionDest = getBaseDestination((Destination) regionBroker.getDestinations(newDestination).toArray()[0]); - - for (Destination virtualDest : virtualDests) { - if (virtualDest.getActiveMQDestination().isTopic() && - (virtualDest.isAlwaysRetroactive() || sub.getConsumerInfo().isRetroactive())) { - - Topic topic = (Topic) getBaseDestination(virtualDest); - if (topic != null) { - // re-use browse() to get recovered messages - final Message[] messages = topic.getSubscriptionRecoveryPolicy().browse(topic.getActiveMQDestination()); - - // add recovered messages to subscription - for (Message message : messages) { - final Message copy = message.copy(); - copy.setOriginalDestination(message.getDestination()); - copy.setDestination(newDestination); - copy.setRegionDestination(regionDest); - sub.addRecoveredMessage(context, newDestination.isQueue() ? new IndirectMessageReference(copy) : copy); - } + } + if (noSubs && !getConsumers().isEmpty()) { + // new subscription added, recover retroactive messages + final RegionBroker regionBroker = (RegionBroker) context.getBroker().getAdaptor(RegionBroker.class); + final Set virtualDests = regionBroker.getDestinations(virtualDestination); + + final ActiveMQDestination newDestination = sub.getActiveMQDestination(); + final BaseDestination regionDest = getBaseDestination((Destination) regionBroker.getDestinations(newDestination).toArray()[0]); + + for (Destination virtualDest : virtualDests) { + if (virtualDest.getActiveMQDestination().isTopic() && + (virtualDest.isAlwaysRetroactive() || sub.getConsumerInfo().isRetroactive())) { + + Topic topic = (Topic) getBaseDestination(virtualDest); + if (topic != null) { + // re-use browse() to get recovered messages + final Message[] messages = topic.getSubscriptionRecoveryPolicy().browse(topic.getActiveMQDestination()); + + // add recovered messages to subscription + for (Message message : messages) { + final Message copy = message.copy(); + copy.setOriginalDestination(message.getDestination()); + copy.setDestination(newDestination); + copy.setRegionDestination(regionDest); + sub.addRecoveredMessage(context, newDestination.isQueue() ? new IndirectMessageReference(copy) : copy); } } } http://git-wip-us.apache.org/repos/asf/activemq/blob/679db08d/activemq-client/src/main/java/org/apache/activemq/command/ActiveMQDestination.java ---------------------------------------------------------------------- diff --git a/activemq-client/src/main/java/org/apache/activemq/command/ActiveMQDestination.java b/activemq-client/src/main/java/org/apache/activemq/command/ActiveMQDestination.java index 4819a1a..149145d 100644 --- a/activemq-client/src/main/java/org/apache/activemq/command/ActiveMQDestination.java +++ b/activemq-client/src/main/java/org/apache/activemq/command/ActiveMQDestination.java @@ -159,6 +159,12 @@ public abstract class ActiveMQDestination extends JNDIBaseStorable implements Da return 1; } else { if (destination.getDestinationType() == destination2.getDestinationType()) { + + if (destination.isPattern() && destination2.isPattern() ) { + if (destination.getPhysicalName().compareTo(destination2.getPhysicalName()) == 0) { + return 0; + } + } if (destination.isPattern()) { DestinationFilter filter = DestinationFilter.parseFilter(destination); if (filter.matches(destination2)) { http://git-wip-us.apache.org/repos/asf/activemq/blob/679db08d/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/PahoVirtualTopicMQTTTest.java ---------------------------------------------------------------------- diff --git a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/PahoVirtualTopicMQTTTest.java b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/PahoVirtualTopicMQTTTest.java index 5f58202..55103ac 100644 --- a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/PahoVirtualTopicMQTTTest.java +++ b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/PahoVirtualTopicMQTTTest.java @@ -18,7 +18,6 @@ package org.apache.activemq.transport.mqtt; import org.apache.activemq.broker.region.Destination; import org.apache.activemq.broker.region.RegionBroker; -import org.apache.activemq.command.ActiveMQQueue; import org.eclipse.paho.client.mqttv3.MqttClient; import org.eclipse.paho.client.mqttv3.MqttConnectOptions; import org.junit.Before; @@ -72,14 +71,8 @@ public class PahoVirtualTopicMQTTTest extends PahoMQTTTest { RegionBroker regionBroker = (RegionBroker) brokerService.getBroker().getAdaptor(RegionBroker.class); - String[] queues = new String[]{"Consumer.client-10:AT_LEAST_ONCE.VirtualTopic.user10.>", - "Consumer.client-10:AT_LEAST_ONCE.VirtualTopic.user10.client-10.>", - "Consumer.client-1:AT_LEAST_ONCE.VirtualTopic.user1.>", - "Consumer.client-1:AT_LEAST_ONCE.VirtualTopic.user1.client-1.>"}; - - for (String queueName : queues) { - Destination queue = regionBroker.getQueueRegion().getDestinations(new ActiveMQQueue(queueName)).iterator().next(); - assertEquals("Queue " + queueName + " have more than one consumer", 1, queue.getConsumers().size()); + for (Destination queue : regionBroker.getQueueRegion().getDestinationMap().values()) { + assertEquals("Queue " + queue.getActiveMQDestination() + " have more than one consumer", 1, queue.getConsumers().size()); } } http://git-wip-us.apache.org/repos/asf/activemq/blob/679db08d/activemq-unit-tests/src/test/java/org/apache/activemq/broker/virtual/VirtualTopicWildcardTest.java ---------------------------------------------------------------------- diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/virtual/VirtualTopicWildcardTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/virtual/VirtualTopicWildcardTest.java new file mode 100644 index 0000000..c256e40 --- /dev/null +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/virtual/VirtualTopicWildcardTest.java @@ -0,0 +1,149 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.broker.virtual; + +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.region.DestinationInterceptor; +import org.apache.activemq.broker.region.virtual.VirtualDestination; +import org.apache.activemq.broker.region.virtual.VirtualDestinationInterceptor; +import org.apache.activemq.broker.region.virtual.VirtualTopic; +import org.apache.activemq.command.ActiveMQQueue; +import org.apache.activemq.command.ActiveMQTopic; +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.spring.ConsumerBean; +import org.apache.activemq.util.Wait; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.jms.Connection; +import javax.jms.Destination; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Session; + +import static junit.framework.TestCase.assertEquals; +import static junit.framework.TestCase.assertNotNull; +import static org.junit.Assert.assertTrue; + +// https://issues.apache.org/jira/browse/AMQ-6643 +public class VirtualTopicWildcardTest { + + private static final Logger LOG = LoggerFactory.getLogger(VirtualTopicWildcardTest.class); + + protected int total = 3; + protected Connection connection; + BrokerService brokerService; + + @Before + public void init() throws Exception { + brokerService = createBroker(); + brokerService.start(); + connection = createConnection(); + connection.start(); + } + + @After + public void afer() throws Exception { + connection.close(); + brokerService.stop(); + } + + @Test + public void testWildcardAndSimpleConsumerShareMessages() throws Exception { + + ConsumerBean messageList1 = new ConsumerBean("1:"); + ConsumerBean messageList2 = new ConsumerBean("2:"); + ConsumerBean messageList3 = new ConsumerBean("3:"); + + messageList1.setVerbose(true); + messageList2.setVerbose(true); + messageList3.setVerbose(true); + + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + Destination producerDestination = new ActiveMQTopic("VirtualTopic.TEST.A.IT"); + Destination destination1 = new ActiveMQQueue("Consumer.1.VirtualTopic.TEST.>"); + Destination destination2 = new ActiveMQQueue("Consumer.1.VirtualTopic.TEST.A.IT"); + Destination destination3 = new ActiveMQQueue("Consumer.1.VirtualTopic.TEST.B.IT"); + + LOG.info("Sending to: " + producerDestination); + LOG.info("Consuming from: " + destination1 + " and " + destination2 + ", and " + destination3); + + MessageConsumer c1 = session.createConsumer(destination1, null); + MessageConsumer c2 = session.createConsumer(destination2, null); + // this consumer should get no messages + MessageConsumer c3 = session.createConsumer(destination3, null); + + c1.setMessageListener(messageList1); + c2.setMessageListener(messageList2); + c3.setMessageListener(messageList3); + + // create topic producer + MessageProducer producer = session.createProducer(producerDestination); + assertNotNull(producer); + + for (int i = 0; i < total; i++) { + producer.send(createMessage(session, i)); + } + + assertMessagesArrived(messageList1, messageList2); + assertEquals(0, messageList3.getMessages().size()); + + } + + private Message createMessage(Session session, int i) throws JMSException { + return session.createTextMessage("val=" + i); + } + + private Connection createConnection() throws Exception { + ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory(brokerService.getVmConnectorURI()); + cf.setWatchTopicAdvisories(false); + return cf.createConnection(); + } + + protected void assertMessagesArrived(ConsumerBean messageList1, ConsumerBean messageList2) { + try { + assertTrue("expected", Wait.waitFor(new Wait.Condition() { + @Override + public boolean isSatisified() throws Exception { + LOG.info("One: " + messageList1.getMessages().size() + ", Two:" + messageList2.getMessages().size()); + return messageList1.getMessages().size() + messageList2.getMessages().size() == 2 * total; + } + })); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + protected BrokerService createBroker() throws Exception { + + BrokerService broker = new BrokerService(); + broker.setAdvisorySupport(false); + broker.setPersistent(false); + + VirtualTopic virtualTopic = new VirtualTopic(); + VirtualDestinationInterceptor interceptor = new VirtualDestinationInterceptor(); + interceptor.setVirtualDestinations(new VirtualDestination[]{virtualTopic}); + broker.setDestinationInterceptors(new DestinationInterceptor[]{interceptor}); + return broker; + } +}