qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From grk...@apache.org
Subject svn commit: r966634 - in /qpid/trunk/qpid/java: broker-plugins/experimental/slowconsumerdetection/src/main/java/org/apache/qpid/server/virtualhost/plugin/ broker/src/main/java/org/apache/qpid/server/queue/
Date Thu, 22 Jul 2010 13:02:50 GMT
Author: grkvlt
Date: Thu Jul 22 13:02:49 2010
New Revision: 966634

URL: http://svn.apache.org/viewvc?rev=966634&view=rev
Log:
QPID-2679: cache queues that are configured on a per-virtualhost basis

Added:
    qpid/trunk/qpid/java/broker-plugins/experimental/slowconsumerdetection/src/main/java/org/apache/qpid/server/virtualhost/plugin/ConfiguredQueueBindingListener.java
Modified:
    qpid/trunk/qpid/java/broker-plugins/experimental/slowconsumerdetection/src/main/java/org/apache/qpid/server/virtualhost/plugin/SlowConsumerDetection.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java

Added: qpid/trunk/qpid/java/broker-plugins/experimental/slowconsumerdetection/src/main/java/org/apache/qpid/server/virtualhost/plugin/ConfiguredQueueBindingListener.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/experimental/slowconsumerdetection/src/main/java/org/apache/qpid/server/virtualhost/plugin/ConfiguredQueueBindingListener.java?rev=966634&view=auto
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/experimental/slowconsumerdetection/src/main/java/org/apache/qpid/server/virtualhost/plugin/ConfiguredQueueBindingListener.java (added)
+++ qpid/trunk/qpid/java/broker-plugins/experimental/slowconsumerdetection/src/main/java/org/apache/qpid/server/virtualhost/plugin/ConfiguredQueueBindingListener.java Thu Jul 22 13:02:49 2010
@@ -0,0 +1,86 @@
+package org.apache.qpid.server.virtualhost.plugin;
+
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Set;
+
+import org.apache.log4j.Logger;
+import org.apache.qpid.server.binding.Binding;
+import org.apache.qpid.server.configuration.plugin.SlowConsumerDetectionQueueConfiguration;
+import org.apache.qpid.server.exchange.AbstractExchange;
+import org.apache.qpid.server.exchange.Exchange;
+import org.apache.qpid.server.exchange.Exchange.BindingListener;
+import org.apache.qpid.server.queue.AMQQueue;
+
+/**
+ * This is a listener that caches queues that are configured for slow consumer disconnection.
+ * 
+ * There should be one listener per virtual host, which can be added to all exchanges on
+ * that host.
+ * 
+ * TODO In future, it will be possible to configure the policy at runtime, so only the queue
+ * itself is cached, and the configuration looked up by the housekeeping thread. This means
+ * that there may be occasions where the copy of the cache contents retrieved by the thread
+ * does not contain queues that are configured, or that configured queues are not present.
+ * 
+ * @see BindingListener
+ */
+public class ConfiguredQueueBindingListener implements BindingListener
+{
+    private static final Logger _log = Logger.getLogger(ConfiguredQueueBindingListener.class);
+    
+    private String _vhostName;
+    private Set<AMQQueue> _cache = Collections.synchronizedSet(new HashSet<AMQQueue>());
+    
+    public ConfiguredQueueBindingListener(String vhostName)
+    {
+        _vhostName = vhostName;
+    }
+
+    /**
+     * @see BindingListener#bindingAdded(Exchange, Binding)
+     */
+    public void bindingAdded(Exchange exchange, Binding binding)
+    {
+        processBinding(binding);
+    }
+
+    /**
+     * @see BindingListener#bindingRemoved(Exchange, Binding)
+     */
+    public void bindingRemoved(Exchange exchange, Binding binding)
+    {
+        processBinding(binding);
+    }
+    
+    private void processBinding(Binding binding)
+    {
+        AMQQueue queue = binding.getQueue();
+        
+        SlowConsumerDetectionQueueConfiguration config =
+            queue.getConfiguration().getConfiguration(SlowConsumerDetectionQueueConfiguration.class.getName());
+        if (config != null)
+        {
+            _cache.add(queue);
+        }
+        else
+        {
+            _cache.remove(queue);
+        }
+    }
+    
+    /**
+     * Lookup and return the cache of configured {@link AMQQueue}s.
+     * 
+	 * Note that when accessing the cached queues, the {@link Iterator} is not thread safe
+	 * (see the {@link Collections#synchronizedSet(Set)} documentation) so a copy of the
+	 * cache is returned.
+     * 
+     * @return a copy of the cached {@link java.util.Set} of queues
+     */
+    public Set<AMQQueue> getQueueCache()
+    {
+        return new HashSet<AMQQueue>(_cache);
+    }
+}

