activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rajdav...@apache.org
Subject svn commit: r490796 - /incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/
Date Thu, 28 Dec 2006 20:52:42 GMT
Author: rajdavies
Date: Thu Dec 28 12:52:41 2006
New Revision: 490796

URL: http://svn.apache.org/viewvc?view=rev&rev=490796
Log:
Added support for hasMessagesBufferedToDeliver() method

Modified:
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PendingMessageCursor.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/QueueStorePrefetch.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreQueueCursor.java

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java?view=diff&rev=490796&r1=490795&r2=490796
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java
(original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java
Thu Dec 28 12:52:41 2006
@@ -122,4 +122,8 @@
     
     public void release(){        
     }
+    
+    public boolean hasMessagesBufferedToDeliver() {
+        return false;
+    }
 }

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java?view=diff&rev=490796&r1=490795&r2=490796
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java
(original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java
Thu Dec 28 12:52:41 2006
@@ -213,6 +213,10 @@
         // we always have space - as we can persist to disk
         return false;
     }
+    
+    public boolean hasMessagesBufferedToDeliver() {
+        return !isEmpty();
+    }
 
     public void setUsageManager(UsageManager usageManager){
         super.setUsageManager(usageManager);

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PendingMessageCursor.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PendingMessageCursor.java?view=diff&rev=490796&r1=490795&r2=490796
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PendingMessageCursor.java
(original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PendingMessageCursor.java
Thu Dec 28 12:52:41 2006
@@ -166,4 +166,9 @@
      * @return true if the cursor is full
      */
     public boolean isFull();
+    
+    /**
+     * @return true if the cursor has buffered messages ready to deliver
+     */
+    public boolean hasMessagesBufferedToDeliver();
 }

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/QueueStorePrefetch.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/QueueStorePrefetch.java?view=diff&rev=490796&r1=490795&r2=490796
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/QueueStorePrefetch.java
(original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/QueueStorePrefetch.java
Thu Dec 28 12:52:41 2006
@@ -24,6 +24,7 @@
 import org.apache.activemq.broker.region.MessageReference;
 import org.apache.activemq.broker.region.Queue;
 import org.apache.activemq.command.Message;
+import org.apache.activemq.command.MessageId;
 import org.apache.activemq.store.MessageRecoveryListener;
 import org.apache.activemq.store.MessageStore;
 import org.apache.commons.logging.Log;
@@ -43,6 +44,7 @@
     private MessageStore store;
     private final LinkedList batchList=new LinkedList();
     private Destination regionDestination;
+    private int size = 0;
 
     /**
      * @param topic
@@ -68,26 +70,48 @@
      * @return true if there are no pending messages
      */
     public boolean isEmpty(){
-        return batchList.isEmpty();
+        return size <= 0;
+    }
+    
+    public boolean hasMessagesBufferedToDeliver() {
+        return !batchList.isEmpty();
     }
     
     public synchronized int size(){
         try {
-        return store.getMessageCount();
+        size =  store.getMessageCount();
         }catch(IOException e) {
             log.error("Failed to get message count",e);
             throw new RuntimeException(e);
         }
+        return size;
     }
     
     public synchronized void addMessageLast(MessageReference node) throws Exception{
         if(node!=null){
             node.decrementReferenceCount();
         }
+        size++;
+    }
+    
+    public void addMessageFirst(MessageReference node) throws Exception{
+        if(node!=null){
+            node.decrementReferenceCount();
+        }
+        size++;
     }
+    
+    public void remove(){
+        size--;
+    }
+
+    public void remove(MessageReference node){
+        size--;
+    }
+
 
     public synchronized boolean hasNext(){
-        if(isEmpty()){
+        if(batchList.isEmpty()){
             try{
                 fillBatch();
             }catch(Exception e){
@@ -95,7 +119,7 @@
                 throw new RuntimeException(e);
             }
         }
-        return !isEmpty();
+        return !batchList.isEmpty();
     }
 
     public synchronized MessageReference next(){
@@ -117,10 +141,15 @@
         batchList.addLast(message);
     }
 
-    public void recoverMessageReference(String messageReference)
-            throws Exception{
-        // shouldn't get called
-        throw new RuntimeException("Not supported");
+    public void recoverMessageReference(String messageReference) throws Exception{
+        Message msg=store.getMessage(new MessageId(messageReference));
+        if(msg!=null){
+            recoverMessage(msg);
+        }else{
+            String err = "Failed to retrieve message for id: "+messageReference;
+            log.error(err);
+            throw new IOException(err);
+        }
     }
     
     public void gc() {

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreQueueCursor.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreQueueCursor.java?view=diff&rev=490796&r1=490795&r2=490796
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreQueueCursor.java
(original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreQueueCursor.java
Thu Dec 28 12:52:41 2006
@@ -37,6 +37,7 @@
     private QueueStorePrefetch persistent;
     private boolean started;
     private PendingMessageCursor currentCursor;
+    
    
     /**
      * Construct
@@ -48,6 +49,7 @@
         this.queue=queue;
         this.tmpStore=tmpStore;
         this.persistent=new QueueStorePrefetch(queue);
+        currentCursor = persistent;
     }
 
     public synchronized void start() throws Exception{
@@ -134,7 +136,7 @@
         pendingCount--;
     }
 
-    public void remove(MessageReference node){
+    public synchronized void remove(MessageReference node){
         if (!node.isPersistent()) {
             nonPersistent.remove(node);
         }else {
@@ -145,6 +147,7 @@
 
     public synchronized void reset(){
         nonPersistent.reset();
+        persistent.reset();
     }
 
     public int size(){
@@ -208,8 +211,12 @@
      }
 
     protected synchronized PendingMessageCursor getNextCursor() throws Exception{
-        if(currentCursor==null||currentCursor.isEmpty()){
+        if(currentCursor == null || !currentCursor.hasMessagesBufferedToDeliver()){
             currentCursor=currentCursor==persistent?nonPersistent:persistent;
+            //sanity check
+            if (currentCursor.isEmpty()) {
+                currentCursor=currentCursor==persistent?nonPersistent:persistent;
+            }
         }
         return currentCursor;
     }



Mime
View raw message