activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rajdav...@apache.org
Subject svn commit: r479156 - in /incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region: ./ cursors/
Date Sat, 25 Nov 2006 17:58:42 GMT
Author: rajdavies
Date: Sat Nov 25 09:58:41 2006
New Revision: 479156

URL: http://svn.apache.org/viewvc?view=rev&rev=479156
Log:
Tidied up locking around cursor iterators

Modified:
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PendingMessageCursor.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java?view=diff&rev=479156&r1=479155&r2=479156
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java
(original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java
Sat Nov 25 09:58:41 2006
@@ -118,15 +118,19 @@
             }
         }
         
-        if( !keepDurableSubsActive ) {
-        	synchronized(pending) {
-                pending.reset();
-	            while(pending.hasNext()) {
-	                MessageReference node = pending.next();
-	                node.decrementReferenceCount();
-	                pending.remove();
-	            }
-        	}
+        if(!keepDurableSubsActive){
+            synchronized(pending){
+                try{
+                    pending.reset();
+                    while(pending.hasNext()){
+                        MessageReference node=pending.next();
+                        node.decrementReferenceCount();
+                        pending.remove();
+                    }
+                }finally{
+                    pending.release();
+                }
+            }
         }
         prefetchExtension=0;
     }
@@ -195,22 +199,24 @@
     /**
      * Release any references that we are holding.
      */
-    public void destroy() {
-    	synchronized(pending) {
-            pending.reset();
-	        while(pending.hasNext()) {
-	            MessageReference node = pending.next();
-	            node.decrementReferenceCount();
-	        }
-	        pending.clear();
-    	}
-    	
-        for (Iterator iter = dispatched.iterator(); iter.hasNext();) {
-            MessageReference node = (MessageReference) iter.next();
+    public void destroy(){
+        try{
+            synchronized(pending){
+                pending.reset();
+                while(pending.hasNext()){
+                    MessageReference node=pending.next();
+                    node.decrementReferenceCount();
+                }
+            }
+        }finally{
+            pending.release();
+            pending.clear();
+        }
+        for(Iterator iter=dispatched.iterator();iter.hasNext();){
+            MessageReference node=(MessageReference)iter.next();
             node.decrementReferenceCount();
         }
         dispatched.clear();
-        
     }
 
 }

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java?view=diff&rev=479156&r1=479155&r2=479156
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
(original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
Sat Nov 25 09:58:41 2006
@@ -123,6 +123,7 @@
 	}
         
     public void add(MessageReference node) throws Exception{
+        try {
         boolean pendingEmpty = false;
         synchronized(pending){
             pendingEmpty=pending.isEmpty();
@@ -139,21 +140,30 @@
                 pending.addMessageLast(node);
             }
         }
+        }catch(Throwable e) {
+            e.printStackTrace();
+            
+        }
     }
 
-    public void processMessageDispatchNotification(MessageDispatchNotification mdn) throws
Exception {
+    public void processMessageDispatchNotification(MessageDispatchNotification mdn) throws
Exception{
         synchronized(pending){
-            pending.reset();
-            while(pending.hasNext()){
-                MessageReference node=pending.next();
-                if(node.getMessageId().equals(mdn.getMessageId())){
-                    pending.remove();
-                    createMessageDispatch(node,node.getMessage());
-                    dispatched.addLast(node);
-                    return;
+            try{
+                pending.reset();
+                while(pending.hasNext()){
+                    MessageReference node=pending.next();
+                    if(node.getMessageId().equals(mdn.getMessageId())){
+                        pending.remove();
+                        createMessageDispatch(node,node.getMessage());
+                        dispatched.addLast(node);
+                        return;
+                    }
                 }
+            }finally{
+                pending.release();
             }
-            throw new JMSException("Slave broker out of sync with master: Dispatched message
("+mdn.getMessageId()+") was not in the pending list: "+pending);
+            throw new JMSException("Slave broker out of sync with master: Dispatched message
("+mdn.getMessageId()
+                    +") was not in the pending list: "+pending);
         }
     }
 
@@ -387,6 +397,7 @@
                         dispatch(node);
                     }
                 }finally{
+                    pending.release();
                     dispatching=false;
                 }
             }

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java?view=diff&rev=479156&r1=479155&r2=479156
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
(original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
Sat Nov 25 09:58:41 2006
@@ -545,54 +545,58 @@
                 }
             }
         }
