activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rajdav...@apache.org
Subject svn commit: r479094 - in /incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region: TopicSubscription.java cursors/AbstractPendingMessageCursor.java cursors/FilePendingMessageCursor.java cursors/PendingMessageCursor.java
Date Sat, 25 Nov 2006 07:10:18 GMT
Author: rajdavies
Date: Fri Nov 24 23:10:17 2006
New Revision: 479094

URL: http://svn.apache.org/viewvc?view=rev&rev=479094
Log:
Update to fix http://issues.apache.org/activemq/browse/AMQ-791
Use in mmeory list in FilePendingMessageCursor - until memory limit reached - then use disk.
USe FilePendingMessageCursor in TopicSubscription instead of LinkedList

Modified:
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PendingMessageCursor.java

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java?view=diff&rev=479094&r1=479093&r2=479094
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
(original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
Fri Nov 24 23:10:17 2006
@@ -26,6 +26,7 @@
 
 import org.apache.activemq.broker.Broker;
 import org.apache.activemq.broker.ConnectionContext;
+import org.apache.activemq.broker.region.cursors.FilePendingMessageCursor;
 import org.apache.activemq.broker.region.policy.MessageEvictionStrategy;
 import org.apache.activemq.broker.region.policy.OldestMessageEvictionStrategy;
 import org.apache.activemq.command.ActiveMQDestination;
@@ -50,7 +51,7 @@
 	
     private static final Log log=LogFactory.getLog(TopicSubscription.class);
     
-    final protected LinkedList matched=new LinkedList();
+    final protected FilePendingMessageCursor matched;
     final protected ActiveMQDestination dlqDestination=new ActiveMQQueue("ActiveMQ.DLQ");
     final protected UsageManager usageManager;
     protected AtomicLong dispatched=new AtomicLong();
@@ -69,6 +70,7 @@
                     throws InvalidSelectorException{
         super(broker,context,info);
         this.usageManager=usageManager;
+        this.matched = new FilePendingMessageCursor(info.getConsumerId().toString(), broker.getTempDataStore());
     }
 
     public void add(MessageReference node) throws InterruptedException,IOException{
@@ -84,7 +86,7 @@
         }else{
             if(maximumPendingMessages!=0){
                 synchronized(matchedListMutex){
-                    matched.addLast(node);
+                    matched.addMessageLast(node);
                     // NOTE - be careful about the slaveBroker!
                     if (maximumPendingMessages > 0) {
                         
@@ -94,15 +96,22 @@
                             max = maximumPendingMessages;
                         }
                         if (!matched.isEmpty() && matched.size() > max) {
-                            removeExpiredMessages(matched);
+                            removeExpiredMessages();
                         }
 
                         // lets discard old messages as we are a slow consumer
                         while (!matched.isEmpty() && matched.size() > maximumPendingMessages)
{
-                            MessageReference[] oldMessages = messageEvictionStrategy.evictMessages(matched);
+                            int pageInSize = matched.size() - maximumPendingMessages;
+                            //only page in a 1000 at a time - else we could blow da memory
+                            pageInSize = Math.max(1000,pageInSize);
+                            LinkedList list = matched.pageInList(pageInSize);
+                            MessageReference[] oldMessages = messageEvictionStrategy.evictMessages(list);
                             int messagesToEvict = oldMessages.length;
                             for(int i = 0; i < messagesToEvict; i++) {
-                            	oldMessages[i].decrementReferenceCount();
+                                MessageReference oldMessage = oldMessages[i];
+                            	oldMessage.decrementReferenceCount();
+                                matched.remove(oldMessage);
+                                
                                 discarded++;
                                 if (log.isDebugEnabled()) {
                                     log.debug("Discarding message " + oldMessages[i]);
@@ -126,29 +135,33 @@
      * Discard any expired messages from the matched list. Called from a synchronized block.
      * @throws IOException 
      */
-    protected void removeExpiredMessages(LinkedList messages) throws IOException {
-        for(Iterator i=matched.iterator();i.hasNext();){
-            MessageReference node=(MessageReference) i.next();
+    protected void removeExpiredMessages() throws IOException {
+        matched.reset();
+        while(matched.hasNext()) {
+            MessageReference node=matched.next();
             if (node.isExpired()) {
-                i.remove();
+                matched.remove();
                 dispatched.incrementAndGet();
                 node.decrementReferenceCount();
                 break;
             }
         }
+        matched.release();
     }
 
     public void processMessageDispatchNotification(MessageDispatchNotification mdn){
         synchronized(matchedListMutex){
-            for(Iterator i=matched.iterator();i.hasNext();){
-                MessageReference node=(MessageReference) i.next();
+            matched.reset();
+            while(matched.hasNext()) {
+                MessageReference node=matched.next();
                 if(node.getMessageId().equals(mdn.getMessageId())){
-                    i.remove();
+                    matched.remove();
                     dispatched.incrementAndGet();
                     node.decrementReferenceCount();
                     break;
                 }
             }
+            matched.release();
         }
     }
 
@@ -322,9 +335,10 @@
 
     private void dispatchMatched() throws IOException{
         synchronized(matchedListMutex){
-            for(Iterator iter=matched.iterator();iter.hasNext()&&!isFull();){
-                MessageReference message=(MessageReference) iter.next();
-                iter.remove();
+            matched.reset();
+            while(matched.hasNext()) {
+                MessageReference message=(MessageReference) matched.next();
+                matched.remove();
                 
                 // Message may have been sitting in the matched list a while
                 // waiting for the consumer to ak the message.
@@ -335,6 +349,7 @@
 
                 dispatch(message);
             }
+            matched.release();
         }
     }
 
@@ -380,11 +395,7 @@
 
     public void destroy() {
         synchronized(matchedListMutex){
-            for (Iterator iter = matched.iterator(); iter.hasNext();) {
-                MessageReference node = (MessageReference) iter.next();
-                node.decrementReferenceCount();
-            }
-            matched.clear();
+            matched.destroy();
         }
     }
 

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java?view=diff&rev=479094&r1=479093&r2=479094
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java
(original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java
Fri Nov 24 23:10:17 2006
@@ -106,4 +106,8 @@
     public boolean hasSpace() {
         return usageManager != null ? !usageManager.isFull() : true;
     }
+    
+    public boolean isFull() {
+        return usageManager != null ? usageManager.isFull() : false;
+    }
 }

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java?view=diff&rev=479094&r1=479093&r2=479094
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java
(original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java
Fri Nov 24 23:10:17 2006
@@ -11,47 +11,59 @@
  * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the
  * specific language governing permissions and limitations under the License.
  */
+
 package org.apache.activemq.broker.region.cursors;
 
 import java.io.IOException;
 import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
 import org.apache.activemq.broker.region.Destination;
 import org.apache.activemq.broker.region.MessageReference;
 import org.apache.activemq.command.Message;
 import org.apache.activemq.kaha.ListContainer;
 import org.apache.activemq.kaha.Store;
+import org.apache.activemq.memory.UsageListener;
+import org.apache.activemq.memory.UsageManager;
 import org.apache.activemq.openwire.OpenWireFormat;
 import org.apache.activemq.store.kahadaptor.CommandMarshaller;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
 /**
  * perist pending messages pending message (messages awaiting disptach to a consumer) cursor
- *
+ * 
  * @version $Revision$
  */
-public class FilePendingMessageCursor extends AbstractPendingMessageCursor{
-    private ListContainer list;
+public class FilePendingMessageCursor extends AbstractPendingMessageCursor implements UsageListener{
+    static private final Log log=LogFactory.getLog(FilePendingMessageCursor.class);
+    private Store store;
+    private String name;
+    private LinkedList memoryList=new LinkedList();
+    private ListContainer diskList;
     private Iterator iter=null;
     private Destination regionDestination;
+    private Lock iterLock=new ReentrantLock();
+    private Object mutex=new Object();
 
     /**
      * @param name
      * @param store
-     * @throws IOException
      */
     public FilePendingMessageCursor(String name,Store store){
-        try{
-            list=store.getListContainer(name);
-            list.setMarshaller(new CommandMarshaller(new OpenWireFormat()));
-            list.setMaximumCacheSize(0);
-        }catch(IOException e){
-            throw new RuntimeException(e);
-        }
+        this.name=name;
+        this.store=store;
     }
 
     /**
      * @return true if there are no pending messages
      */
     public boolean isEmpty(){
-        return list.isEmpty();
+        synchronized(mutex){
+            return memoryList.isEmpty()&&isDiskListEmpty();
+        }
     }
 
     /**
@@ -59,7 +71,46 @@
      * 
      */
     public void reset(){
-        iter=list.listIterator();
+        iterLock.lock();
+        synchronized(mutex){
+            iter=isSpaceInMemoryList()?memoryList.iterator():diskList.listIterator();
+        }
+    }
+
+    public void release(){
+        iterLock.unlock();
+    }
+
+    public void destroy(){
+        for(Iterator i=memoryList.iterator();i.hasNext();){
+            Message node=(Message)i.next();
+            node.decrementReferenceCount();
+        }
+        memoryList.clear();
+        if(!isDiskListEmpty()){
+            getDiskList().clear();
+        }
+    }
+
+    public LinkedList pageInList(int maxItems){
+        LinkedList result=new LinkedList();
+        synchronized(mutex){
+            int count=0;
+            for(Iterator i=memoryList.iterator();i.hasNext()&&count<maxItems;){
+                result.add(i.next());
+                count++;
+            }
+            if(count<maxItems&&!isDiskListEmpty()){
+                for(Iterator i=getDiskList().iterator();i.hasNext()&&count<maxItems;){
+                    Message message=(Message)i.next();
+                    message.setRegionDestination(regionDestination);
+                    message.incrementReferenceCount();
+                    result.add(message);
+                    count++;
+                }
+            }
+        }
+        return result;
     }
 
     /**
@@ -68,46 +119,66 @@
      * @param node
      */
     public void addMessageLast(MessageReference node){
-        try{
-            regionDestination=node.getMessage().getRegionDestination();
-            node.decrementReferenceCount();
-        }catch(IOException e){
-            throw new RuntimeException(e);
+        synchronized(mutex){
+            try{
+                regionDestination=node.getMessage().getRegionDestination();
+                if(isSpaceInMemoryList()){
+                    memoryList.add(node);
+                }else{
+                    flushToDisk();
+                    node.decrementReferenceCount();
+                    getDiskList().addLast(node);
+                }
+            }catch(IOException e){
+                throw new RuntimeException(e);
+            }
         }
-        list.addLast(node);
     }
 
     /**
      * add message to await dispatch
      * 
-     * @param position
      * @param node
      */
     public void addMessageFirst(MessageReference node){
-        try{
-            regionDestination=node.getMessage().getRegionDestination();
-            node.decrementReferenceCount();
-        }catch(IOException e){
-            throw new RuntimeException(e);
+        synchronized(mutex){
+            try{
+                regionDestination=node.getMessage().getRegionDestination();
+                if(isSpaceInMemoryList()){
+                    memoryList.addFirst(node);
+                }else{
+                    flushToDisk();
+                    node.decrementReferenceCount();
+                    getDiskList().addFirst(node);
+                }
+            }catch(IOException e){
+                throw new RuntimeException(e);
+            }
         }
-        list.addFirst(node);
     }
 
     /**
      * @return true if there pending messages to dispatch
      */
     public boolean hasNext(){
-        return iter.hasNext();
+        synchronized(mutex){
+            return iter.hasNext();
+        }
     }
 
     /**
      * @return the next pending message
      */
     public MessageReference next(){
-        Message message=(Message) iter.next();
-        message.setRegionDestination(regionDestination);
-        message.incrementReferenceCount();
-        return message;
+        synchronized(mutex){
+            Message message=(Message)iter.next();
+            if(!isDiskListEmpty()){
+                // got from disk
+                message.setRegionDestination(regionDestination);
+                message.incrementReferenceCount();
+            }
+            return message;
+        }
     }
 
     /**
@@ -115,17 +186,31 @@
      * 
      */
     public void remove(){
-        iter.remove();
+        synchronized(mutex){
+            iter.remove();
+        }
     }
-    
+
+    /**
+     * @param node
+     * @see org.apache.activemq.broker.region.cursors.AbstractPendingMessageCursor#remove(org.apache.activemq.broker.region.MessageReference)
+     */
     public void remove(MessageReference node){
-        list.remove(node);
+        synchronized(mutex){
+            memoryList.remove(node);
+            if(!isDiskListEmpty()){
+                getDiskList().remove(node);
+            }
+        }
     }
+
     /**
      * @return the number of pending messages
      */
     public int size(){
-        return list.size();
+        synchronized(mutex){
+            return memoryList.size()+(isDiskListEmpty()?0:getDiskList().size());
+        }
     }
 
     /**
@@ -133,6 +218,66 @@
      * 
      */
     public void clear(){
-        list.clear();
+        synchronized(mutex){
+            memoryList.clear();
+            if(!isDiskListEmpty()){
+                getDiskList().clear();
+            }
+        }
+    }
+
+    public boolean isFull(){
+        // we always have space - as we can persist to disk
+        return false;
+    }
+
+    public void setUsageManager(UsageManager usageManager){
+        super.setUsageManager(usageManager);
+        usageManager.addUsageListener(this);
+    }
+
+    public void onMemoryUseChanged(UsageManager memoryManager,int oldPercentUsage,int newPercentUsage){
+        if(newPercentUsage>=100){
+            try{
+                if(iterLock.tryLock(500,TimeUnit.MILLISECONDS)){
+                    flushToDisk();
+                    iterLock.unlock();
+                }
+            }catch(InterruptedException e){
+                log.warn("caught an exception aquiring lock",e);
+            }
+        }
+    }
+
+    protected boolean isSpaceInMemoryList(){
+        return hasSpace()&&isDiskListEmpty();
+    }
+
+    protected void flushToDisk(){
+        synchronized(mutex){
+            for(Iterator i=memoryList.iterator();i.hasNext();){
+                MessageReference node=(MessageReference)i.next();
+                node.decrementReferenceCount();
+                getDiskList().addLast(node);
+            }
+            memoryList.clear();
+        }
+    }
+
+    protected boolean isDiskListEmpty(){
+        return diskList==null||diskList.isEmpty();
+    }
+
+    protected ListContainer getDiskList(){
+        if(diskList==null){
+            try{
+                diskList=store.getListContainer(name);
+                diskList.setMarshaller(new CommandMarshaller(new OpenWireFormat()));
+                diskList.setMaximumCacheSize(0);
+            }catch(IOException e){
+                throw new RuntimeException(e);
+            }
+        }
+        return diskList;
     }
 }

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PendingMessageCursor.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PendingMessageCursor.java?view=diff&rev=479094&r1=479093&r2=479094
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PendingMessageCursor.java
(original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PendingMessageCursor.java
Fri Nov 24 23:10:17 2006
@@ -140,4 +140,9 @@
      * @see org.apache.activemq.memory.UsageManager
      */
     public void setUsageManager(UsageManager usageManager);
+    
+    /**
+     * @return true if the cursor is full
+     */
+    public boolean isFull();
 }



Mime
View raw message