qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ritch...@apache.org
Subject svn commit: r960546 - in /qpid/trunk/qpid/java: broker-plugins/experimental/slowconsumerdetection/src/test/java/org/apache/qpid/systest/ broker/src/main/java/org/apache/qpid/server/configuration/ broker/src/main/java/org/apache/qpid/server/configuratio...
Date Mon, 05 Jul 2010 11:15:25 GMT
Author: ritchiem
Date: Mon Jul  5 11:15:25 2010
New Revision: 960546

URL: http://svn.apache.org/viewvc?rev=960546&view=rev
Log:
QPID-2681 : Provide ability to merge configurations. This does simple merging of sub configuration
elements. Currently the last value to be merged is taken as is. No explicit merging is performed.
Merging is performed in the order Queues->(Exchange)->Queue, Where a configured Exchange
Configuration component can decide how to perform its merge. TopicConfiguration performs the
order Topics->Topic->Subscriptions, this allows Global Topic configuration to be overwritten
by a specific topic version. Currently the Topic is only identified by a straight string wild
card matching has not yet been implemented.

Added:
    qpid/trunk/qpid/java/broker-plugins/experimental/slowconsumerdetection/src/test/java/org/apache/qpid/systest/MergeConfigurationTest.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/ExchangeConfigurationPlugin.java
Modified:
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/QueueConfiguration.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/plugins/ConfigurationPlugin.java

Added: qpid/trunk/qpid/java/broker-plugins/experimental/slowconsumerdetection/src/test/java/org/apache/qpid/systest/MergeConfigurationTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/experimental/slowconsumerdetection/src/test/java/org/apache/qpid/systest/MergeConfigurationTest.java?rev=960546&view=auto
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/experimental/slowconsumerdetection/src/test/java/org/apache/qpid/systest/MergeConfigurationTest.java
(added)
+++ qpid/trunk/qpid/java/broker-plugins/experimental/slowconsumerdetection/src/test/java/org/apache/qpid/systest/MergeConfigurationTest.java
Mon Jul  5 11:15:25 2010
@@ -0,0 +1,124 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.systest;
+
+import org.apache.commons.configuration.ConfigurationException;
+import org.apache.qpid.AMQChannelClosedException;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.client.AMQSession_0_10;
+import org.apache.qpid.jms.ConnectionListener;
+import org.apache.qpid.protocol.AMQConstant;
+import org.apache.qpid.test.utils.QpidBrokerTestCase;
+
+import javax.jms.Connection;
+import javax.jms.Destination;
+import javax.jms.ExceptionListener;
+import javax.jms.JMSException;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.Topic;
+import javax.naming.NamingException;
+import java.io.IOException;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+public class MergeConfigurationTest extends TestingBaseCase
+{
+
+    protected int topicCount = 0;
+
+
+    public void configureTopic(String topic, int msgCount) throws NamingException, IOException,
ConfigurationException
+    {
+
+        setProperty(".topics.topic("+topicCount+").name", topic);
+        setProperty(".topics.topic("+topicCount+").slow-consumer-detection.messageCount",
String.valueOf(msgCount));
+        setProperty(".topics.topic("+topicCount+").slow-consumer-detection.policy.name",
"TopicDelete");
+        topicCount++;
+    }
+
+
+    /**
+     * Test that setting messageCount takes affect on topics
+     *
+     * We send 10 messages and disconnect at 9
+     *
+     * @throws Exception
+     */
+    public void testTopicConsumerMessageCount() throws Exception
+    {
+        MAX_QUEUE_MESSAGE_COUNT = 10;
+
+        configureTopic(getName(), (MAX_QUEUE_MESSAGE_COUNT * 4) - 1);
+
+        //Configure topic as a subscription
+        setProperty(".topics.topic("+topicCount+").subscriptionName", "clientid:"+getTestQueueName());
+        configureTopic(getName(), (MAX_QUEUE_MESSAGE_COUNT - 1));
+
+
+
+        //Start the broker
+        startBroker();
+
+        topicConsumer(Session.AUTO_ACKNOWLEDGE, true);
+    }
+
+
+//
+//    public void testMerge() throws ConfigurationException, AMQException
+//    {
+//
+//        AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(new AMQShortString(getName()+":stockSubscription"),
false, new AMQShortString("testowner"),
+//                                                    false, false, _virtualHost, null);
+//
+//        _virtualHost.getQueueRegistry().registerQueue(queue);
+//        Exchange defaultExchange = _virtualHost.getExchangeRegistry().getDefaultExchange();
+//        _virtualHost.getBindingFactory().addBinding(getName(), queue, defaultExchange,
null);
+//
+//
+//        Exchange topicExchange = _virtualHost.getExchangeRegistry().getExchange(ExchangeDefaults.TOPIC_EXCHANGE_NAME);
+//        _virtualHost.getBindingFactory().addBinding("stocks.nyse.orcl", queue, topicExchange,
null);
+//
+//        TopicConfig config = queue.getConfiguration().getConfiguration(TopicConfig.class.getName());
+//
+//        assertNotNull("Queue should have topic configuration bound to it.", config);
+//        assertEquals("Configuration name not correct", getName() + ":stockSubscription",
config.getSubscriptionName());
+//
+//        ConfigurationPlugin scdConfig = queue.getConfiguration().getConfiguration(SlowConsumerDetectionQueueConfiguration.class.getName());
+//        if (scdConfig instanceof org.apache.qpid.server.configuration.plugin.SlowConsumerDetectionQueueConfiguration)
+//        {
+//            System.err.println("********************** scd is a SlowConsumerDetectionQueueConfiguration.");
+//        }
+//        else
+//        {
+//            System.err.println("********************** Test SCD "+SlowConsumerDetectionQueueConfiguration.class.getClassLoader());
+//            System.err.println("********************** Broker SCD "+scdConfig.getClass().getClassLoader());
+//                 System.err.println("********************** Broker SCD "+scdConfig.getClass().isAssignableFrom(SlowConsumerDetectionQueueConfiguration.class));
+//            System.err.println("********************** is a "+scdConfig.getClass());
+//        }
+//
+//        assertNotNull("Queue should have scd configuration bound to it.", scdConfig);
+//        assertEquals("MessageCount is not correct", 10 , ((SlowConsumerDetectionQueueConfiguration)scdConfig).getMessageCount());
+//        assertEquals("Policy is not correct", TopicDeletePolicy.class.getName() , ((SlowConsumerDetectionQueueConfiguration)scdConfig).getPolicy().getClass().getName());
+//    }
+
+}

