activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rajdav...@apache.org
Subject svn commit: r509552 - in /activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region: DurableTopicSubscription.java TopicRegion.java
Date Tue, 20 Feb 2007 13:22:28 GMT
Author: rajdavies
Date: Tue Feb 20 05:22:27 2007
New Revision: 509552

URL: http://svn.apache.org/viewvc?view=rev&rev=509552
Log:
try to deliver messages if there's enough space

Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicRegion.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java?view=diff&rev=509552&r1=509551&r2=509552
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java
Tue Feb 20 05:22:27 2007
@@ -1,20 +1,17 @@
 /**
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
+ * 
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership.
The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file
except in compliance with the
+ * License. You may obtain a copy of the License at
+ * 
  * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under
the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the
+ * specific language governing permissions and limitations under the License.
  */
+
 package org.apache.activemq.broker.region;
 
 import java.io.IOException;
@@ -23,92 +20,98 @@
 import javax.jms.InvalidSelectorException;
 import org.apache.activemq.broker.Broker;
 import org.apache.activemq.broker.ConnectionContext;
-import org.apache.activemq.broker.region.cursors.PendingMessageCursor;
 import org.apache.activemq.broker.region.cursors.StoreDurableSubscriberCursor;
 import org.apache.activemq.command.ConsumerInfo;
 import org.apache.activemq.command.Message;
 import org.apache.activemq.command.MessageAck;
 import org.apache.activemq.command.MessageDispatch;
+import org.apache.activemq.memory.UsageListener;
 import org.apache.activemq.memory.UsageManager;
 import org.apache.activemq.util.SubscriptionKey;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
