activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rajdav...@apache.org
Subject svn commit: r509563 - in /activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors: QueueStorePrefetch.java StoreDurableSubscriberCursor.java TopicStorePrefetch.java
Date Tue, 20 Feb 2007 13:59:42 GMT
Author: rajdavies
Date: Tue Feb 20 05:59:41 2007
New Revision: 509563

URL: http://svn.apache.org/viewvc?view=rev&rev=509563
Log:
tighten up on message reference counting

Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/QueueStorePrefetch.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/TopicStorePrefetch.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/QueueStorePrefetch.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/QueueStorePrefetch.java?view=diff&rev=509563&r1=509562&r2=509563
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/QueueStorePrefetch.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/QueueStorePrefetch.java
Tue Feb 20 05:59:41 2007
@@ -42,7 +42,7 @@
     static private final Log log=LogFactory.getLog(QueueStorePrefetch.class);
    
     private MessageStore store;
-    private final LinkedList batchList=new LinkedList();
+    private final LinkedList <Message>batchList=new LinkedList<Message>();
     private Destination regionDestination;
     private int size = 0;
 
@@ -123,7 +123,7 @@
     }
 
     public synchronized MessageReference next(){
-        Message result = (Message)batchList.removeFirst();
+        Message result = batchList.removeFirst();
         result.setRegionDestination(regionDestination);
         return result;
     }
@@ -137,7 +137,10 @@
 
     public void recoverMessage(Message message) throws Exception{
         message.setRegionDestination(regionDestination);
-        message.incrementReferenceCount();
+        // only increment if count is zero (could have been cached)
+        if(message.getReferenceCount()==0){
+            message.incrementReferenceCount();
+        }
         batchList.addLast(message);
     }
 
@@ -153,6 +156,9 @@
     }
     
     public void gc() {
+        for (Message msg:batchList) {
+            msg.decrementReferenceCount();
+        }
         batchList.clear();
     }
 

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java?view=diff&rev=509563&r1=509562&r2=509563
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java
Tue Feb 20 05:59:41 2007
@@ -42,7 +42,7 @@
     private String clientId;
     private String subscriberName;
     private Map topics=new HashMap();
-    private LinkedList storePrefetches=new LinkedList();
+    private LinkedList <PendingMessageCursor>storePrefetches=new LinkedList<PendingMessageCursor>();
     private boolean started;
     private PendingMessageCursor nonPersistent;
     private PendingMessageCursor currentCursor;
