qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ritch...@apache.org
Subject svn commit: r960544 - in /qpid/trunk/qpid/java/broker/src: main/java/org/apache/qpid/server/configuration/ test/java/org/apache/qpid/server/configuration/
Date Mon, 05 Jul 2010 11:15:02 GMT
Author: ritchiem
Date: Mon Jul  5 11:15:02 2010
New Revision: 960544

URL: http://svn.apache.org/viewvc?rev=960544&view=rev
Log:
QPID-2581 : Process new topic configuration

Added:
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/TopicConfig.java
    qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/configuration/TopicConfigurationTest.java
Modified:
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/TopicConfiguration.java

Added: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/TopicConfig.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/TopicConfig.java?rev=960544&view=auto
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/TopicConfig.java
(added)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/TopicConfig.java
Mon Jul  5 11:15:02 2010
@@ -0,0 +1,72 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.configuration;
+
+import org.apache.commons.configuration.ConfigurationException;
+import org.apache.qpid.server.configuration.plugins.ConfigurationPlugin;
+
+public class TopicConfig extends ConfigurationPlugin
+{
+    @Override
+    public String[] getElementsProcessed()
+    {
+        return new String[]{"name", "subscriptionName"};
+    }
+
+    public String getName()
+    {
+        // If we don't have a specific topic then this config is for all topics.
+        return getStringValue("name", "#");
+    }
+
+    public String getSubscriptionName()
+    {
+        return getStringValue("subscriptionName");
+    }
+
+    public void validateConfiguration() throws ConfigurationException
+    {
+        if (_configuration.isEmpty())
+        {
+            throw new ConfigurationException("Topic section cannot be empty.");
+        }
+
+        if (getStringValue("name") == null && getSubscriptionName() == null)
+        {
+            throw new ConfigurationException("Topic section must have a 'name' or 'subscriptionName'
element.");
+        }
+
+        System.err.println("********* Created TC:"+this);
+    }
+
+
+    @Override
+    public String formatToString()
+    {
+        String response = "Topic:"+getName();
+        if (getSubscriptionName() != null)
+        {
+               response += ", SubscriptionName:"+getSubscriptionName();
+        }
+
+        return response;
+    }    
+}
\ No newline at end of file

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/TopicConfiguration.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/TopicConfiguration.java?rev=960544&r1=960543&r2=960544&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/TopicConfiguration.java
(original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/TopicConfiguration.java
Mon Jul  5 11:15:02 2010
@@ -22,8 +22,11 @@ package org.apache.qpid.server.configura
 
 import org.apache.commons.configuration.Configuration;
 import org.apache.commons.configuration.ConfigurationException;
+import org.apache.qpid.server.binding.Binding;
 import org.apache.qpid.server.configuration.plugins.ConfigurationPlugin;
 import org.apache.qpid.server.configuration.plugins.ConfigurationPluginFactory;
+import org.apache.qpid.server.exchange.TopicExchange;
+import org.apache.qpid.server.queue.AMQQueue;
 
 import java.util.Arrays;
 import java.util.HashMap;
@@ -31,7 +34,7 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 
-public class TopicConfiguration extends ConfigurationPlugin
+public class TopicConfiguration extends ConfigurationPlugin implements ExchangeConfigurationPlugin
 {
     public static final ConfigurationPluginFactory FACTORY = new TopicConfigurationFactory();
 
@@ -54,6 +57,7 @@ public class TopicConfiguration extends 
     }
 
     Map<String, TopicConfig> _topics = new HashMap<String, TopicConfig>();
+    Map<String,  Map<String, TopicConfig>> _subscriptions = new HashMap<String,
 Map<String, TopicConfig>>();
 
     public String[] getElementsProcessed()
     {
@@ -68,53 +72,186 @@ public class TopicConfiguration extends 
             throw new ConfigurationException("Topics section cannot be empty.");
         }
 
-        int topics = _configuration.getList("topic.name").size();
+        int topics = _configuration.getList("topic.name").size() +
+                     _configuration.getList("topic.subscriptionName").size();
 
-        for(int index=0; index<topics;index++)
+        for (int index = 0; index < topics; index++)
         {
             TopicConfig topic = new TopicConfig();
-            topic.setConfiguration(VIRTUALHOSTS_VIRTUALHOST_TOPICS + ".topic", _configuration.subset("topic("
+ index + ")"));
 
-            String topicName = _configuration.getString("topic(" + index + ").name");
-            if(_topics.containsKey(topicName))
+            Configuration topicSubset = _configuration.subset("topic(" + index + ")");
+
+            // This will occur when we have a subscriptionName that is bound to a
+            // topic.
+            if (topicSubset.isEmpty())
+            {
+                break;
+            }
+
+            topic.setConfiguration(VIRTUALHOSTS_VIRTUALHOST_TOPICS + ".topic", topicSubset
);
+
+            String name = _configuration.getString("topic(" + index + ").name");
+            String subscriptionName = _configuration.getString("topic(" + index + ").subscriptionName");
+
+            // Record config if subscriptionName is there
+            if (subscriptionName != null)
             {
-                throw new ConfigurationException("Topics section cannot contain two entries
for the same topic.");                
+                processSubscription(subscriptionName, topic);
             }
             else
             {
-                _topics.put(topicName, topic);
+                // Otherwise record config as topic if we have the name
+                if (name != null)
+                {
+                    processTopic(name, topic);
+                }
             }
         }
     }
 
-    public String toString()
+    /**
+     * @param name
+     * @param topic
+     *
+     * @throws org.apache.commons.configuration.ConfigurationException
+     *
+     */
+    private void processTopic(String name, TopicConfig topic) throws ConfigurationException
     {
-        return getClass().getName() + ": Defined Topics:" + _topics.size();
+        if (_topics.containsKey(name))
+        {
+            throw new ConfigurationException("Topics section cannot contain two entries for
the same topic.");
+        }
+        else
+        {
+            _topics.put(name, topic);
+        }
     }
 
-    public static class TopicConfig extends ConfigurationPlugin
+
+    private void processSubscription(String name, TopicConfig topic) throws ConfigurationException
     {
-        @Override
-        public String[] getElementsProcessed()
+        Map<String,TopicConfig> topics;
+        if (_subscriptions.containsKey(name))
+        {
+            topics = _subscriptions.get(name);
+
+            if (topics.containsKey(topic.getName()))
+            {
+                throw new ConfigurationException("Subcription cannot contain two entries
for the same topic.");
+            }
+        }
+        else
         {
-            return new String[]{"name"};
+            topics = new HashMap<String,TopicConfig>();
         }
 
-        public String getName()
+        topics.put(topic.getName(),topic);
+        _subscriptions.put(name, topics);
+
+    }
+
+    @Override
+    public String formatToString()
+    {
+        return "Topics:" + _topics + ", Subscriptions:" + _subscriptions;
+    }
+
+    /**
+     * This processes the given queue and apply configuration in the following
+     * order:
+     *
+     * Global Topic Values -> Topic Values -> Subscription Values
+     *
+     * @param queue
+     *
+     * @return
+     */
+    public ConfigurationPlugin getConfiguration(AMQQueue queue)
+    {
+        TopicConfig config = new TopicConfig();
+
+        // Add global topic configuration
+        config.addConfiguration(this);
+
+        // Process Topic Bindings as these are more generic than subscriptions
+        List<TopicConfig> boundToTopics = new LinkedList<TopicConfig>();
+
+        //Merge the configuration in the order that they are bound
+        for (Binding binding : queue.getBindings())
         {
-            // If we don't specify a topic name then match all topics
-            String configName = getStringValue("name");
-            return configName == null ? "#" : configName;
+            if (binding.getExchange().getType().equals(TopicExchange.TYPE))
+            {
+                // Identify topic for the binding key
+                TopicConfig topicConfig = getTopicConfigForRoutingKey(binding.getBindingKey());
+                if (topicConfig != null)
+                {
+                    boundToTopics.add(topicConfig);
+                }
+            }
         }
 
+        // If the Queue is bound to a number of topics then only use the global
+        // topic configuration.
+        // todo - What does it mean in terms of configuration to be bound to a
+        // number of topics? Do we try and merge?
+        // YES - right thing to do would be to merge from generic to specific.
+        // Means we need to be able to get an ordered list of topics for this
+        // binding.
+        if (boundToTopics.size() == 1)
+        {
+            config.addConfiguration(boundToTopics.get(0));
+        }
 
-        public void validateConfiguration() throws ConfigurationException
+        // Apply subscription configurations
+        if (_subscriptions.containsKey(queue.getName()))
         {
-            if(_configuration.isEmpty())
+            Map<String, TopicConfig> topics = _subscriptions.get(queue.getName());
+
+            TopicConfig subscriptionSpecificConfig = null;
+
+            // See if we have a TopicConfig in topics for a topic we are bound to. 
+            for (Binding binding : queue.getBindings())
+            {
+                if (binding.getExchange().getType().equals(TopicExchange.TYPE))
+                {
+                    //todo - What does it mean to have multiple matches?
+                    // Take the first match we get
+                    if (subscriptionSpecificConfig == null)
+                    {
+                        // lookup the binding to see if we have a match in the subscription
configs
+                        subscriptionSpecificConfig = topics.get(binding.getBindingKey());
+                    }
+                }
+            }
+
+            // Apply subscription specfic config.
+            if (subscriptionSpecificConfig != null)
             {
-                throw new ConfigurationException("Topic section cannot be empty.");
+                config.addConfiguration(subscriptionSpecificConfig);
             }
         }
+
+        return config;
+    }
+
+    /**
+     * This method should perform the same heuristics as the TopicExchange
+     * to attempt to identify a piece of configuration for the give routingKey.
+     *
+     * i.e. If we have 'stocks.*' defined in the config
+     * and we bind 'stocks.appl' then we should return the 'stocks.*'
+     * configuration.
+     *
+     * @param routingkey the key to lookup
+     *
+     * @return the TopicConfig if found.
+     */
+    private TopicConfig getTopicConfigForRoutingKey(String routingkey)
+    {
+        //todo actually perform TopicExchange style lookup not just straight
+        // lookup as we are just now.
+        return _topics.get(routingkey);
     }
 
 }

Added: qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/configuration/TopicConfigurationTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/configuration/TopicConfigurationTest.java?rev=960544&view=auto
==============================================================================
--- qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/configuration/TopicConfigurationTest.java
(added)
+++ qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/configuration/TopicConfigurationTest.java
Mon Jul  5 11:15:02 2010
@@ -0,0 +1,79 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.configuration;
+
+import org.apache.commons.configuration.ConfigurationException;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.AMQSecurityException;
+import org.apache.qpid.exchange.ExchangeDefaults;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.server.exchange.Exchange;
+import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.queue.AMQQueueFactory;
+import org.apache.qpid.server.util.InternalBrokerBaseCase;
+
+public class TopicConfigurationTest extends InternalBrokerBaseCase
+{
+
+    @Override
+    public void configure()
+    {
+        _configXml.addProperty("virtualhosts.virtualhost.test.topics.topic.name", "stocks.nyse.appl");
+
+        _configXml.addProperty("virtualhosts.virtualhost.test.topics.topic(1).subscriptionName",
"testSubscriptionCreation:stockSubscription");
+
+        _configXml.addProperty("virtualhosts.virtualhost.test.topics.topic(2).name", "stocks.nyse.orcl");
+        _configXml.addProperty("virtualhosts.virtualhost.test.topics.topic(2).subscriptionName",
getName()+":stockSubscription");
+   }
+
+    public void testTopicCreation() throws ConfigurationException, AMQSecurityException
+    {
+        Exchange topicExchange = _virtualHost.getExchangeRegistry().getExchange(ExchangeDefaults.TOPIC_EXCHANGE_NAME);
+        _virtualHost.getBindingFactory().addBinding("stocks.nyse.appl", _queue, topicExchange,
null);
+
+        TopicConfig config = _queue.getConfiguration().getConfiguration(TopicConfig.class.getName());
+
+        assertNotNull("Queue should have topic configuration bound to it.", config);
+        assertEquals("Configuration name not correct", "stocks.nyse.appl", config.getName());
+    }
+
+    public void testSubscriptionCreation() throws ConfigurationException, AMQException
+    {
+
+        AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(new AMQShortString(getName()+":stockSubscription"),
false, new AMQShortString("testowner"),
+                                                    false, false, _virtualHost, null);
+
+        _virtualHost.getQueueRegistry().registerQueue(queue);
+        Exchange defaultExchange = _virtualHost.getExchangeRegistry().getDefaultExchange();
+        _virtualHost.getBindingFactory().addBinding(getName(), queue, defaultExchange, null);
+
+
+        Exchange topicExchange = _virtualHost.getExchangeRegistry().getExchange(ExchangeDefaults.TOPIC_EXCHANGE_NAME);
+        _virtualHost.getBindingFactory().addBinding("stocks.nyse.orcl", queue, topicExchange,
null);
+
+        TopicConfig config = queue.getConfiguration().getConfiguration(TopicConfig.class.getName());
+
+        assertNotNull("Queue should have topic configuration bound to it.", config);
+        assertEquals("Configuration name not correct", getName() + ":stockSubscription",
config.getSubscriptionName());
+    }
+
+
+}



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org


Mime
View raw message