Added: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/ExchangeConfigurationPlugin.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/ExchangeConfigurationPlugin.java?rev=960546&view=auto
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/ExchangeConfigurationPlugin.java
(added)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/ExchangeConfigurationPlugin.java
Mon Jul  5 11:15:25 2010
@@ -0,0 +1,29 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.configuration;
+
+import org.apache.qpid.server.configuration.plugins.ConfigurationPlugin;
+import org.apache.qpid.server.queue.AMQQueue;
+
+public interface ExchangeConfigurationPlugin
+{
+    ConfigurationPlugin getConfiguration(AMQQueue queue);
+}

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/QueueConfiguration.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/QueueConfiguration.java?rev=960546&r1=960545&r2=960546&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/QueueConfiguration.java
(original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/QueueConfiguration.java
Mon Jul  5 11:15:25 2010
@@ -23,7 +23,6 @@ package org.apache.qpid.server.configura
 import java.util.List;
 
 import org.apache.commons.configuration.CompositeConfiguration;
-import org.apache.commons.configuration.Configuration;
 import org.apache.commons.configuration.ConfigurationException;
 import org.apache.qpid.exchange.ExchangeDefaults;
 import org.apache.qpid.server.configuration.plugins.ConfigurationPlugin;
@@ -167,4 +166,43 @@ public class QueueConfiguration extends 
     {
         return getStringValue("lvqKey", null);
     }
+
+
+    public static class QueueConfig extends ConfigurationPlugin
+    {
+        @Override
+        public String[] getElementsProcessed()
+        {
+            return new String[]{"name"};
+        }
+
+        public String getName()
+        {
+            return getStringValue("name");
+        }
+
+
+        public void validateConfiguration() throws ConfigurationException
+        {
+            if (_configuration.isEmpty())
+            {
+                throw new ConfigurationException("Queue section cannot be empty.");
+            }
+
+            if (getName() == null)
+            {
+                throw new ConfigurationException("Queue section must have a 'name' element.");
+            }
+
+        }
+
+
+        @Override
+        public String formatToString()
+        {
+            return "Name:"+getName();
+        }
+          
+
+    }
 }

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java?rev=960546&r1=960545&r2=960546&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java
(original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java
Mon Jul  5 11:15:25 2010
@@ -34,11 +34,7 @@ import org.apache.commons.configuration.
 import org.apache.qpid.exchange.ExchangeDefaults;
 import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.server.binding.Binding;
-import org.apache.qpid.server.binding.BindingFactory;
 import org.apache.qpid.server.configuration.plugins.ConfigurationPlugin;
-import org.apache.qpid.server.configuration.plugins.ConfigurationPluginFactory;
-import org.apache.qpid.server.exchange.Exchange;
-import org.apache.qpid.server.exchange.ExchangeType;
 import org.apache.qpid.server.queue.AMQQueue;
 import org.apache.qpid.server.registry.ApplicationRegistry;
 import org.apache.qpid.server.store.MemoryMessageStore;
@@ -212,15 +208,48 @@ public class VirtualHostConfiguration ex
                                + exchangeName.substring(0, 1).toUpperCase()
                                + exchangeName.substring(1) + "Configuration";
 
-        ConfigurationPlugin configPlugin
-                = queue.getVirtualHost().getConfiguration().getConfiguration(exchangeClass);
-
+        ExchangeConfigurationPlugin exchangeConfiguration
+                = (ExchangeConfigurationPlugin) queue.getVirtualHost().getConfiguration().getConfiguration(exchangeClass);
 
         // now need to perform the queue-topic-topics-queue magic.
+        // So make a new ConfigurationObject that will hold all the configuration for this
queue.
+        ConfigurationPlugin queueConfig = new QueueConfiguration.QueueConfig();
+
+        // Initialise the queue with any Global values we may have
+        QueueConfiguration config = getConfiguration(QueueConfiguration.class.getName());
+        if (config == null)
+        {
+            PropertiesConfiguration newQueueConfig = new PropertiesConfiguration();
+            newQueueConfig.setProperty("name", queue.getName());
 
-        System.err.println("*********** Reconfiguring queue with config:"+configPlugin);
+            try
+            {
+                queueConfig.setConfiguration("", newQueueConfig);
+            }
+            catch (ConfigurationException e)
+            {
+                // This will not occur as queues only require a name. 
+                _logger.error("QueueConfiguration requirements have changed.");
+            }
+        }
+        else
+        {
+            queueConfig.addConfiguration(config);
+        }
+
+        // Merge any configuration the Exchange wishes to apply        
+        if (exchangeConfiguration != null)
+        {
+            queueConfig.addConfiguration(exchangeConfiguration.getConfiguration(queue));
+        }
+
+        //Finally merge in any specific queue configuration we have.
+        if (_queues.containsKey(queue.getName()))
+        {
+            queueConfig.addConfiguration(_queues.get(queue.getName()));
+        }
 
-        return configPlugin;
+        return queueConfig;
     }
 
     public long getMemoryUsageMaximum()

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/plugins/ConfigurationPlugin.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/plugins/ConfigurationPlugin.java?rev=960546&r1=960545&r2=960546&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/plugins/ConfigurationPlugin.java
(original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/plugins/ConfigurationPlugin.java
Mon Jul  5 11:15:25 2010
@@ -18,6 +18,13 @@
  */
 package org.apache.qpid.server.configuration.plugins;
 
+import org.apache.commons.configuration.Configuration;
+import org.apache.commons.configuration.ConfigurationException;
+import org.apache.commons.configuration.ConversionException;
+import org.apache.log4j.Logger;
+import org.apache.qpid.server.configuration.ConfigurationManager;
+import org.apache.qpid.server.registry.ApplicationRegistry;
+
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -27,13 +34,6 @@ import java.util.Map;
 import java.util.NoSuchElementException;
 import java.util.Set;
 
-import org.apache.commons.configuration.Configuration;
-import org.apache.commons.configuration.ConfigurationException;
-import org.apache.commons.configuration.ConversionException;
-import org.apache.log4j.Logger;
-import org.apache.qpid.server.configuration.ConfigurationManager;
-import org.apache.qpid.server.registry.ApplicationRegistry;
-
 public abstract class ConfigurationPlugin
 {
     protected static final Logger _logger = Logger.getLogger(ConfigurationPlugin.class);
@@ -45,20 +45,18 @@ public abstract class ConfigurationPlugi
 
     /**
      * The Elements that this Plugin can process.
-     * 
+     *
      * For a Queues plugin that would be a list containing:
      * <ul>
      * <li>queue - the queue entries
      * <li>the alerting values for defaults
      * <li>exchange - the default exchange
      * <li>durable - set the default durablity
-     * </ul> 
+     * </ul>
      */
     abstract public String[] getElementsProcessed();
-    
-    /**
-     * Performs configuration validation.
-     */
+
+    /** Performs configuration validation. */
     public void validateConfiguration() throws ConfigurationException
     {
         // Override in sub-classes
@@ -145,14 +143,20 @@ public abstract class ConfigurationPlugi
         {
             ConfigurationManager configurationManager = ApplicationRegistry.getInstance().getConfigurationManager();
             Configuration handled = element.length() == 0 ? configuration : configuration.subset(element);
-            
+
             String configurationElement = element;
             if (path.length() > 0)
             {
-                configurationElement =  path + "." + configurationElement;
+                configurationElement = path + "." + configurationElement;
             }
 
             List<ConfigurationPlugin> handlers = configurationManager.getConfigurationPlugins(configurationElement,
handled);
+
+            if(_logger.isDebugEnabled())
+            {
+                _logger.debug("For '" + element + "' found handlers (" + handlers.size()
+ "):" + handlers);
+            }
+            
             for (ConfigurationPlugin plugin : handlers)
             {
                 _pluginConfiguration.put(plugin.getClass().getName(), plugin);
@@ -161,10 +165,8 @@ public abstract class ConfigurationPlugi
 
         validateConfiguration();
     }
-    
-    /**
-     * Helper method to print out list of keys in a {@link Configuration}.
-     */
+
+    /** Helper method to print out list of keys in a {@link Configuration}. */
     public static final void showKeys(Configuration config)
     {
         if (config.isEmpty())
@@ -186,7 +188,7 @@ public abstract class ConfigurationPlugi
     {
         return _configuration != null;
     }
-    
+
     /// Getters
 
     protected double getDoubleValue(String property)
@@ -199,7 +201,6 @@ public abstract class ConfigurationPlugi
         return _configuration.getDouble(property, defaultValue);
     }
 
-
     protected long getLongValue(String property)
     {
         return getLongValue(property, 0);
@@ -250,8 +251,6 @@ public abstract class ConfigurationPlugi
         return _configuration.getList(property, defaultValue);
     }
 
-
-
     /// Validation Helpers
 
     protected boolean contains(String property)
@@ -259,7 +258,6 @@ public abstract class ConfigurationPlugi
         return _configuration.getProperty(property) != null;
     }
 
-
     /**
      * Provide mechanism to validate Configuration contains a Postiive Long Value
      *
@@ -354,6 +352,102 @@ public abstract class ConfigurationPlugi
         }
     }
 
+    /**
+     * Given another configuration merge the configuration into our own config
+     *
+     * The new values being merged in will take precedence over existing values.
+     *
+     * In the simplistic case this means something like:
+     *
+     * So if we have configuration set
+     * name = 'fooo'
+     *
+     * And the new configuration contains a name then that will be reset.
+     * name = 'new'
+     *
+     * However this plugin will simply contain other plugins so the merge will
+     * be called until we end up at a base plugin that understand how to merge
+     * items. i.e Alerting values. Where the provided configuration will take
+     * precedence.
+     *
+     * @param configuration the config to merge in to our own.
+     */
+    public void addConfiguration(ConfigurationPlugin configuration)
+    {
+        // If given configuration is null then there is nothing to process.
+        if (configuration == null)
+        {
+            return;
+        }
+
+        // Merge all the sub configuration items
+        for (Map.Entry<String, ConfigurationPlugin> newPlugins : configuration._pluginConfiguration.entrySet())
+        {
+            String key = newPlugins.getKey();
+            ConfigurationPlugin config = newPlugins.getValue();
+
+            if (_pluginConfiguration.containsKey(key))
+            {
+                //Merge the configuration if we already have this type of config
+                _pluginConfiguration.get(key).mergeConfiguration(config);
+            }
+            else
+            {
+                //otherwise just add it to our config.
+                _pluginConfiguration.put(key, config);
+            }
+        }
+
+        //Merge the configuration itself
+        String key = configuration.getClass().getName();
+        if (_pluginConfiguration.containsKey(key))
+        {
+            //Merge the configuration if we already have this type of config
+            _pluginConfiguration.get(key).mergeConfiguration(configuration);
+        }
+        else
+        {
+            //If we are adding a configuration of our own type then merge
+            if (configuration.getClass() == this.getClass())
+            {
+                mergeConfiguration(configuration);
+            }
+            else
+            {
+                // just store this in case someone else needs it.
+                _pluginConfiguration.put(key, configuration);
+            }
+
+        }
+
+    }
+
+    protected void mergeConfiguration(ConfigurationPlugin configuration)
+    {
+         _configuration = configuration.getConfig();
+    }
+
+    public String toString()
+    {
+        StringBuilder sb = new StringBuilder();
+
+        sb.append("\n").append(getClass().getSimpleName());
+        sb.append("=[ (").append(formatToString()).append(")");
+
+        for(Map.Entry<String,ConfigurationPlugin> item : _pluginConfiguration.entrySet())
+        {
+            sb.append("\n").append(item.getValue());
+        }
+
+        sb.append("]\n");
+
+        return sb.toString();
+    }
+
+    public String formatToString()
+    {
+        return super.toString();
+    }
 
 }
 



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org


Mime
View raw message