Return-Path: X-Original-To: apmail-activemq-commits-archive@www.apache.org Delivered-To: apmail-activemq-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 238FD18CA3 for ; Tue, 4 Aug 2015 15:59:21 +0000 (UTC) Received: (qmail 58658 invoked by uid 500); 4 Aug 2015 15:59:21 -0000 Delivered-To: apmail-activemq-commits-archive@activemq.apache.org Received: (qmail 58538 invoked by uid 500); 4 Aug 2015 15:59:20 -0000 Mailing-List: contact commits-help@activemq.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@activemq.apache.org Delivered-To: mailing list commits@activemq.apache.org Received: (qmail 58295 invoked by uid 99); 4 Aug 2015 15:59:20 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 04 Aug 2015 15:59:20 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 82A13E1894; Tue, 4 Aug 2015 15:59:20 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: dkulp@apache.org To: commits@activemq.apache.org Date: Tue, 04 Aug 2015 15:59:24 -0000 Message-Id: In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [05/10] activemq git commit: https://issues.apache.org/jira/browse/AMQ-5814 https://issues.apache.org/jira/browse/AMQ-5814 Use the context of the subscription when the producer adds a destination that matches its wildcard. Project: http://git-wip-us.apache.org/repos/asf/activemq/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/f1f171d4 Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/f1f171d4 Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/f1f171d4 Branch: refs/heads/activemq-5.11.x Commit: f1f171d4a569a1f753d5b870d69a73823f8785dd Parents: bf21a22 Author: Timothy Bish Authored: Thu Jul 2 11:36:53 2015 -0400 Committer: Daniel Kulp Committed: Tue Aug 4 08:45:30 2015 -0400 ---------------------------------------------------------------------- .../activemq/broker/region/AbstractRegion.java | 84 ++++++++++- .../org/apache/activemq/bugs/AMQ5814Test.java | 140 +++++++++++++++++++ 2 files changed, 222 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq/blob/f1f171d4/activemq-broker/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java ---------------------------------------------------------------------- diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java index 53e8cdd..23a3baa 100755 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java @@ -26,9 +26,11 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.locks.ReentrantReadWriteLock; import javax.jms.JMSException; + +import org.apache.activemq.DestinationDoesNotExistException; +import org.apache.activemq.advisory.AdvisorySupport; import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.broker.ConsumerBrokerExchange; -import org.apache.activemq.DestinationDoesNotExistException; import org.apache.activemq.broker.ProducerBrokerExchange; import org.apache.activemq.broker.region.policy.PolicyEntry; import org.apache.activemq.broker.region.virtual.CompositeDestinationFilter; @@ -86,6 +88,7 @@ public abstract class AbstractRegion implements Region { this.destinationFactory = destinationFactory; } + @Override public final void start() throws Exception { started = true; @@ -109,6 +112,7 @@ public abstract class AbstractRegion implements Region { } } + @Override public void stop() throws Exception { started = false; destinationsLock.readLock().lock(); @@ -123,6 +127,7 @@ public abstract class AbstractRegion implements Region { destinations.clear(); } + @Override public Destination addDestination(ConnectionContext context, ActiveMQDestination destination, boolean createIfTemporary) throws Exception { @@ -157,16 +162,76 @@ public abstract class AbstractRegion implements Region { return subscriptions; } +<<<<<<< HEAD protected List addSubscriptionsForDestination(ConnectionContext context, Destination dest) throws Exception { +======= + + /** + * Updates the counts in RegionStatistics based on whether or not the destination + * is an Advisory Destination or not + * + * @param destination the destination being used to determine which counters to update + * @param count the count to add to the counters + */ + protected void updateRegionDestCounts(ActiveMQDestination destination, int count) { + if (destination != null) { + if (AdvisorySupport.isAdvisoryTopic(destination)) { + regionStatistics.getAdvisoryDestinations().add(count); + } else { + regionStatistics.getDestinations().add(count); + } + regionStatistics.getAllDestinations().add(count); + } + } + + /** + * This method checks whether or not the destination can be created based on + * {@link PolicyEntry#getMaxDestinations}, if it has been set. Advisory + * topics are ignored. + * + * @param destination + * @throws Exception + */ + protected void validateMaxDestinations(ActiveMQDestination destination) + throws Exception { + if (broker.getDestinationPolicy() != null) { + PolicyEntry entry = broker.getDestinationPolicy().getEntryFor(destination); + // Make sure the destination is not an advisory topic + if (entry != null && entry.getMaxDestinations() >= 0 + && !AdvisorySupport.isAdvisoryTopic(destination)) { + // If there is an entry for this destination, look up the set of + // destinations associated with this policy + // If a destination isn't specified, then just count up + // non-advisory destinations (ie count all destinations) + int destinationSize = (int) (entry.getDestination() != null ? + destinationMap.get(entry.getDestination()).size() : regionStatistics.getDestinations().getCount()); + if (destinationSize >= entry.getMaxDestinations()) { + if (entry.getDestination() != null) { + throw new IllegalStateException( + "The maxmimum number of destinations allowed ("+ entry.getMaxDestinations() + + ") for the policy " + entry.getDestination() + " has already been reached."); + // No destination has been set (default policy) + } else { + throw new IllegalStateException("The maxmimum number of destinations allowed (" + + entry.getMaxDestinations() + ") has already been reached."); + } + } + } + } + } + + protected List addSubscriptionsForDestination(ConnectionContext context, Destination dest) throws Exception { +>>>>>>> e4af2eb... https://issues.apache.org/jira/browse/AMQ-5814 List rc = new ArrayList(); // Add all consumers that are interested in the destination. for (Iterator iter = subscriptions.values().iterator(); iter.hasNext();) { Subscription sub = iter.next(); if (sub.matches(dest.getActiveMQDestination())) { try { - dest.addSubscription(context, sub); + ConnectionContext originalContext = sub.getContext() != null ? sub.getContext() : context; + dest.addSubscription(originalContext, sub); rc.add(sub); } catch (SecurityException e) { if (sub.isWildcard()) { @@ -182,6 +247,7 @@ public abstract class AbstractRegion implements Region { } + @Override public void removeDestination(ConnectionContext context, ActiveMQDestination destination, long timeout) throws Exception { @@ -238,6 +304,7 @@ public abstract class AbstractRegion implements Region { * * @return a set of matching destination objects. */ + @Override @SuppressWarnings("unchecked") public Set getDestinations(ActiveMQDestination destination) { destinationsLock.readLock().lock(); @@ -248,10 +315,12 @@ public abstract class AbstractRegion implements Region { } } + @Override public Map getDestinationMap() { return destinations; } + @Override @SuppressWarnings("unchecked") public Subscription addConsumer(ConnectionContext context, ConsumerInfo info) throws Exception { LOG.debug("{} adding consumer: {} for destination: {}", new Object[]{ broker.getBrokerName(), info.getConsumerId(), info.getDestination() }); @@ -371,6 +440,7 @@ public abstract class AbstractRegion implements Region { return inactiveDests; } + @Override @SuppressWarnings("unchecked") public void removeConsumer(ConnectionContext context, ConsumerInfo info) throws Exception { LOG.debug("{} removing consumer: {} for destination: {}", new Object[]{ broker.getBrokerName(), info.getConsumerId(), info.getDestination() }); @@ -404,10 +474,12 @@ public abstract class AbstractRegion implements Region { sub.destroy(); } + @Override public void removeSubscription(ConnectionContext context, RemoveSubscriptionInfo info) throws Exception { throw new JMSException("Invalid operation."); } + @Override public void send(final ProducerBrokerExchange producerExchange, Message messageSend) throws Exception { final ConnectionContext context = producerExchange.getConnectionContext(); @@ -423,6 +495,7 @@ public abstract class AbstractRegion implements Region { } } + @Override public void acknowledge(ConsumerBrokerExchange consumerExchange, MessageAck ack) throws Exception { Subscription sub = consumerExchange.getSubscription(); if (sub == null) { @@ -441,6 +514,7 @@ public abstract class AbstractRegion implements Region { sub.acknowledge(consumerExchange.getConnectionContext(), ack); } + @Override public Response messagePull(ConnectionContext context, MessagePull pull) throws Exception { Subscription sub = subscriptions.get(pull.getConsumerId()); if (sub == null) { @@ -482,6 +556,7 @@ public abstract class AbstractRegion implements Region { return dest; } + @Override public void processDispatchNotification(MessageDispatchNotification messageDispatchNotification) throws Exception { Subscription sub = subscriptions.get(messageDispatchNotification.getConsumerId()); if (sub != null) { @@ -519,6 +594,7 @@ public abstract class AbstractRegion implements Region { } } + @Override public void gc() { for (Subscription sub : subscriptions.values()) { sub.gc(); @@ -549,6 +625,7 @@ public abstract class AbstractRegion implements Region { this.autoCreateDestinations = autoCreateDestinations; } + @Override @SuppressWarnings("unchecked") public void addProducer(ConnectionContext context, ProducerInfo info) throws Exception { destinationsLock.readLock().lock(); @@ -569,6 +646,7 @@ public abstract class AbstractRegion implements Region { * @throws Exception * TODO */ + @Override @SuppressWarnings("unchecked") public void removeProducer(ConnectionContext context, ProducerInfo info) throws Exception { destinationsLock.readLock().lock(); @@ -587,6 +665,7 @@ public abstract class AbstractRegion implements Region { destinationFactory.removeDestination(dest); } + @Override public void processConsumerControl(ConsumerBrokerExchange consumerExchange, ConsumerControl control) { Subscription sub = subscriptions.get(control.getConsumerId()); if (sub != null && sub instanceof AbstractSubscription) { @@ -606,6 +685,7 @@ public abstract class AbstractRegion implements Region { } } + @Override public void reapplyInterceptor() { destinationsLock.writeLock().lock(); try { http://git-wip-us.apache.org/repos/asf/activemq/blob/f1f171d4/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5814Test.java ---------------------------------------------------------------------- diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5814Test.java b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5814Test.java new file mode 100644 index 0000000..29cad9f --- /dev/null +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5814Test.java @@ -0,0 +1,140 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.bugs; + +import static org.junit.Assert.assertNotNull; + +import java.net.URI; +import java.util.ArrayList; +import java.util.List; + +import javax.jms.Connection; +import javax.jms.ConnectionFactory; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Session; +import javax.jms.Topic; + +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.broker.BrokerPlugin; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.TransportConnector; +import org.apache.activemq.filter.DestinationMapEntry; +import org.apache.activemq.security.AuthenticationUser; +import org.apache.activemq.security.AuthorizationEntry; +import org.apache.activemq.security.AuthorizationPlugin; +import org.apache.activemq.security.DefaultAuthorizationMap; +import org.apache.activemq.security.SimpleAuthenticationPlugin; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +public class AMQ5814Test { + + private BrokerService brokerService; + private String openwireClientUrl; + + public BrokerPlugin configureAuthentication() throws Exception { + List users = new ArrayList<>(); + users.add(new AuthenticationUser("publisher", "123", "publisher")); + users.add(new AuthenticationUser("subscriber", "123", "subscriber")); + users.add(new AuthenticationUser("admin", "123", "publisher,subscriber")); + + SimpleAuthenticationPlugin authenticationPlugin = new SimpleAuthenticationPlugin(users); + + return authenticationPlugin; + } + + public BrokerPlugin configureAuthorization() throws Exception { + + @SuppressWarnings("rawtypes") + List authorizationEntries = new ArrayList<>(); + + AuthorizationEntry entry = new AuthorizationEntry(); + entry.setTopic("dcu.>"); + entry.setRead("subscriber"); + entry.setWrite("publisher"); + entry.setAdmin("publisher,subscriber"); + authorizationEntries.add(entry); + + entry = new AuthorizationEntry(); + entry.setTopic("ActiveMQ.Advisory.>"); + entry.setRead("publisher,subscriber"); + entry.setWrite("publisher,subscriber"); + entry.setAdmin("publisher,subscriber"); + authorizationEntries.add(entry); + + DefaultAuthorizationMap authorizationMap = new DefaultAuthorizationMap(authorizationEntries); + AuthorizationPlugin authorizationPlugin = new AuthorizationPlugin(authorizationMap); + + return authorizationPlugin; + } + + @Before + public void setup() throws Exception { + + TransportConnector openwireConnector = new TransportConnector(); + openwireConnector.setUri(new URI("tcp://localhost:0")); + openwireConnector.setName("openwire"); + + ArrayList plugins = new ArrayList<>(); + plugins.add(configureAuthentication()); + plugins.add(configureAuthorization()); + + brokerService = new BrokerService(); + brokerService.setPersistent(false); + brokerService.addConnector(openwireConnector); + if (!plugins.isEmpty()) { + BrokerPlugin[] array = new BrokerPlugin[plugins.size()]; + brokerService.setPlugins(plugins.toArray(array)); + } + brokerService.start(); + brokerService.waitUntilStarted(); + + openwireClientUrl = openwireConnector.getPublishableConnectString(); + } + + @After + public void shutdown() throws Exception { + if (brokerService != null) { + brokerService.stop(); + brokerService.waitUntilStopped(); + brokerService = null; + } + } + + @Test(timeout=30000) + public void testProduceConsumeWithAuthorization() throws Exception { + ConnectionFactory factory = new ActiveMQConnectionFactory(openwireClientUrl); + Connection connection1 = factory.createConnection("subscriber", "123"); + Session session1 = connection1.createSession(false, Session.AUTO_ACKNOWLEDGE); + Topic wildCarded = session1.createTopic("dcu.>"); + MessageConsumer consumer = session1.createConsumer(wildCarded); + connection1.start(); + + Connection connection2 = factory.createConnection("publisher", "123"); + Session session2 = connection2.createSession(false, Session.AUTO_ACKNOWLEDGE); + Topic named = session2.createTopic("dcu.id"); + MessageProducer producer = session2.createProducer(named); + producer.send(session2.createTextMessage("test")); + + assertNotNull(consumer.receive(2000)); + + connection1.close(); + connection2.close(); + } +}