Return-Path: Delivered-To: apmail-qpid-commits-archive@www.apache.org Received: (qmail 98029 invoked from network); 5 Jul 2010 11:16:02 -0000 Received: from unknown (HELO mail.apache.org) (140.211.11.3) by 140.211.11.9 with SMTP; 5 Jul 2010 11:16:02 -0000 Received: (qmail 50161 invoked by uid 500); 5 Jul 2010 11:16:02 -0000 Delivered-To: apmail-qpid-commits-archive@qpid.apache.org Received: (qmail 50107 invoked by uid 500); 5 Jul 2010 11:16:01 -0000 Mailing-List: contact commits-help@qpid.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@qpid.apache.org Delivered-To: mailing list commits@qpid.apache.org Received: (qmail 50100 invoked by uid 99); 5 Jul 2010 11:16:00 -0000 Received: from nike.apache.org (HELO nike.apache.org) (192.87.106.230) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 05 Jul 2010 11:16:00 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=10.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 05 Jul 2010 11:15:56 +0000 Received: by eris.apache.org (Postfix, from userid 65534) id 0F5102388994; Mon, 5 Jul 2010 11:15:03 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r960544 - in /qpid/trunk/qpid/java/broker/src: main/java/org/apache/qpid/server/configuration/ test/java/org/apache/qpid/server/configuration/ Date: Mon, 05 Jul 2010 11:15:02 -0000 To: commits@qpid.apache.org From: ritchiem@apache.org X-Mailer: svnmailer-1.0.8 Message-Id: <20100705111503.0F5102388994@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: ritchiem Date: Mon Jul 5 11:15:02 2010 New Revision: 960544 URL: http://svn.apache.org/viewvc?rev=960544&view=rev Log: QPID-2581 : Process new topic configuration Added: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/TopicConfig.java qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/configuration/TopicConfigurationTest.java Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/TopicConfiguration.java Added: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/TopicConfig.java URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/TopicConfig.java?rev=960544&view=auto ============================================================================== --- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/TopicConfig.java (added) +++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/TopicConfig.java Mon Jul 5 11:15:02 2010 @@ -0,0 +1,72 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.configuration; + +import org.apache.commons.configuration.ConfigurationException; +import org.apache.qpid.server.configuration.plugins.ConfigurationPlugin; + +public class TopicConfig extends ConfigurationPlugin +{ + @Override + public String[] getElementsProcessed() + { + return new String[]{"name", "subscriptionName"}; + } + + public String getName() + { + // If we don't have a specific topic then this config is for all topics. + return getStringValue("name", "#"); + } + + public String getSubscriptionName() + { + return getStringValue("subscriptionName"); + } + + public void validateConfiguration() throws ConfigurationException + { + if (_configuration.isEmpty()) + { + throw new ConfigurationException("Topic section cannot be empty."); + } + + if (getStringValue("name") == null && getSubscriptionName() == null) + { + throw new ConfigurationException("Topic section must have a 'name' or 'subscriptionName' element."); + } + + System.err.println("********* Created TC:"+this); + } + + + @Override + public String formatToString() + { + String response = "Topic:"+getName(); + if (getSubscriptionName() != null) + { + response += ", SubscriptionName:"+getSubscriptionName(); + } + + return response; + } +} \ No newline at end of file Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/TopicConfiguration.java URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/TopicConfiguration.java?rev=960544&r1=960543&r2=960544&view=diff ============================================================================== --- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/TopicConfiguration.java (original) +++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/TopicConfiguration.java Mon Jul 5 11:15:02 2010 @@ -22,8 +22,11 @@ package org.apache.qpid.server.configura import org.apache.commons.configuration.Configuration; import org.apache.commons.configuration.ConfigurationException; +import org.apache.qpid.server.binding.Binding; import org.apache.qpid.server.configuration.plugins.ConfigurationPlugin; import org.apache.qpid.server.configuration.plugins.ConfigurationPluginFactory; +import org.apache.qpid.server.exchange.TopicExchange; +import org.apache.qpid.server.queue.AMQQueue; import java.util.Arrays; import java.util.HashMap; @@ -31,7 +34,7 @@ import java.util.LinkedList; import java.util.List; import java.util.Map; -public class TopicConfiguration extends ConfigurationPlugin +public class TopicConfiguration extends ConfigurationPlugin implements ExchangeConfigurationPlugin { public static final ConfigurationPluginFactory FACTORY = new TopicConfigurationFactory(); @@ -54,6 +57,7 @@ public class TopicConfiguration extends } Map _topics = new HashMap(); + Map> _subscriptions = new HashMap>(); public String[] getElementsProcessed() { @@ -68,53 +72,186 @@ public class TopicConfiguration extends throw new ConfigurationException("Topics section cannot be empty."); } - int topics = _configuration.getList("topic.name").size(); + int topics = _configuration.getList("topic.name").size() + + _configuration.getList("topic.subscriptionName").size(); - for(int index=0; index topics; + if (_subscriptions.containsKey(name)) + { + topics = _subscriptions.get(name); + + if (topics.containsKey(topic.getName())) + { + throw new ConfigurationException("Subcription cannot contain two entries for the same topic."); + } + } + else { - return new String[]{"name"}; + topics = new HashMap(); } - public String getName() + topics.put(topic.getName(),topic); + _subscriptions.put(name, topics); + + } + + @Override + public String formatToString() + { + return "Topics:" + _topics + ", Subscriptions:" + _subscriptions; + } + + /** + * This processes the given queue and apply configuration in the following + * order: + * + * Global Topic Values -> Topic Values -> Subscription Values + * + * @param queue + * + * @return + */ + public ConfigurationPlugin getConfiguration(AMQQueue queue) + { + TopicConfig config = new TopicConfig(); + + // Add global topic configuration + config.addConfiguration(this); + + // Process Topic Bindings as these are more generic than subscriptions + List boundToTopics = new LinkedList(); + + //Merge the configuration in the order that they are bound + for (Binding binding : queue.getBindings()) { - // If we don't specify a topic name then match all topics - String configName = getStringValue("name"); - return configName == null ? "#" : configName; + if (binding.getExchange().getType().equals(TopicExchange.TYPE)) + { + // Identify topic for the binding key + TopicConfig topicConfig = getTopicConfigForRoutingKey(binding.getBindingKey()); + if (topicConfig != null) + { + boundToTopics.add(topicConfig); + } + } } + // If the Queue is bound to a number of topics then only use the global + // topic configuration. + // todo - What does it mean in terms of configuration to be bound to a + // number of topics? Do we try and merge? + // YES - right thing to do would be to merge from generic to specific. + // Means we need to be able to get an ordered list of topics for this + // binding. + if (boundToTopics.size() == 1) + { + config.addConfiguration(boundToTopics.get(0)); + } - public void validateConfiguration() throws ConfigurationException + // Apply subscription configurations + if (_subscriptions.containsKey(queue.getName())) { - if(_configuration.isEmpty()) + Map topics = _subscriptions.get(queue.getName()); + + TopicConfig subscriptionSpecificConfig = null; + + // See if we have a TopicConfig in topics for a topic we are bound to. + for (Binding binding : queue.getBindings()) + { + if (binding.getExchange().getType().equals(TopicExchange.TYPE)) + { + //todo - What does it mean to have multiple matches? + // Take the first match we get + if (subscriptionSpecificConfig == null) + { + // lookup the binding to see if we have a match in the subscription configs + subscriptionSpecificConfig = topics.get(binding.getBindingKey()); + } + } + } + + // Apply subscription specfic config. + if (subscriptionSpecificConfig != null) { - throw new ConfigurationException("Topic section cannot be empty."); + config.addConfiguration(subscriptionSpecificConfig); } } + + return config; + } + + /** + * This method should perform the same heuristics as the TopicExchange + * to attempt to identify a piece of configuration for the give routingKey. + * + * i.e. If we have 'stocks.*' defined in the config + * and we bind 'stocks.appl' then we should return the 'stocks.*' + * configuration. + * + * @param routingkey the key to lookup + * + * @return the TopicConfig if found. + */ + private TopicConfig getTopicConfigForRoutingKey(String routingkey) + { + //todo actually perform TopicExchange style lookup not just straight + // lookup as we are just now. + return _topics.get(routingkey); } } Added: qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/configuration/TopicConfigurationTest.java URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/configuration/TopicConfigurationTest.java?rev=960544&view=auto ============================================================================== --- qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/configuration/TopicConfigurationTest.java (added) +++ qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/configuration/TopicConfigurationTest.java Mon Jul 5 11:15:02 2010 @@ -0,0 +1,79 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.configuration; + +import org.apache.commons.configuration.ConfigurationException; +import org.apache.qpid.AMQException; +import org.apache.qpid.AMQSecurityException; +import org.apache.qpid.exchange.ExchangeDefaults; +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.server.exchange.Exchange; +import org.apache.qpid.server.queue.AMQQueue; +import org.apache.qpid.server.queue.AMQQueueFactory; +import org.apache.qpid.server.util.InternalBrokerBaseCase; + +public class TopicConfigurationTest extends InternalBrokerBaseCase +{ + + @Override + public void configure() + { + _configXml.addProperty("virtualhosts.virtualhost.test.topics.topic.name", "stocks.nyse.appl"); + + _configXml.addProperty("virtualhosts.virtualhost.test.topics.topic(1).subscriptionName", "testSubscriptionCreation:stockSubscription"); + + _configXml.addProperty("virtualhosts.virtualhost.test.topics.topic(2).name", "stocks.nyse.orcl"); + _configXml.addProperty("virtualhosts.virtualhost.test.topics.topic(2).subscriptionName", getName()+":stockSubscription"); + } + + public void testTopicCreation() throws ConfigurationException, AMQSecurityException + { + Exchange topicExchange = _virtualHost.getExchangeRegistry().getExchange(ExchangeDefaults.TOPIC_EXCHANGE_NAME); + _virtualHost.getBindingFactory().addBinding("stocks.nyse.appl", _queue, topicExchange, null); + + TopicConfig config = _queue.getConfiguration().getConfiguration(TopicConfig.class.getName()); + + assertNotNull("Queue should have topic configuration bound to it.", config); + assertEquals("Configuration name not correct", "stocks.nyse.appl", config.getName()); + } + + public void testSubscriptionCreation() throws ConfigurationException, AMQException + { + + AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(new AMQShortString(getName()+":stockSubscription"), false, new AMQShortString("testowner"), + false, false, _virtualHost, null); + + _virtualHost.getQueueRegistry().registerQueue(queue); + Exchange defaultExchange = _virtualHost.getExchangeRegistry().getDefaultExchange(); + _virtualHost.getBindingFactory().addBinding(getName(), queue, defaultExchange, null); + + + Exchange topicExchange = _virtualHost.getExchangeRegistry().getExchange(ExchangeDefaults.TOPIC_EXCHANGE_NAME); + _virtualHost.getBindingFactory().addBinding("stocks.nyse.orcl", queue, topicExchange, null); + + TopicConfig config = queue.getConfiguration().getConfiguration(TopicConfig.class.getName()); + + assertNotNull("Queue should have topic configuration bound to it.", config); + assertEquals("Configuration name not correct", getName() + ":stockSubscription", config.getSubscriptionName()); + } + + +} --------------------------------------------------------------------- Apache Qpid - AMQP Messaging Implementation Project: http://qpid.apache.org Use/Interact: mailto:commits-subscribe@qpid.apache.org