Modified: qpid/trunk/qpid/java/broker-plugins/experimental/slowconsumerdetection/src/main/java/org/apache/qpid/server/virtualhost/plugin/SlowConsumerDetection.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/experimental/slowconsumerdetection/src/main/java/org/apache/qpid/server/virtualhost/plugin/SlowConsumerDetection.java?rev=966634&r1=966633&r2=966634&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/experimental/slowconsumerdetection/src/main/java/org/apache/qpid/server/virtualhost/plugin/SlowConsumerDetection.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/experimental/slowconsumerdetection/src/main/java/org/apache/qpid/server/virtualhost/plugin/SlowConsumerDetection.java Thu Jul 22 13:02:49 2010
@@ -1,6 +1,5 @@
 /*
  *
- * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
  * regarding copyright ownership.  The ASF licenses this file
@@ -20,12 +19,16 @@
  */
 package org.apache.qpid.server.virtualhost.plugin;
 
+import java.util.HashSet;
+import java.util.Set;
 import java.util.concurrent.TimeUnit;
 
-import org.apache.commons.configuration.ConfigurationException;
+import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.server.configuration.plugin.SlowConsumerDetectionConfiguration;
 import org.apache.qpid.server.configuration.plugin.SlowConsumerDetectionQueueConfiguration;
 import org.apache.qpid.server.configuration.plugins.ConfigurationPlugin;
+import org.apache.qpid.server.logging.actors.CurrentActor;
+import org.apache.qpid.server.plugins.Plugin;
 import org.apache.qpid.server.queue.AMQQueue;
 import org.apache.qpid.server.virtualhost.VirtualHost;
 import org.apache.qpid.server.virtualhost.plugin.logging.SlowConsumerDetectionMessages;
@@ -35,6 +38,7 @@ import org.apache.qpid.server.virtualhos
 class SlowConsumerDetection extends VirtualHostHouseKeepingPlugin
 {
     private SlowConsumerDetectionConfiguration _config;
+    private ConfiguredQueueBindingListener _listener;
 
     public static class SlowConsumerFactory implements VirtualHostPluginFactory
     {
@@ -53,9 +57,21 @@ class SlowConsumerDetection extends Virt
         }
     }
 
+    /**
+     * Configures the slow consumer disconnect plugin by adding a listener to each exchange on this
+     * cirtual host to record all the configured queues in a cache for processing by the housekeeping
+     * thread.
+     * 
+     * @see Plugin#configure(ConfigurationPlugin)
+     */
     public void configure(ConfigurationPlugin config)
     {        
         _config = (SlowConsumerDetectionConfiguration) config;
+        _listener = new ConfiguredQueueBindingListener(_virtualhost.getName());
+        for (AMQShortString exchangeName : _virtualhost.getExchangeRegistry().getExchangeNames())
+        {
+            _virtualhost.getExchangeRegistry().getExchange(exchangeName).addBindingListener(_listener);
+        }
     }
     
     public SlowConsumerDetection(VirtualHost vhost)
@@ -63,19 +79,19 @@ class SlowConsumerDetection extends Virt
         super(vhost);
     }
 
-    @Override
     public void execute()
     {
-        SlowConsumerDetectionMessages.RUNNING();
-
-        for (AMQQueue q : _virtualhost.getQueueRegistry().getQueues())
+        CurrentActor.get().message(SlowConsumerDetectionMessages.RUNNING());
+        
+        Set<AMQQueue> cache = _listener.getQueueCache();
+        for (AMQQueue q : cache)
         {
-            SlowConsumerDetectionMessages.CHECKING_QUEUE(q.getName());
+            CurrentActor.get().message(SlowConsumerDetectionMessages.CHECKING_QUEUE(q.getName()));
+            
             try
             {
                 SlowConsumerDetectionQueueConfiguration config =
-                            q.getConfiguration().getConfiguration(SlowConsumerDetectionQueueConfiguration.class.getName());
-
+                    q.getConfiguration().getConfiguration(SlowConsumerDetectionQueueConfiguration.class.getName());
                 if (checkQueueStatus(q, config))
                 {
                     config.getPolicy().performPolicy(q);
@@ -83,15 +99,12 @@ class SlowConsumerDetection extends Virt
             }
             catch (Exception e)
             {
-                _logger.error("Exception in SlowConsumersDetection " +
-                              "for queue: " +
-                              q.getNameShortString().toString(), e);
-                //Don't throw exceptions as this will stop the
-                // house keeping task from running.
+                // Don't throw exceptions as this will stop the house keeping task from running.
+                _logger.error("Exception in SlowConsumersDetection for queue: " + q.getName(), e);
             }
         }
 
-        SlowConsumerDetectionMessages.COMPLETE();
+        CurrentActor.get().message(SlowConsumerDetectionMessages.COMPLETE());
     }
 
     public long getDelay()

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java?rev=966634&r1=966633&r2=966634&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java Thu Jul 22 13:02:49 2010
@@ -515,7 +515,12 @@ public class SimpleAMQQueue implements A
                 break;
             }
         }
-
+        
+        reconfigure();
+    }
+    
+    private void reconfigure()
+    {
         //Reconfigure the queue for to reflect this new binding.
         ConfigurationPlugin config = getVirtualHost().getConfiguration().getQueueConfiguration(this);
 
@@ -539,6 +544,8 @@ public class SimpleAMQQueue implements A
     public void removeBinding(final Binding binding)
     {
         _bindings.remove(binding);
+        
+        reconfigure();
     }
 
     public List<Binding> getBindings()



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org


Mime
View raw message