Return-Path: Delivered-To: apmail-activemq-commits-archive@www.apache.org Received: (qmail 63835 invoked from network); 20 Feb 2007 13:22:50 -0000 Received: from hermes.apache.org (HELO mail.apache.org) (140.211.11.2) by minotaur.apache.org with SMTP; 20 Feb 2007 13:22:50 -0000 Received: (qmail 15454 invoked by uid 500); 20 Feb 2007 13:22:58 -0000 Delivered-To: apmail-activemq-commits-archive@activemq.apache.org Received: (qmail 15426 invoked by uid 500); 20 Feb 2007 13:22:58 -0000 Mailing-List: contact commits-help@activemq.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@activemq.apache.org Delivered-To: mailing list commits@activemq.apache.org Received: (qmail 15416 invoked by uid 99); 20 Feb 2007 13:22:57 -0000 Received: from herse.apache.org (HELO herse.apache.org) (140.211.11.133) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 20 Feb 2007 05:22:57 -0800 X-ASF-Spam-Status: No, hits=-98.6 required=10.0 tests=ALL_TRUSTED,INFO_TLD,NO_REAL_NAME X-Spam-Check-By: apache.org Received: from [140.211.11.3] (HELO eris.apache.org) (140.211.11.3) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 20 Feb 2007 05:22:48 -0800 Received: by eris.apache.org (Postfix, from userid 65534) id 8F6171A981D; Tue, 20 Feb 2007 05:22:28 -0800 (PST) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r509552 - in /activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region: DurableTopicSubscription.java TopicRegion.java Date: Tue, 20 Feb 2007 13:22:28 -0000 To: commits@activemq.apache.org From: rajdavies@apache.org X-Mailer: svnmailer-1.1.0 Message-Id: <20070220132228.8F6171A981D@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: rajdavies Date: Tue Feb 20 05:22:27 2007 New Revision: 509552 URL: http://svn.apache.org/viewvc?view=rev&rev=509552 Log: try to deliver messages if there's enough space Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicRegion.java Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java?view=diff&rev=509552&r1=509551&r2=509552 ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java Tue Feb 20 05:22:27 2007 @@ -1,20 +1,17 @@ /** - * - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * + * + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE + * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file + * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the + * License. You may obtain a copy of the License at + * * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. */ + package org.apache.activemq.broker.region; import java.io.IOException; @@ -23,92 +20,98 @@ import javax.jms.InvalidSelectorException; import org.apache.activemq.broker.Broker; import org.apache.activemq.broker.ConnectionContext; -import org.apache.activemq.broker.region.cursors.PendingMessageCursor; import org.apache.activemq.broker.region.cursors.StoreDurableSubscriberCursor; import org.apache.activemq.command.ConsumerInfo; import org.apache.activemq.command.Message; import org.apache.activemq.command.MessageAck; import org.apache.activemq.command.MessageDispatch; +import org.apache.activemq.memory.UsageListener; import org.apache.activemq.memory.UsageManager; import org.apache.activemq.util.SubscriptionKey; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -public class DurableTopicSubscription extends PrefetchSubscription { +public class DurableTopicSubscription extends PrefetchSubscription implements UsageListener{ + static private final Log log=LogFactory.getLog(PrefetchSubscription.class); - private final ConcurrentHashMap redeliveredMessages = new ConcurrentHashMap(); - private final ConcurrentHashMap destinations = new ConcurrentHashMap(); + private final ConcurrentHashMap redeliveredMessages=new ConcurrentHashMap(); + private final ConcurrentHashMap destinations=new ConcurrentHashMap(); private final SubscriptionKey subscriptionKey; private final boolean keepDurableSubsActive; + private final UsageManager usageManager; private boolean active=false; - - public DurableTopicSubscription(Broker broker,ConnectionContext context, ConsumerInfo info, boolean keepDurableSubsActive) throws InvalidSelectorException { - super(broker,context,info,new StoreDurableSubscriberCursor(context.getClientId(),info.getSubscriptionName(),broker.getTempDataStore(),info.getPrefetchSize())); - this.keepDurableSubsActive = keepDurableSubsActive; - subscriptionKey = new SubscriptionKey(context.getClientId(), info.getSubscriptionName()); + + public DurableTopicSubscription(Broker broker,UsageManager usageManager,ConnectionContext context, + ConsumerInfo info,boolean keepDurableSubsActive) throws InvalidSelectorException{ + super(broker,context,info,new StoreDurableSubscriberCursor(context.getClientId(),info.getSubscriptionName(), + broker.getTempDataStore(),info.getPrefetchSize())); + this.usageManager=usageManager; + this.keepDurableSubsActive=keepDurableSubsActive; + subscriptionKey=new SubscriptionKey(context.getClientId(),info.getSubscriptionName()); } - - synchronized public boolean isActive() { + + synchronized public boolean isActive(){ return active; } - - protected boolean isFull() { - return !active || super.isFull(); - } - - synchronized public void gc() { - } - - public synchronized void add(ConnectionContext context, Destination destination) throws Exception { - super.add(context, destination); - destinations.put(destination.getActiveMQDestination(), destination); - if( active || keepDurableSubsActive ) { - Topic topic = (Topic) destination; - topic.activate(context, this); - if (pending.isEmpty(topic)) { - topic.recoverRetroactiveMessages(context, this); + + protected boolean isFull(){ + return !active||super.isFull(); + } + + synchronized public void gc(){ + } + + public synchronized void add(ConnectionContext context,Destination destination) throws Exception{ + super.add(context,destination); + destinations.put(destination.getActiveMQDestination(),destination); + if(active||keepDurableSubsActive){ + Topic topic=(Topic)destination; + topic.activate(context,this); + if(pending.isEmpty(topic)){ + topic.recoverRetroactiveMessages(context,this); } } dispatchMatched(); } - - public void activate(UsageManager memoryManager,ConnectionContext context, ConsumerInfo info) throws Exception { - log.debug("Deactivating " + this); - if( !active ) { - this.active = true; - this.context = context; - this.info = info; - if( !keepDurableSubsActive ) { - for (Iterator iter = destinations.values().iterator(); iter.hasNext();) { - Topic topic = (Topic) iter.next(); - topic.activate(context, this); + + public void activate(UsageManager memoryManager,ConnectionContext context,ConsumerInfo info) throws Exception{ + log.debug("Deactivating "+this); + if(!active){ + this.active=true; + this.context=context; + this.info=info; + if(!keepDurableSubsActive){ + for(Iterator iter=destinations.values().iterator();iter.hasNext();){ + Topic topic=(Topic)iter.next(); + topic.activate(context,this); } } - synchronized(pending) { + synchronized(pending){ pending.setUsageManager(memoryManager); pending.start(); } - //If nothing was in the persistent store, then try to use the recovery policy. - if (pending.isEmpty()) { - for (Iterator iter = destinations.values().iterator(); iter.hasNext();) { - Topic topic = (Topic) iter.next(); - topic.recoverRetroactiveMessages(context, this); + // If nothing was in the persistent store, then try to use the recovery policy. + if(pending.isEmpty()){ + for(Iterator iter=destinations.values().iterator();iter.hasNext();){ + Topic topic=(Topic)iter.next(); + topic.recoverRetroactiveMessages(context,this); } } dispatchMatched(); + this.usageManager.addUsageListener(this); } } - synchronized public void deactivate(boolean keepDurableSubsActive) throws Exception { - + synchronized public void deactivate(boolean keepDurableSubsActive) throws Exception{ active=false; + this.usageManager.removeUsageListener(this); synchronized(pending){ pending.stop(); } - if( !keepDurableSubsActive ) { - for (Iterator iter = destinations.values().iterator(); iter.hasNext();) { - Topic topic = (Topic) iter.next(); - topic.deactivate(context, this); + if(!keepDurableSubsActive){ + for(Iterator iter=destinations.values().iterator();iter.hasNext();){ + Topic topic=(Topic)iter.next(); + topic.deactivate(context,this); } } synchronized(dispatched){ @@ -131,7 +134,6 @@ iter.remove(); } } - if(!keepDurableSubsActive){ synchronized(pending){ try{ @@ -149,74 +151,68 @@ prefetchExtension=0; } - protected MessageDispatch createMessageDispatch(MessageReference node, Message message) { - MessageDispatch md = super.createMessageDispatch(node, message); - Integer count = (Integer) redeliveredMessages.get(node.getMessageId()); - if( count !=null ) { + protected MessageDispatch createMessageDispatch(MessageReference node,Message message){ + MessageDispatch md=super.createMessageDispatch(node,message); + Integer count=(Integer)redeliveredMessages.get(node.getMessageId()); + if(count!=null){ md.setRedeliveryCounter(count.intValue()); } return md; } - public void add(MessageReference node) throws Exception { - if( !active && !keepDurableSubsActive ) { + public void add(MessageReference node) throws Exception{ + if(!active&&!keepDurableSubsActive){ return; } node.incrementReferenceCount(); super.add(node); } - - protected void doAddRecoveredMessage(MessageReference message) throws Exception { + + protected void doAddRecoveredMessage(MessageReference message) throws Exception{ pending.addRecoveredMessage(message); } - - public int getPendingQueueSize() { - if( active || keepDurableSubsActive ) { + + public int getPendingQueueSize(){ + if(active||keepDurableSubsActive){ return super.getPendingQueueSize(); } - //TODO: need to get from store + // TODO: need to get from store return 0; } - - public void setSelector(String selector) throws InvalidSelectorException { - throw new UnsupportedOperationException("You cannot dynamically change the selector for durable topic subscriptions"); + + public void setSelector(String selector) throws InvalidSelectorException{ + throw new UnsupportedOperationException( + "You cannot dynamically change the selector for durable topic subscriptions"); } - protected boolean canDispatch(MessageReference node) { + protected boolean canDispatch(MessageReference node){ return active; } - - protected void acknowledge(ConnectionContext context, MessageAck ack, MessageReference node) throws IOException { - node.getRegionDestination().acknowledge(context, this, ack, node); + + protected void acknowledge(ConnectionContext context,MessageAck ack,MessageReference node) throws IOException{ + node.getRegionDestination().acknowledge(context,this,ack,node); redeliveredMessages.remove(node.getMessageId()); node.decrementReferenceCount(); } - - public String getSubscriptionName() { + + public String getSubscriptionName(){ return subscriptionKey.getSubscriptionName(); } - - public String toString() { - return - "DurableTopicSubscription:" + - " consumer="+info.getConsumerId()+ - ", destinations="+destinations.size()+ - ", total="+enqueueCounter+ - ", pending="+getPendingQueueSize()+ - ", dispatched="+dispatchCounter+ - ", inflight="+dispatched.size()+ - ", prefetchExtension="+this.prefetchExtension; - + + public String toString(){ + return "DurableTopicSubscription:"+" consumer="+info.getConsumerId()+", destinations="+destinations.size() + +", total="+enqueueCounter+", pending="+getPendingQueueSize()+", dispatched="+dispatchCounter + +", inflight="+dispatched.size()+", prefetchExtension="+this.prefetchExtension; } - public String getClientId() { + public String getClientId(){ return subscriptionKey.getClientId(); } - public SubscriptionKey getSubscriptionKey() { + public SubscriptionKey getSubscriptionKey(){ return subscriptionKey; } - + /** * Release any references that we are holding. */ @@ -239,7 +235,21 @@ } dispatched.clear(); } - - + /** + * @param memoryManager + * @param oldPercentUsage + * @param newPercentUsage + * @see org.apache.activemq.memory.UsageListener#onMemoryUseChanged(org.apache.activemq.memory.UsageManager, int, + * int) + */ + public void onMemoryUseChanged(UsageManager memoryManager,int oldPercentUsage,int newPercentUsage){ + if(oldPercentUsage>newPercentUsage&&oldPercentUsage>=90){ + try{ + dispatchMatched(); + }catch(IOException e){ + log.warn("problem calling dispatchMatched",e); + } + } + } } Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicRegion.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicRegion.java?view=diff&rev=509552&r1=509551&r2=509552 ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicRegion.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicRegion.java Tue Feb 20 05:22:27 2007 @@ -216,7 +216,7 @@ SubscriptionKey key=new SubscriptionKey(context.getClientId(),info.getSubscriptionName()); DurableTopicSubscription sub=(DurableTopicSubscription)durableSubscriptions.get(key); if(sub==null){ - sub=new DurableTopicSubscription(broker,context,info,keepDurableSubsActive); + sub=new DurableTopicSubscription(broker,memoryManager,context,info,keepDurableSubsActive); ActiveMQDestination destination=info.getDestination(); if(destination!=null&&broker.getDestinationPolicy()!=null){ PolicyEntry entry=broker.getDestinationPolicy().getEntryFor(destination);