-        synchronized (messages) {
-            messages.reset();
-            while(messages.hasNext()) {
-                try {
-                    MessageReference r = messages.next();
-                    r.incrementReferenceCount();
-                    try {
-                        Message m = r.getMessage();
-                        if (m != null) {
-                            l.add(m);
+        synchronized(messages){
+            try{
+                messages.reset();
+                while(messages.hasNext()){
+                    try{
+                        MessageReference r=messages.next();
+                        r.incrementReferenceCount();
+                        try{
+                            Message m=r.getMessage();
+                            if(m!=null){
+                                l.add(m);
+                            }
+                        }finally{
+                            r.decrementReferenceCount();
                         }
+                    }catch(IOException e){
+                        log.error("caught an exception brwsing "+this,e);
                     }
-                    finally {
-                        r.decrementReferenceCount();
-                    }
-                }
-                catch (IOException e) {
-                    log.error("caught an exception brwsing " + this,e);
                 }
+            }finally{
+                messages.release();
             }
         }
 
         return (Message[]) l.toArray(new Message[l.size()]);
     }
 
-    public Message getMessage(String messageId) {
-        synchronized (messages) {
-            messages.reset();
-            while(messages.hasNext()) {
-                try {
-                    MessageReference r = messages.next();
-                    if (messageId.equals(r.getMessageId().toString())) {
-                        r.incrementReferenceCount();
-                        try {
-                            Message m = r.getMessage();
-                            if (m != null) {
-                                return m;
+    public Message getMessage(String messageId){
+        synchronized(messages){
+            try{
+                messages.reset();
+                while(messages.hasNext()){
+                    try{
+                        MessageReference r=messages.next();
+                        if(messageId.equals(r.getMessageId().toString())){
+                            r.incrementReferenceCount();
+                            try{
+                                Message m=r.getMessage();
+                                if(m!=null){
+                                    return m;
+                                }
+                            }finally{
+                                r.decrementReferenceCount();
                             }
+                            break;
                         }
-                        finally {
-                            r.decrementReferenceCount();
-                        }
-                        break;
+                    }catch(IOException e){
+                        log.error("got an exception retrieving message "+messageId);
                     }
                 }
-                catch (IOException e) {
-                    log.error("got an exception retrieving message " + messageId);
-                }
+            }finally{
+                messages.release();
             }
         }
         return null;
@@ -868,13 +872,17 @@
                 int count=0;
                 result=new ArrayList(toPageIn);
                 synchronized(messages){
-                    messages.reset();
-                    while(messages.hasNext()&&count<toPageIn){
-                        MessageReference node=messages.next();
-                        messages.remove();
-                        node=createMessageReference(node.getMessage());
-                        result.add(node);
-                        count++;
+                    try{
+                        messages.reset();
+                        while(messages.hasNext()&&count<toPageIn){
+                            MessageReference node=messages.next();
+                            messages.remove();
+                            node=createMessageReference(node.getMessage());
+                            result.add(node);
+                            count++;
+                        }
+                    }finally{
+                        messages.release();
                     }
                 }
                 synchronized(pagedInMessages){

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java?view=diff&rev=479156&r1=479155&r2=479156
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
(original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
Sat Nov 25 09:58:41 2006
@@ -135,36 +135,42 @@
      * Discard any expired messages from the matched list. Called from a synchronized block.
      * @throws IOException 
      */
-    protected void removeExpiredMessages() throws IOException {
-        matched.reset();
-        while(matched.hasNext()) {
-            MessageReference node=matched.next();
-            if (node.isExpired()) {
-                matched.remove();
-                dispatched.incrementAndGet();
-                node.decrementReferenceCount();
-                break;
-            }
-        }
-        matched.release();
-    }
-
-    public void processMessageDispatchNotification(MessageDispatchNotification mdn){
-        synchronized(matchedListMutex){
+    protected void removeExpiredMessages() throws IOException{
+        try{
             matched.reset();
-            while(matched.hasNext()) {
+            while(matched.hasNext()){
                 MessageReference node=matched.next();
-                if(node.getMessageId().equals(mdn.getMessageId())){
+                if(node.isExpired()){
                     matched.remove();
                     dispatched.incrementAndGet();
                     node.decrementReferenceCount();
                     break;
                 }
             }
+        }finally{
             matched.release();
         }
     }
 
+    public void processMessageDispatchNotification(MessageDispatchNotification mdn){
+        synchronized(matchedListMutex){
+            try{
+                matched.reset();
+                while(matched.hasNext()){
+                    MessageReference node=matched.next();
+                    if(node.getMessageId().equals(mdn.getMessageId())){
+                        matched.remove();
+                        dispatched.incrementAndGet();
+                        node.decrementReferenceCount();
+                        break;
+                    }
+                }
+            }finally{
+                matched.release();
+            }
+        }
+    }
+
     synchronized public void acknowledge(final ConnectionContext context,final MessageAck
ack) throws Exception{
         
         // Handle the standard acknowledgment case.
@@ -335,21 +341,22 @@
 
     private void dispatchMatched() throws IOException{
         synchronized(matchedListMutex){
-            matched.reset();
-            while(matched.hasNext()) {
-                MessageReference message=(MessageReference) matched.next();
-                matched.remove();
-                
-                // Message may have been sitting in the matched list a while
-                // waiting for the consumer to ak the message.
-        		if( message.isExpired() ) {
-        			message.decrementReferenceCount();
-        			continue; // just drop it.
-        		}
-
-                dispatch(message);
+            try{
+                matched.reset();
+                while(matched.hasNext()){
+                    MessageReference message=(MessageReference)matched.next();
+                    matched.remove();
+                    // Message may have been sitting in the matched list a while
+                    // waiting for the consumer to ak the message.
+                    if(message.isExpired()){
+                        message.decrementReferenceCount();
+                        continue; // just drop it.
+                    }
+                    dispatch(message);
+                }
+            }finally{
+                matched.release();
             }
-            matched.release();
         }
     }
 

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java?view=diff&rev=479156&r1=479155&r2=479156
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java
(original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java
Sat Nov 25 09:58:41 2006
@@ -110,4 +110,8 @@
     public boolean isFull() {
         return usageManager != null ? usageManager.isFull() : false;
     }
+
+    
+    public void release(){        
+    }
 }

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java?view=diff&rev=479156&r1=479155&r2=479156
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java
(original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java
Sat Nov 25 09:58:41 2006
@@ -18,7 +18,6 @@
 import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 import org.apache.activemq.broker.region.Destination;
 import org.apache.activemq.broker.region.MessageReference;
@@ -38,6 +37,7 @@
  * @version $Revision$
  */
 public class FilePendingMessageCursor extends AbstractPendingMessageCursor implements UsageListener{
+
     static private final Log log=LogFactory.getLog(FilePendingMessageCursor.class);
     private Store store;
     private String name;
@@ -45,8 +45,7 @@
     private ListContainer diskList;
     private Iterator iter=null;
     private Destination regionDestination;
-    private Lock iterLock=new ReentrantLock();
-    private Object mutex=new Object();
+    private ReentrantLock iterLock=new ReentrantLock();
 
     /**
      * @param name
@@ -60,10 +59,8 @@
     /**
      * @return true if there are no pending messages
      */
-    public boolean isEmpty(){
-        synchronized(mutex){
-            return memoryList.isEmpty()&&isDiskListEmpty();
-        }
+    public synchronized boolean isEmpty(){
+        return memoryList.isEmpty()&&isDiskListEmpty();
     }
 
     /**
@@ -71,9 +68,11 @@
      * 
      */
     public void reset(){
-        iterLock.lock();
-        synchronized(mutex){
-            iter=isSpaceInMemoryList()?memoryList.iterator():diskList.listIterator();
+        try{
+            iterLock.lockInterruptibly();
+            iter=isDiskListEmpty()?memoryList.iterator():getDiskList().listIterator();
+        }catch(InterruptedException e){
+            log.warn("Failed to get lock ",e);
         }
     }
 
@@ -81,7 +80,7 @@
         iterLock.unlock();
     }
 
-    public void destroy(){
+    public synchronized void destroy(){
         for(Iterator i=memoryList.iterator();i.hasNext();){
             Message node=(Message)i.next();
             node.decrementReferenceCount();
@@ -92,23 +91,21 @@
         }
     }
 
-    public LinkedList pageInList(int maxItems){
+    public synchronized LinkedList pageInList(int maxItems){
         LinkedList result=new LinkedList();
-        synchronized(mutex){
-            int count=0;
-            for(Iterator i=memoryList.iterator();i.hasNext()&&count<maxItems;){
-                result.add(i.next());
+        int count=0;
+        for(Iterator i=memoryList.iterator();i.hasNext()&&count<maxItems;){
+            result.add(i.next());
+            count++;
+        }
+        if(count<maxItems&&!isDiskListEmpty()){
+            for(Iterator i=getDiskList().iterator();i.hasNext()&&count<maxItems;){
+                Message message=(Message)i.next();
+                message.setRegionDestination(regionDestination);
+                message.incrementReferenceCount();
+                result.add(message);
                 count++;
             }
-            if(count<maxItems&&!isDiskListEmpty()){
-                for(Iterator i=getDiskList().iterator();i.hasNext()&&count<maxItems;){
-                    Message message=(Message)i.next();
-                    message.setRegionDestination(regionDestination);
-                    message.incrementReferenceCount();
-                    result.add(message);
-                    count++;
-                }
-            }
         }
         return result;
     }
@@ -118,20 +115,18 @@
      * 
      * @param node
      */
-    public void addMessageLast(MessageReference node){
-        synchronized(mutex){
-            try{
-                regionDestination=node.getMessage().getRegionDestination();
-                if(isSpaceInMemoryList()){
-                    memoryList.add(node);
-                }else{
-                    flushToDisk();
-                    node.decrementReferenceCount();
-                    getDiskList().addLast(node);
-                }
-            }catch(IOException e){
-                throw new RuntimeException(e);
+    public synchronized void addMessageLast(MessageReference node){
+        try{
+            regionDestination=node.getMessage().getRegionDestination();
+            if(isSpaceInMemoryList()){
+                memoryList.add(node);
+            }else{
+                flushToDisk();
+                node.decrementReferenceCount();
+                getDiskList().addLast(node);
             }
+        }catch(IOException e){
+            throw new RuntimeException(e);
         }
     }
 
@@ -140,93 +135,79 @@
      * 
      * @param node
      */
-    public void addMessageFirst(MessageReference node){
-        synchronized(mutex){
-            try{
-                regionDestination=node.getMessage().getRegionDestination();
-                if(isSpaceInMemoryList()){
-                    memoryList.addFirst(node);
-                }else{
-                    flushToDisk();
-                    node.decrementReferenceCount();
-                    getDiskList().addFirst(node);
-                }
-            }catch(IOException e){
-                throw new RuntimeException(e);
+    public synchronized void addMessageFirst(MessageReference node){
+        try{
+            regionDestination=node.getMessage().getRegionDestination();
+            if(isSpaceInMemoryList()){
+                memoryList.addFirst(node);
+            }else{
+                flushToDisk();
+                node.decrementReferenceCount();
+                getDiskList().addFirst(node);
             }
+        }catch(IOException e){
+            throw new RuntimeException(e);
         }
     }
 
     /**
      * @return true if there pending messages to dispatch
      */
-    public boolean hasNext(){
-        synchronized(mutex){
-            return iter.hasNext();
-        }
+    public synchronized boolean hasNext(){
+        return iter.hasNext();
     }
 
     /**
      * @return the next pending message
      */
-    public MessageReference next(){
-        synchronized(mutex){
-            Message message=(Message)iter.next();
-            if(!isDiskListEmpty()){
-                // got from disk
-                message.setRegionDestination(regionDestination);
-                message.incrementReferenceCount();
-            }
-            return message;
+    public synchronized MessageReference next(){
+        Message message=(Message)iter.next();
+        if(!isDiskListEmpty()){
+            // got from disk
+            message.setRegionDestination(regionDestination);
+            message.incrementReferenceCount();
         }
+        return message;
     }
 
     /**
      * remove the message at the cursor position
      * 
      */
-    public void remove(){
-        synchronized(mutex){
-            iter.remove();
-        }
+    public synchronized void remove(){
+        iter.remove();
     }
 
     /**
      * @param node
      * @see org.apache.activemq.broker.region.cursors.AbstractPendingMessageCursor#remove(org.apache.activemq.broker.region.MessageReference)
      */
-    public void remove(MessageReference node){
-        synchronized(mutex){
-            memoryList.remove(node);
-            if(!isDiskListEmpty()){
-                getDiskList().remove(node);
-            }
+    public synchronized void remove(MessageReference node){
+        memoryList.remove(node);
+        if(!isDiskListEmpty()){
+            getDiskList().remove(node);
         }
     }
 
     /**
      * @return the number of pending messages
      */
-    public int size(){
-        synchronized(mutex){
-            return memoryList.size()+(isDiskListEmpty()?0:getDiskList().size());
-        }
+    public synchronized int size(){
+        return memoryList.size()+(isDiskListEmpty()?0:getDiskList().size());
     }
 
     /**
      * clear all pending messages
      * 
      */
-    public void clear(){
-        synchronized(mutex){
-            memoryList.clear();
-            if(!isDiskListEmpty()){
-                getDiskList().clear();
-            }
+    public synchronized void clear(){
+        memoryList.clear();
+        if(!isDiskListEmpty()){
+            getDiskList().clear();
         }
     }
 
-    public boolean isFull(){
+    public synchronized boolean isFull(){
         // we always have space - as we can persist to disk
         return false;
     }
@@ -253,15 +234,13 @@
         return hasSpace()&&isDiskListEmpty();
     }
 
-    protected void flushToDisk(){
-        synchronized(mutex){
-            for(Iterator i=memoryList.iterator();i.hasNext();){
-                MessageReference node=(MessageReference)i.next();
-                node.decrementReferenceCount();
-                getDiskList().addLast(node);
-            }
-            memoryList.clear();
+    protected synchronized void flushToDisk(){
+        for(Iterator i=memoryList.iterator();i.hasNext();){
+            MessageReference node=(MessageReference)i.next();
+            node.decrementReferenceCount();
+            getDiskList().addLast(node);
         }
+        memoryList.clear();
     }
 
     protected boolean isDiskListEmpty(){

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PendingMessageCursor.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PendingMessageCursor.java?view=diff&rev=479156&r1=479155&r2=479156
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PendingMessageCursor.java
(original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PendingMessageCursor.java
Sat Nov 25 09:58:41 2006
@@ -54,6 +54,13 @@
      *
      */
     public void reset();
+    
+    /**
+     * hint to the cursor to release any locks it might have
+     * grabbed after a reset
+     *
+     */
+    public void release();
 
     /**
      * add message to await dispatch

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java?view=diff&rev=479156&r1=479155&r2=479156
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java
(original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java
Sat Nov 25 09:58:41 2006
@@ -185,10 +185,16 @@
     }
 
     public synchronized void reset(){
-        nonPersistent.reset();
         for(Iterator i=storePrefetches.iterator();i.hasNext();){
             AbstractPendingMessageCursor tsp=(AbstractPendingMessageCursor)i.next();
             tsp.reset();
+        }
+    }
+    
+    public synchronized void release(){
+        for(Iterator i=storePrefetches.iterator();i.hasNext();){
+            AbstractPendingMessageCursor tsp=(AbstractPendingMessageCursor)i.next();
+            tsp.release();
         }
     }
 



Mime
View raw message