Return-Path: Delivered-To: apmail-qpid-commits-archive@www.apache.org Received: (qmail 76186 invoked from network); 22 Jul 2010 13:03:51 -0000 Received: from unknown (HELO mail.apache.org) (140.211.11.3) by 140.211.11.9 with SMTP; 22 Jul 2010 13:03:51 -0000 Received: (qmail 73828 invoked by uid 500); 22 Jul 2010 13:03:50 -0000 Delivered-To: apmail-qpid-commits-archive@qpid.apache.org Received: (qmail 73704 invoked by uid 500); 22 Jul 2010 13:03:48 -0000 Mailing-List: contact commits-help@qpid.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@qpid.apache.org Delivered-To: mailing list commits@qpid.apache.org Received: (qmail 73695 invoked by uid 99); 22 Jul 2010 13:03:47 -0000 Received: from nike.apache.org (HELO nike.apache.org) (192.87.106.230) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 22 Jul 2010 13:03:47 +0000 X-ASF-Spam-Status: No, hits=-1999.4 required=10.0 tests=ALL_TRUSTED,HK_RANDOM_ENVFROM X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 22 Jul 2010 13:03:44 +0000 Received: by eris.apache.org (Postfix, from userid 65534) id 5A24E23889DA; Thu, 22 Jul 2010 13:02:50 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r966634 - in /qpid/trunk/qpid/java: broker-plugins/experimental/slowconsumerdetection/src/main/java/org/apache/qpid/server/virtualhost/plugin/ broker/src/main/java/org/apache/qpid/server/queue/ Date: Thu, 22 Jul 2010 13:02:50 -0000 To: commits@qpid.apache.org From: grkvlt@apache.org X-Mailer: svnmailer-1.0.8 Message-Id: <20100722130250.5A24E23889DA@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: grkvlt Date: Thu Jul 22 13:02:49 2010 New Revision: 966634 URL: http://svn.apache.org/viewvc?rev=966634&view=rev Log: QPID-2679: cache queues that are configured on a per-virtualhost basis Added: qpid/trunk/qpid/java/broker-plugins/experimental/slowconsumerdetection/src/main/java/org/apache/qpid/server/virtualhost/plugin/ConfiguredQueueBindingListener.java Modified: qpid/trunk/qpid/java/broker-plugins/experimental/slowconsumerdetection/src/main/java/org/apache/qpid/server/virtualhost/plugin/SlowConsumerDetection.java qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java Added: qpid/trunk/qpid/java/broker-plugins/experimental/slowconsumerdetection/src/main/java/org/apache/qpid/server/virtualhost/plugin/ConfiguredQueueBindingListener.java URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/experimental/slowconsumerdetection/src/main/java/org/apache/qpid/server/virtualhost/plugin/ConfiguredQueueBindingListener.java?rev=966634&view=auto ============================================================================== --- qpid/trunk/qpid/java/broker-plugins/experimental/slowconsumerdetection/src/main/java/org/apache/qpid/server/virtualhost/plugin/ConfiguredQueueBindingListener.java (added) +++ qpid/trunk/qpid/java/broker-plugins/experimental/slowconsumerdetection/src/main/java/org/apache/qpid/server/virtualhost/plugin/ConfiguredQueueBindingListener.java Thu Jul 22 13:02:49 2010 @@ -0,0 +1,86 @@ +package org.apache.qpid.server.virtualhost.plugin; + +import java.util.Collections; +import java.util.HashSet; +import java.util.Iterator; +import java.util.Set; + +import org.apache.log4j.Logger; +import org.apache.qpid.server.binding.Binding; +import org.apache.qpid.server.configuration.plugin.SlowConsumerDetectionQueueConfiguration; +import org.apache.qpid.server.exchange.AbstractExchange; +import org.apache.qpid.server.exchange.Exchange; +import org.apache.qpid.server.exchange.Exchange.BindingListener; +import org.apache.qpid.server.queue.AMQQueue; + +/** + * This is a listener that caches queues that are configured for slow consumer disconnection. + * + * There should be one listener per virtual host, which can be added to all exchanges on + * that host. + * + * TODO In future, it will be possible to configure the policy at runtime, so only the queue + * itself is cached, and the configuration looked up by the housekeeping thread. This means + * that there may be occasions where the copy of the cache contents retrieved by the thread + * does not contain queues that are configured, or that configured queues are not present. + * + * @see BindingListener + */ +public class ConfiguredQueueBindingListener implements BindingListener +{ + private static final Logger _log = Logger.getLogger(ConfiguredQueueBindingListener.class); + + private String _vhostName; + private Set _cache = Collections.synchronizedSet(new HashSet()); + + public ConfiguredQueueBindingListener(String vhostName) + { + _vhostName = vhostName; + } + + /** + * @see BindingListener#bindingAdded(Exchange, Binding) + */ + public void bindingAdded(Exchange exchange, Binding binding) + { + processBinding(binding); + } + + /** + * @see BindingListener#bindingRemoved(Exchange, Binding) + */ + public void bindingRemoved(Exchange exchange, Binding binding) + { + processBinding(binding); + } + + private void processBinding(Binding binding) + { + AMQQueue queue = binding.getQueue(); + + SlowConsumerDetectionQueueConfiguration config = + queue.getConfiguration().getConfiguration(SlowConsumerDetectionQueueConfiguration.class.getName()); + if (config != null) + { + _cache.add(queue); + } + else + { + _cache.remove(queue); + } + } + + /** + * Lookup and return the cache of configured {@link AMQQueue}s. + * + * Note that when accessing the cached queues, the {@link Iterator} is not thread safe + * (see the {@link Collections#synchronizedSet(Set)} documentation) so a copy of the + * cache is returned. + * + * @return a copy of the cached {@link java.util.Set} of queues + */ + public Set getQueueCache() + { + return new HashSet(_cache); + } +} Modified: qpid/trunk/qpid/java/broker-plugins/experimental/slowconsumerdetection/src/main/java/org/apache/qpid/server/virtualhost/plugin/SlowConsumerDetection.java URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/experimental/slowconsumerdetection/src/main/java/org/apache/qpid/server/virtualhost/plugin/SlowConsumerDetection.java?rev=966634&r1=966633&r2=966634&view=diff ============================================================================== --- qpid/trunk/qpid/java/broker-plugins/experimental/slowconsumerdetection/src/main/java/org/apache/qpid/server/virtualhost/plugin/SlowConsumerDetection.java (original) +++ qpid/trunk/qpid/java/broker-plugins/experimental/slowconsumerdetection/src/main/java/org/apache/qpid/server/virtualhost/plugin/SlowConsumerDetection.java Thu Jul 22 13:02:49 2010 @@ -1,6 +1,5 @@ /* * - * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file @@ -20,12 +19,16 @@ */ package org.apache.qpid.server.virtualhost.plugin; +import java.util.HashSet; +import java.util.Set; import java.util.concurrent.TimeUnit; -import org.apache.commons.configuration.ConfigurationException; +import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.server.configuration.plugin.SlowConsumerDetectionConfiguration; import org.apache.qpid.server.configuration.plugin.SlowConsumerDetectionQueueConfiguration; import org.apache.qpid.server.configuration.plugins.ConfigurationPlugin; +import org.apache.qpid.server.logging.actors.CurrentActor; +import org.apache.qpid.server.plugins.Plugin; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.virtualhost.VirtualHost; import org.apache.qpid.server.virtualhost.plugin.logging.SlowConsumerDetectionMessages; @@ -35,6 +38,7 @@ import org.apache.qpid.server.virtualhos class SlowConsumerDetection extends VirtualHostHouseKeepingPlugin { private SlowConsumerDetectionConfiguration _config; + private ConfiguredQueueBindingListener _listener; public static class SlowConsumerFactory implements VirtualHostPluginFactory { @@ -53,9 +57,21 @@ class SlowConsumerDetection extends Virt } } + /** + * Configures the slow consumer disconnect plugin by adding a listener to each exchange on this + * cirtual host to record all the configured queues in a cache for processing by the housekeeping + * thread. + * + * @see Plugin#configure(ConfigurationPlugin) + */ public void configure(ConfigurationPlugin config) { _config = (SlowConsumerDetectionConfiguration) config; + _listener = new ConfiguredQueueBindingListener(_virtualhost.getName()); + for (AMQShortString exchangeName : _virtualhost.getExchangeRegistry().getExchangeNames()) + { + _virtualhost.getExchangeRegistry().getExchange(exchangeName).addBindingListener(_listener); + } } public SlowConsumerDetection(VirtualHost vhost) @@ -63,19 +79,19 @@ class SlowConsumerDetection extends Virt super(vhost); } - @Override public void execute() { - SlowConsumerDetectionMessages.RUNNING(); - - for (AMQQueue q : _virtualhost.getQueueRegistry().getQueues()) + CurrentActor.get().message(SlowConsumerDetectionMessages.RUNNING()); + + Set cache = _listener.getQueueCache(); + for (AMQQueue q : cache) { - SlowConsumerDetectionMessages.CHECKING_QUEUE(q.getName()); + CurrentActor.get().message(SlowConsumerDetectionMessages.CHECKING_QUEUE(q.getName())); + try { SlowConsumerDetectionQueueConfiguration config = - q.getConfiguration().getConfiguration(SlowConsumerDetectionQueueConfiguration.class.getName()); - + q.getConfiguration().getConfiguration(SlowConsumerDetectionQueueConfiguration.class.getName()); if (checkQueueStatus(q, config)) { config.getPolicy().performPolicy(q); @@ -83,15 +99,12 @@ class SlowConsumerDetection extends Virt } catch (Exception e) { - _logger.error("Exception in SlowConsumersDetection " + - "for queue: " + - q.getNameShortString().toString(), e); - //Don't throw exceptions as this will stop the - // house keeping task from running. + // Don't throw exceptions as this will stop the house keeping task from running. + _logger.error("Exception in SlowConsumersDetection for queue: " + q.getName(), e); } } - SlowConsumerDetectionMessages.COMPLETE(); + CurrentActor.get().message(SlowConsumerDetectionMessages.COMPLETE()); } public long getDelay() Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java?rev=966634&r1=966633&r2=966634&view=diff ============================================================================== --- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java (original) +++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java Thu Jul 22 13:02:49 2010 @@ -515,7 +515,12 @@ public class SimpleAMQQueue implements A break; } } - + + reconfigure(); + } + + private void reconfigure() + { //Reconfigure the queue for to reflect this new binding. ConfigurationPlugin config = getVirtualHost().getConfiguration().getQueueConfiguration(this); @@ -539,6 +544,8 @@ public class SimpleAMQQueue implements A public void removeBinding(final Binding binding) { _bindings.remove(binding); + + reconfigure(); } public List getBindings() --------------------------------------------------------------------- Apache Qpid - AMQP Messaging Implementation Project: http://qpid.apache.org Use/Interact: mailto:commits-subscribe@qpid.apache.org