-public class DurableTopicSubscription extends PrefetchSubscription {
+public class DurableTopicSubscription extends PrefetchSubscription implements UsageListener{
+
     static private final Log log=LogFactory.getLog(PrefetchSubscription.class);
-    private final ConcurrentHashMap redeliveredMessages = new ConcurrentHashMap();
-    private final ConcurrentHashMap destinations = new ConcurrentHashMap();
+    private final ConcurrentHashMap redeliveredMessages=new ConcurrentHashMap();
+    private final ConcurrentHashMap destinations=new ConcurrentHashMap();
     private final SubscriptionKey subscriptionKey;
     private final boolean keepDurableSubsActive;
+    private final UsageManager usageManager;
     private boolean active=false;
-    
-    public DurableTopicSubscription(Broker broker,ConnectionContext context, ConsumerInfo
info, boolean keepDurableSubsActive) throws InvalidSelectorException {
-        super(broker,context,info,new StoreDurableSubscriberCursor(context.getClientId(),info.getSubscriptionName(),broker.getTempDataStore(),info.getPrefetchSize()));
-        this.keepDurableSubsActive = keepDurableSubsActive;
-        subscriptionKey = new SubscriptionKey(context.getClientId(), info.getSubscriptionName());
+
+    public DurableTopicSubscription(Broker broker,UsageManager usageManager,ConnectionContext
context,
+            ConsumerInfo info,boolean keepDurableSubsActive) throws InvalidSelectorException{
+        super(broker,context,info,new StoreDurableSubscriberCursor(context.getClientId(),info.getSubscriptionName(),
+                broker.getTempDataStore(),info.getPrefetchSize()));
+        this.usageManager=usageManager;
+        this.keepDurableSubsActive=keepDurableSubsActive;
+        subscriptionKey=new SubscriptionKey(context.getClientId(),info.getSubscriptionName());
     }
-    
-    synchronized public boolean isActive() {
+
+    synchronized public boolean isActive(){
         return active;
     }
-    
-    protected boolean isFull() {
-        return !active || super.isFull();
-    }
-    
-    synchronized public void gc() {
-    }
-
-    public synchronized void add(ConnectionContext context, Destination destination) throws
Exception {
-        super.add(context, destination);
-        destinations.put(destination.getActiveMQDestination(), destination);
-        if( active || keepDurableSubsActive ) {
-            Topic topic = (Topic) destination;            
-            topic.activate(context, this);
-            if (pending.isEmpty(topic)) {
-                topic.recoverRetroactiveMessages(context, this);
+
+    protected boolean isFull(){
+        return !active||super.isFull();
+    }
+
+    synchronized public void gc(){
+    }
+
+    public synchronized void add(ConnectionContext context,Destination destination) throws
Exception{
+        super.add(context,destination);
+        destinations.put(destination.getActiveMQDestination(),destination);
+        if(active||keepDurableSubsActive){
+            Topic topic=(Topic)destination;
+            topic.activate(context,this);
+            if(pending.isEmpty(topic)){
+                topic.recoverRetroactiveMessages(context,this);
             }
         }
         dispatchMatched();
     }
-   
-    public void activate(UsageManager memoryManager,ConnectionContext context, ConsumerInfo
info) throws Exception {
-        log.debug("Deactivating " + this);
-        if( !active ) {
-            this.active = true;
-            this.context = context;
-            this.info = info;
-            if( !keepDurableSubsActive ) {
-                for (Iterator iter = destinations.values().iterator(); iter.hasNext();) {
-                    Topic topic = (Topic) iter.next();
-                    topic.activate(context, this);
+
+    public void activate(UsageManager memoryManager,ConnectionContext context,ConsumerInfo
info) throws Exception{
+        log.debug("Deactivating "+this);
+        if(!active){
+            this.active=true;
+            this.context=context;
+            this.info=info;
+            if(!keepDurableSubsActive){
+                for(Iterator iter=destinations.values().iterator();iter.hasNext();){
+                    Topic topic=(Topic)iter.next();
+                    topic.activate(context,this);
                 }
             }
-            synchronized(pending) {
+            synchronized(pending){
                 pending.setUsageManager(memoryManager);
                 pending.start();
             }
-            //If nothing was in the persistent store, then try to use the recovery policy.
-            if (pending.isEmpty()) {
-                for (Iterator iter = destinations.values().iterator(); iter.hasNext();) {
-                    Topic topic = (Topic) iter.next();
-                    topic.recoverRetroactiveMessages(context, this);
+            // If nothing was in the persistent store, then try to use the recovery policy.
+            if(pending.isEmpty()){
+                for(Iterator iter=destinations.values().iterator();iter.hasNext();){
+                    Topic topic=(Topic)iter.next();
+                    topic.recoverRetroactiveMessages(context,this);
                 }
             }
             dispatchMatched();
+            this.usageManager.addUsageListener(this);
         }
     }
 
-    synchronized public void deactivate(boolean keepDurableSubsActive) throws Exception {
  
-   
+    synchronized public void deactivate(boolean keepDurableSubsActive) throws Exception{
         active=false;
+        this.usageManager.removeUsageListener(this);
         synchronized(pending){
             pending.stop();
         }
-        if( !keepDurableSubsActive ) {
-            for (Iterator iter = destinations.values().iterator(); iter.hasNext();) {
-                Topic topic = (Topic) iter.next();
-                topic.deactivate(context, this);
+        if(!keepDurableSubsActive){
+            for(Iterator iter=destinations.values().iterator();iter.hasNext();){
+                Topic topic=(Topic)iter.next();
+                topic.deactivate(context,this);
             }
         }
         synchronized(dispatched){
@@ -131,7 +134,6 @@
                 iter.remove();
             }
         }
-        
         if(!keepDurableSubsActive){
             synchronized(pending){
                 try{
@@ -149,74 +151,68 @@
         prefetchExtension=0;
     }
 
-    protected MessageDispatch createMessageDispatch(MessageReference node, Message message)
{
-        MessageDispatch md = super.createMessageDispatch(node, message);
-        Integer count = (Integer) redeliveredMessages.get(node.getMessageId());
-        if( count !=null ) {
+    protected MessageDispatch createMessageDispatch(MessageReference node,Message message){
+        MessageDispatch md=super.createMessageDispatch(node,message);
+        Integer count=(Integer)redeliveredMessages.get(node.getMessageId());
+        if(count!=null){
             md.setRedeliveryCounter(count.intValue());
         }
         return md;
     }
 
-    public void add(MessageReference node) throws Exception {
-        if( !active && !keepDurableSubsActive ) {
+    public void add(MessageReference node) throws Exception{
+        if(!active&&!keepDurableSubsActive){
             return;
         }
         node.incrementReferenceCount();
         super.add(node);
     }
-    
-    protected void doAddRecoveredMessage(MessageReference message) throws Exception {
+
+    protected void doAddRecoveredMessage(MessageReference message) throws Exception{
         pending.addRecoveredMessage(message);
     }
-    
-    public int getPendingQueueSize() {
-        if( active || keepDurableSubsActive ) {
+
+    public int getPendingQueueSize(){
+        if(active||keepDurableSubsActive){
             return super.getPendingQueueSize();
         }
-        //TODO: need to get from store
+        // TODO: need to get from store
         return 0;
     }
-   
-    public void setSelector(String selector) throws InvalidSelectorException {
-        throw new UnsupportedOperationException("You cannot dynamically change the selector
for durable topic subscriptions");
+
+    public void setSelector(String selector) throws InvalidSelectorException{
+        throw new UnsupportedOperationException(
+                "You cannot dynamically change the selector for durable topic subscriptions");
     }
 
-    protected boolean canDispatch(MessageReference node) {
+    protected boolean canDispatch(MessageReference node){
         return active;
     }
-    
-    protected void acknowledge(ConnectionContext context, MessageAck ack, MessageReference
node) throws IOException {
-        node.getRegionDestination().acknowledge(context, this, ack, node);
+
+    protected void acknowledge(ConnectionContext context,MessageAck ack,MessageReference
node) throws IOException{
+        node.getRegionDestination().acknowledge(context,this,ack,node);
         redeliveredMessages.remove(node.getMessageId());
         node.decrementReferenceCount();
     }
-    
-    public String getSubscriptionName() {
+
+    public String getSubscriptionName(){
         return subscriptionKey.getSubscriptionName();
     }
-    
-    public String toString() {
-        return 
-            "DurableTopicSubscription:" +
-            " consumer="+info.getConsumerId()+
-            ", destinations="+destinations.size()+
-            ", total="+enqueueCounter+
-            ", pending="+getPendingQueueSize()+
-            ", dispatched="+dispatchCounter+
-            ", inflight="+dispatched.size()+
-            ", prefetchExtension="+this.prefetchExtension;
-            
+
+    public String toString(){
+        return "DurableTopicSubscription:"+" consumer="+info.getConsumerId()+", destinations="+destinations.size()
+                +", total="+enqueueCounter+", pending="+getPendingQueueSize()+", dispatched="+dispatchCounter
+                +", inflight="+dispatched.size()+", prefetchExtension="+this.prefetchExtension;
     }
 
-    public String getClientId() {
+    public String getClientId(){
         return subscriptionKey.getClientId();
     }
 
-    public SubscriptionKey getSubscriptionKey() {
+    public SubscriptionKey getSubscriptionKey(){
         return subscriptionKey;
     }
-    
+
     /**
      * Release any references that we are holding.
      */
@@ -239,7 +235,21 @@
         }
         dispatched.clear();
     }
-    
-    
 
+    /**
+     * @param memoryManager
+     * @param oldPercentUsage
+     * @param newPercentUsage
+     * @see org.apache.activemq.memory.UsageListener#onMemoryUseChanged(org.apache.activemq.memory.UsageManager,
int,
+     *      int)
+     */
+    public void onMemoryUseChanged(UsageManager memoryManager,int oldPercentUsage,int newPercentUsage){
+        if(oldPercentUsage>newPercentUsage&&oldPercentUsage>=90){
+            try{
+                dispatchMatched();
+            }catch(IOException e){
+                log.warn("problem calling dispatchMatched",e);
+            }
+        }
+    }
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicRegion.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicRegion.java?view=diff&rev=509552&r1=509551&r2=509552
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicRegion.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicRegion.java
Tue Feb 20 05:22:27 2007
@@ -216,7 +216,7 @@
             SubscriptionKey key=new SubscriptionKey(context.getClientId(),info.getSubscriptionName());
             DurableTopicSubscription sub=(DurableTopicSubscription)durableSubscriptions.get(key);
             if(sub==null){
-                sub=new DurableTopicSubscription(broker,context,info,keepDurableSubsActive);
+                sub=new DurableTopicSubscription(broker,memoryManager,context,info,keepDurableSubsActive);
                 ActiveMQDestination destination=info.getDestination();
                 if(destination!=null&&broker.getDestinationPolicy()!=null){
                     PolicyEntry entry=broker.getDestinationPolicy().getEntryFor(destination);



Mime
View raw message