@@ -61,21 +61,24 @@
     }
 
     public synchronized void start() throws Exception{
-        started=true;
-        for(Iterator i=storePrefetches.iterator();i.hasNext();){
-            PendingMessageCursor tsp=(PendingMessageCursor)i.next();
-            tsp.start();
-            pendingCount+=tsp.size();
+        if(!started){
+            started=true;
+            for(PendingMessageCursor tsp: storePrefetches){
+                tsp.start();
+                pendingCount+=tsp.size();
+            }
         }
     }
 
     public synchronized void stop() throws Exception{
-        started=false;
-        for(Iterator i=storePrefetches.iterator();i.hasNext();){
-            PendingMessageCursor tsp=(PendingMessageCursor)i.next();
-            tsp.stop();
+        if(started){
+            started=false;
+            for(PendingMessageCursor tsp: storePrefetches){
+                tsp.stop();
+            }
+           
+            pendingCount=0;
         }
-        pendingCount=0;
     }
 
     /**
@@ -119,12 +122,12 @@
     public synchronized boolean isEmpty(){
         return pendingCount<=0;
     }
-    
-    public boolean isEmpty(Destination destination) {
-        boolean result = true;
+
+    public boolean isEmpty(Destination destination){
+        boolean result=true;
         TopicStorePrefetch tsp=(TopicStorePrefetch)topics.get(destination);
         if(tsp!=null){
-            result = tsp.size() <= 0;
+            result=tsp.size()<=0;
         }
         return result;
     }
@@ -140,7 +143,6 @@
         return false;
     }
 
-    
     public synchronized void addMessageLast(MessageReference node) throws Exception{
         if(node!=null){
             Message msg=node.getMessage();
@@ -159,7 +161,7 @@
             }
         }
     }
-    
+
     public void addRecoveredMessage(MessageReference node) throws Exception{
         nonPersistent.addMessageLast(node);
     }
@@ -178,12 +180,13 @@
                 throw new RuntimeException(e);
             }
             result=currentCursor!=null?currentCursor.hasNext():false;
-        }
+        }     
         return result;
     }
 
     public synchronized MessageReference next(){
-        return currentCursor!=null?currentCursor.next():null;
+        MessageReference result =  currentCursor!=null?currentCursor.next():null;
+        return result;
     }
 
     public synchronized void remove(){
@@ -192,7 +195,7 @@
         }
         pendingCount--;
     }
-    
+
     public void remove(MessageReference node){
         if(currentCursor!=null){
             currentCursor.remove(node);
@@ -206,7 +209,7 @@
             tsp.reset();
         }
     }
-    
+
     public synchronized void release(){
         for(Iterator i=storePrefetches.iterator();i.hasNext();){
             AbstractPendingMessageCursor tsp=(AbstractPendingMessageCursor)i.next();
@@ -217,7 +220,7 @@
     public int size(){
         return pendingCount;
     }
-    
+
     public synchronized void setMaxBatchSize(int maxBatchSize){
         for(Iterator i=storePrefetches.iterator();i.hasNext();){
             AbstractPendingMessageCursor tsp=(AbstractPendingMessageCursor)i.next();
@@ -225,14 +228,14 @@
         }
         super.setMaxBatchSize(maxBatchSize);
     }
-    
-    public synchronized void gc() {
+
+    public synchronized void gc(){
         for(Iterator i=storePrefetches.iterator();i.hasNext();){
             PendingMessageCursor tsp=(PendingMessageCursor)i.next();
             tsp.gc();
         }
     }
-    
+
     public synchronized void setUsageManager(UsageManager usageManager){
         super.setUsageManager(usageManager);
         for(Iterator i=storePrefetches.iterator();i.hasNext();){
@@ -245,7 +248,7 @@
         if(currentCursor==null||currentCursor.isEmpty()){
             currentCursor=null;
             for(Iterator i=storePrefetches.iterator();i.hasNext();){
-                AbstractPendingMessageCursor tsp=(AbstractPendingMessageCursor)i.next();

+                AbstractPendingMessageCursor tsp=(AbstractPendingMessageCursor)i.next();
                 if(tsp.hasNext()){
                     currentCursor=tsp;
                     break;
@@ -255,5 +258,9 @@
             storePrefetches.addLast(storePrefetches.removeFirst());
         }
         return currentCursor;
+    }
+
+    public String toString(){
+        return "StoreDurableSubscriber("+clientId+":"+subscriberName+")";
     }
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/TopicStorePrefetch.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/TopicStorePrefetch.java?view=diff&rev=509563&r1=509562&r2=509563
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/TopicStorePrefetch.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/TopicStorePrefetch.java
Tue Feb 20 05:59:41 2007
@@ -1,26 +1,21 @@
 /**
  * 
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with this
- * work for additional information regarding copyright ownership. The ASF
- * licenses this file to You under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership.
The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file
except in compliance with the
+ * License. You may obtain a copy of the License at
  * 
  * http://www.apache.org/licenses/LICENSE-2.0
  * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
+ * Unless required by applicable law or agreed to in writing, software distributed under
the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the
+ * specific language governing permissions and limitations under the License.
  */
 
 package org.apache.activemq.broker.region.cursors;
 
 import java.io.IOException;
 import java.util.LinkedList;
-
 import org.apache.activemq.broker.region.Destination;
 import org.apache.activemq.broker.region.MessageReference;
 import org.apache.activemq.broker.region.Topic;
@@ -32,25 +27,21 @@
 import org.apache.commons.logging.LogFactory;
 
 /**
- * perist pending messages pending message (messages awaiting disptach to a
- * consumer) cursor
+ * perist pending messages pending message (messages awaiting disptach to a consumer) cursor
  * 
  * @version $Revision$
  */
-class TopicStorePrefetch extends AbstractPendingMessageCursor implements
-        MessageRecoveryListener {
+class TopicStorePrefetch extends AbstractPendingMessageCursor implements MessageRecoveryListener{
 
     static private final Log log=LogFactory.getLog(TopicStorePrefetch.class);
-   
     private TopicMessageStore store;
-    private final LinkedList batchList=new LinkedList();
+    private final LinkedList<Message> batchList=new LinkedList<Message>();
     private String clientId;
     private String subscriberName;
     private Destination regionDestination;
-
     boolean empty;
-	private MessageId firstMessageId;
-	private MessageId lastMessageId;
+    private MessageId firstMessageId;
+    private MessageId lastMessageId;
 
     /**
      * @param topic
@@ -59,13 +50,13 @@
      * @throws IOException
      */
     public TopicStorePrefetch(Topic topic,String clientId,String subscriberName){
-        this.regionDestination = topic;
+        this.regionDestination=topic;
         this.store=(TopicMessageStore)topic.getMessageStore();
         this.clientId=clientId;
         this.subscriberName=subscriberName;
     }
 
-    public void start() throws Exception {
+    public void start() throws Exception{
         if(batchList.isEmpty()){
             try{
                 fillBatch();
@@ -73,8 +64,8 @@
                 log.error("Failed to fill batch",e);
                 throw new RuntimeException(e);
             }
-            empty = batchList.isEmpty();
-        }    	
+            empty=batchList.isEmpty();
+        }
     }
 
     public void stop() throws Exception{
@@ -88,66 +79,63 @@
     public boolean isEmpty(){
         return empty;
     }
-    
+
     public synchronized int size(){
         try{
             return store.getMessageCount(clientId,subscriberName);
         }catch(IOException e){
-            log.error(this + " Failed to get the outstanding message count from the store",e);
+            log.error(this+" Failed to get the outstanding message count from the store",e);
             throw new RuntimeException(e);
         }
     }
-    
+
     public synchronized void addMessageLast(MessageReference node) throws Exception{
-		if(node!=null){
-			if( empty ) {
-				firstMessageId = node.getMessageId();
-				empty=false;
-			}
-	        lastMessageId = node.getMessageId();
+        if(node!=null){
+            if(empty){
+                firstMessageId=node.getMessageId();
+                empty=false;
+            }
+            lastMessageId=node.getMessageId();
             node.decrementReferenceCount();
         }
     }
 
-    public synchronized boolean hasNext() {
+    public synchronized boolean hasNext(){
         return !isEmpty();
     }
 
     public synchronized MessageReference next(){
-    	    	
-        if( empty ) {
-        	return null;
-        } else {
-
-        	// We may need to fill in the batch...
+        Message result=null;
+        if(!empty){
             if(batchList.isEmpty()){
                 try{
                     fillBatch();
-                }catch(Exception e){
+                }catch(final Exception e){
                     log.error("Failed to fill batch",e);
                     throw new RuntimeException(e);
                 }
-                if( batchList.isEmpty()) {
-                	return null;
+                if(batchList.isEmpty()){
+                    return null;
+                }
+            }
+            if(!batchList.isEmpty()){
+                result=batchList.removeFirst();
+                if(firstMessageId!=null){
+                    // Skip messages until we get to the first message.
+                    if(!result.getMessageId().equals(firstMessageId))
+                        result=null;
+                    firstMessageId=null;
+                }else{
+                    if(lastMessageId!=null){
+                        if(result.getMessageId().equals(lastMessageId)){
+                            empty=true;
+                        }
+                    }
+                    result.setRegionDestination(regionDestination);
                 }
             }
-
-            Message result = (Message)batchList.removeFirst();
-        	
-        	if( firstMessageId != null ) {
-            	// Skip messages until we get to the first message.
-        		if( !result.getMessageId().equals(firstMessageId) ) 
-        			return null;
-        		firstMessageId = null;
-        	}
-        	if( lastMessageId != null ) {
-        		if( result.getMessageId().equals(lastMessageId) ) {
-        			empty=true;
-        		}
-        	}        	
-            result.setRegionDestination(regionDestination);
-            return result;
         }
+        return result;
     }
 
     public void reset(){
@@ -159,12 +147,14 @@
 
     public void recoverMessage(Message message) throws Exception{
         message.setRegionDestination(regionDestination);
-        message.incrementReferenceCount();
+        // only increment if count is zero (could have been cached)
+        if(message.getReferenceCount()==0){
+            message.incrementReferenceCount();
+        }
         batchList.addLast(message);
     }
 
-    public void recoverMessageReference(MessageId messageReference)
-            throws Exception{
+    public void recoverMessageReference(MessageId messageReference) throws Exception{
         // shouldn't get called
         throw new RuntimeException("Not supported");
     }
@@ -173,13 +163,15 @@
     protected void fillBatch() throws Exception{
         store.recoverNextMessages(clientId,subscriberName,maxBatchSize,this);
     }
-    
-    public void gc() {
+
+    public void gc(){
+        for(Message msg:batchList){
+            msg.decrementReferenceCount();
+        }
         batchList.clear();
     }
-    
-    public String toString() {
-        return "TopicStorePrefetch" + System.identityHashCode(this) + "("+clientId+","+subscriberName+")";
+
+    public String toString(){
+        return "TopicStorePrefetch"+System.identityHashCode(this)+"("+clientId+","+subscriberName+")";
     }
-    
 }



Mime
View raw message