Return-Path: Delivered-To: apmail-activemq-commits-archive@www.apache.org Received: (qmail 50630 invoked from network); 23 Jun 2009 17:32:49 -0000 Received: from hermes.apache.org (HELO mail.apache.org) (140.211.11.3) by minotaur.apache.org with SMTP; 23 Jun 2009 17:32:49 -0000 Received: (qmail 64049 invoked by uid 500); 23 Jun 2009 17:33:00 -0000 Delivered-To: apmail-activemq-commits-archive@activemq.apache.org Received: (qmail 63996 invoked by uid 500); 23 Jun 2009 17:33:00 -0000 Mailing-List: contact commits-help@activemq.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@activemq.apache.org Delivered-To: mailing list commits@activemq.apache.org Received: (qmail 63987 invoked by uid 99); 23 Jun 2009 17:33:00 -0000 Received: from nike.apache.org (HELO nike.apache.org) (192.87.106.230) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 23 Jun 2009 17:33:00 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=10.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 23 Jun 2009 17:32:56 +0000 Received: by eris.apache.org (Postfix, from userid 65534) id 4A7A32388891; Tue, 23 Jun 2009 17:32:35 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r787755 - /activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/WildcardQueueSubscription.java Date: Tue, 23 Jun 2009 17:32:35 -0000 To: commits@activemq.apache.org From: chirino@apache.org X-Mailer: svnmailer-1.0.8 Message-Id: <20090623173235.4A7A32388891@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: chirino Date: Tue Jun 23 17:32:34 2009 New Revision: 787755 URL: http://svn.apache.org/viewvc?rev=787755&view=rev Log: opps missed this file. Added: activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/WildcardQueueSubscription.java Added: activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/WildcardQueueSubscription.java URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/WildcardQueueSubscription.java?rev=787755&view=auto ============================================================================== --- activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/WildcardQueueSubscription.java (added) +++ activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/WildcardQueueSubscription.java Tue Jun 23 17:32:34 2009 @@ -0,0 +1,82 @@ +package org.apache.activemq.apollo.broker; + +import java.util.ArrayList; +import java.util.Collection; + +import org.apache.activemq.apollo.broker.ProtocolHandler.ConsumerContext; +import org.apache.activemq.apollo.broker.VirtualHost.QueueLifecyleListener; +import org.apache.activemq.apollo.broker.path.PathFilter; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +public class WildcardQueueSubscription implements BrokerSubscription, QueueLifecyleListener { + + static final private Log LOG = LogFactory.getLog(WildcardQueueSubscription.class); + + private final VirtualHost host; + private final Destination destination; + private final ConsumerContext consumer; + private final PathFilter filter; + + private final ArrayList childSubs = new ArrayList(); + + public WildcardQueueSubscription(VirtualHost host, Destination destination, ConsumerContext consumer) { + this.host = host; + this.destination = destination; + this.consumer = consumer; + filter = PathFilter.parseFilter(destination.getName()); + } + + /////////////////////////////////////////////////////////////////// + // BrokerSubscription interface implementation + /////////////////////////////////////////////////////////////////// + public void connect(ConsumerContext cc) throws Exception { + assert cc == consumer; + synchronized(host) { + Domain domain = host.getRouter().getDomain(Router.QUEUE_DOMAIN); + Collection matches = domain.route(destination.getName(), null); + for (DeliveryTarget target : matches) { + Queue queue = (Queue) target; + BrokerSubscription childSub = host.createSubscription(consumer, queue.getDestination()); + childSubs.add(childSub); + childSub.connect(consumer); + } + host.addDestinationLifecyleListener(this); + } + } + + public void disconnect(ConsumerContext cc) { + assert cc == consumer; + synchronized(host) { + host.removeDestinationLifecyleListener(this); + for (BrokerSubscription childSub : childSubs) { + childSub.disconnect(cc); + } + childSubs.clear(); + } + } + + public Destination getDestination() { + return destination; + } + + /////////////////////////////////////////////////////////////////// + // QueueLifecyleListener interface implementation + /////////////////////////////////////////////////////////////////// + public void onCreate(Queue queue) { + if( filter.matches(queue.getDestination().getName()) ) { + try { + BrokerSubscription childSub = host.createSubscription(consumer, queue.getDestination()); + childSubs.add(childSub); + childSub.connect(consumer); + } catch (Exception e) { + LOG.warn("Could not create dynamic subscription to "+queue.getDestination()+": "+e); + LOG.debug("Could not create dynamic subscription to "+queue.getDestination()+": ", e); + } + } + } + + public void onDestroy(Queue queue) { + } + +}