activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rajdav...@apache.org
Subject svn commit: r479614 - in /incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq: broker/region/ broker/region/cursors/ broker/region/policy/ memory/list/ store/kahadaptor/ store/memory/ store/rapid/
Date Mon, 27 Nov 2006 13:40:13 GMT
Author: rajdavies
Date: Mon Nov 27 05:40:11 2006
New Revision: 479614

URL: http://svn.apache.org/viewvc?view=rev&rev=479614
Log:
support for durable store cursors and retroactive subscribers

Added:
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/SubscriptionRecovery.java   (with props)
Modified:
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractSubscription.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Subscription.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PendingMessageCursor.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/FixedCountSubscriptionRecoveryPolicy.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/FixedSizedSubscriptionRecoveryPolicy.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/LastImageSubscriptionRecoveryPolicy.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/NoSubscriptionRecoveryPolicy.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/QueryBasedSubscriptionRecoveryPolicy.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/SubscriptionRecoveryPolicy.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/TimedSubscriptionRecoveryPolicy.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/memory/list/DestinationBasedMessageList.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/memory/list/MessageList.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/memory/list/SimpleMessageList.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicMessageStore.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryMessageStore.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryTopicMessageStore.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/rapid/RapidTopicMessageStore.java

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractSubscription.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractSubscription.java?view=diff&rev=479614&r1=479613&r2=479614
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractSubscription.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractSubscription.java Mon Nov 27 05:40:11 2006
@@ -159,4 +159,29 @@
     public boolean isRecoveryRequired(){
         return true;
     }
+    
+    public boolean addRecoveredMessage(ConnectionContext context, MessageReference message) throws Exception{
+        boolean result = false;
+        MessageEvaluationContext msgContext = context.getMessageEvaluationContext();
+        try {
+            msgContext.setDestination(message.getRegionDestination().getActiveMQDestination());
+            msgContext.setMessageReference(message);
+            result = matches(message,msgContext);
+            if (result) {
+                doAddRecoveredMessage(message);
+            }
+            
+        }finally {
+            msgContext.clear();
+        }
+        return result;
+    }
+    
+    public  ActiveMQDestination getActiveMQDestination() {
+        return info != null ? info.getDestination() : null;
+    }
+    
+    protected void doAddRecoveredMessage(MessageReference message) throws Exception {
+        add(message);
+    }
 }

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java?view=diff&rev=479614&r1=479613&r2=479614
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java Mon Nov 27 05:40:11 2006
@@ -152,6 +152,10 @@
         super.add(node);
     }
     
+    protected void doAddRecoveredMessage(MessageReference message) throws Exception {
+        pending.addRecoveredMessage(message);
+    }
+    
     public int getPendingQueueSize() {
         if( active || keepDurableSubsActive ) {
             return super.getPendingQueueSize();
@@ -218,5 +222,7 @@
         }
         dispatched.clear();
     }
+    
+    
 
 }

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java?view=diff&rev=479614&r1=479613&r2=479614
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java Mon Nov 27 05:40:11 2006
@@ -390,7 +390,7 @@
                         
                         // Message may have been sitting in the pending list a while
                         // waiting for the consumer to ak the message.
-                		if( node.isExpired() ) {
+                		if( node != QueueMessageReference.NULL_MESSAGE && node.isExpired() ) {
                 			continue; // just drop it.
                 		}
 

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Subscription.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Subscription.java?view=diff&rev=479614&r1=479613&r2=479614
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Subscription.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Subscription.java Mon Nov 27 05:40:11 2006
@@ -34,7 +34,7 @@
 /**
  * @version $Revision: 1.5 $
  */
-public interface Subscription {
+public interface Subscription extends SubscriptionRecovery {
 
     /**
      * Used to add messages that match the subscription.

Added: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/SubscriptionRecovery.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/SubscriptionRecovery.java?view=auto&rev=479614
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/SubscriptionRecovery.java (added)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/SubscriptionRecovery.java Mon Nov 27 05:40:11 2006
@@ -0,0 +1,49 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.region;
+
+
+import org.apache.activemq.broker.ConnectionContext;
+import org.apache.activemq.broker.region.MessageReference;
+import org.apache.activemq.command.ActiveMQDestination;
+
+/**
+ * An interface for recovering transient messages held by the broker
+ * for retroactive recovery for subscribers
+ * 
+ * @version $Revision$
+ */
+public interface SubscriptionRecovery  {
+    
+    
+    /**
+     * Add a message to the SubscriptionRecovery
+     * @param context
+     * @param message
+     * @return true if the message is accepted
+     * @throws Exception
+     */
+    boolean addRecoveredMessage(ConnectionContext context, MessageReference message) throws Exception;
+    
+    
+    /**
+     * @return the Destination associated with this Subscription
+     */
+    ActiveMQDestination getActiveMQDestination();
+    
+}

Propchange: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/SubscriptionRecovery.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java?view=diff&rev=479614&r1=479613&r2=479614
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java Mon Nov 27 05:40:11 2006
@@ -51,6 +51,10 @@
 
     public void addMessageLast(MessageReference node) throws Exception{
     }
+    
+    public void addRecoveredMessage(MessageReference node) throws Exception{
+        addMessageLast(node);
+    }
 
     public void clear(){
     }

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java?view=diff&rev=479614&r1=479613&r2=479614
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java Mon Nov 27 05:40:11 2006
@@ -17,8 +17,7 @@
 import java.io.IOException;
 import java.util.Iterator;
 import java.util.LinkedList;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.locks.ReentrantLock;
+import java.util.concurrent.atomic.AtomicBoolean;
 import org.apache.activemq.broker.region.Destination;
 import org.apache.activemq.broker.region.MessageReference;
 import org.apache.activemq.command.Message;
@@ -45,7 +44,8 @@
     private ListContainer diskList;
     private Iterator iter=null;
     private Destination regionDestination;
-    private ReentrantLock iterLock=new ReentrantLock();
+    private AtomicBoolean iterating=new AtomicBoolean();
+    private boolean flushRequired;
 
     /**
      * @param name
@@ -67,17 +67,19 @@
      * reset the cursor
      * 
      */
-    public void reset(){
-        try{
-            iterLock.lockInterruptibly();
-            iter=isDiskListEmpty()?memoryList.iterator():getDiskList().listIterator();
-        }catch(InterruptedException e){
-            log.warn("Failed to get lock ",e);
-        }
+    public synchronized void reset(){
+        synchronized(iterating){
+            iterating.set(true);
+        }
+        iter=isDiskListEmpty()?memoryList.iterator():getDiskList().listIterator();
     }
 
-    public void release(){
-        iterLock.unlock();
+    public synchronized void release(){
+        iterating.set(false);
+        if(flushRequired){
+            flushRequired=false;
+            flushToDisk();
+        }
     }
 
     public synchronized void destroy(){
@@ -219,13 +221,12 @@
 
     public void onMemoryUseChanged(UsageManager memoryManager,int oldPercentUsage,int newPercentUsage){
         if(newPercentUsage>=100){
-            try{
-                if(iterLock.tryLock(500,TimeUnit.MILLISECONDS)){
+            synchronized(iterating){
+                flushRequired=true;
+                if(!iterating.get()){
                     flushToDisk();
-                    iterLock.unlock();
+                    flushRequired=false;
                 }
-            }catch(InterruptedException e){
-                log.warn("caught an exception aquiring lock",e);
             }
         }
     }

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PendingMessageCursor.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PendingMessageCursor.java?view=diff&rev=479614&r1=479613&r2=479614
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PendingMessageCursor.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PendingMessageCursor.java Mon Nov 27 05:40:11 2006
@@ -76,6 +76,13 @@
      * @throws Exception 
      */
     public void addMessageFirst(MessageReference node) throws Exception;
+    
+    /**
+     * Add a message recovered from a retroactive policy
+     * @param node
+     * @throws Exception
+     */
+    public void addRecoveredMessage(MessageReference node) throws Exception;
 
     /**
      * @return true if there pending messages to dispatch

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java?view=diff&rev=479614&r1=479613&r2=479614
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java Mon Nov 27 05:40:11 2006
@@ -147,6 +147,10 @@
             }
         }
     }
+    
+    public void addRecoveredMessage(MessageReference node) throws Exception{
+        nonPersistent.addMessageLast(node);
+    }
 
     public void clear(){
         pendingCount=0;

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/FixedCountSubscriptionRecoveryPolicy.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/FixedCountSubscriptionRecoveryPolicy.java?view=diff&rev=479614&r1=479613&r2=479614
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/FixedCountSubscriptionRecoveryPolicy.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/FixedCountSubscriptionRecoveryPolicy.java Mon Nov 27 05:40:11 2006
@@ -22,6 +22,7 @@
 import org.apache.activemq.broker.ConnectionContext;
 import org.apache.activemq.broker.region.MessageReference;
 import org.apache.activemq.broker.region.Subscription;
+import org.apache.activemq.broker.region.SubscriptionRecovery;
 import org.apache.activemq.broker.region.Topic;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.Message;
@@ -53,7 +54,7 @@
         return true;
     }
 
-    synchronized public void recover(ConnectionContext context,Topic topic,Subscription sub) throws Exception{
+    synchronized public void recover(ConnectionContext context,Topic topic,SubscriptionRecovery sub) throws Exception{
         // Re-dispatch the last message seen.
         int t=tail;
         // The buffer may not have rolled over yet..., start from the front
@@ -63,18 +64,9 @@
         if(messages[t]==null)
             return;
         // Keep dispatching until t hit's tail again.
-        MessageEvaluationContext msgContext=context.getMessageEvaluationContext();
         do{
             MessageReference node=messages[t];
-            try{
-                msgContext.setDestination(node.getRegionDestination().getActiveMQDestination());
-                msgContext.setMessageReference(node);
-                if(sub.matches(node,msgContext)){
-                    sub.add(node);
-                }
-            }finally{
-                msgContext.clear();
-            }
+            sub.addRecoveredMessage(context,node);
             t++;
             if(t>=messages.length)
                 t=0;

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/FixedSizedSubscriptionRecoveryPolicy.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/FixedSizedSubscriptionRecoveryPolicy.java?view=diff&rev=479614&r1=479613&r2=479614
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/FixedSizedSubscriptionRecoveryPolicy.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/FixedSizedSubscriptionRecoveryPolicy.java Mon Nov 27 05:40:11 2006
@@ -17,23 +17,18 @@
  */
 package org.apache.activemq.broker.region.policy;
 
+import java.util.Iterator;
+import java.util.List;
 import org.apache.activemq.broker.ConnectionContext;
 import org.apache.activemq.broker.region.MessageReference;
-import org.apache.activemq.broker.region.Subscription;
+import org.apache.activemq.broker.region.SubscriptionRecovery;
 import org.apache.activemq.broker.region.Topic;
-import org.apache.activemq.broker.region.policy.TimedSubscriptionRecoveryPolicy.TimestampWrapper;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.Message;
-import org.apache.activemq.filter.DestinationFilter;
-import org.apache.activemq.filter.MessageEvaluationContext;
 import org.apache.activemq.memory.list.DestinationBasedMessageList;
 import org.apache.activemq.memory.list.MessageList;
 import org.apache.activemq.memory.list.SimpleMessageList;
 
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-
 /**
  * This implementation of {@link SubscriptionRecoveryPolicy} will keep a fixed
  * amount of memory available in RAM for message history which is evicted in
@@ -61,22 +56,13 @@
         return true;
     }
 
-    public void recover(ConnectionContext context, Topic topic, Subscription sub) throws Exception {
+    public void recover(ConnectionContext context,Topic topic,SubscriptionRecovery sub) throws Exception{
         // Re-dispatch the messages from the buffer.
-        List copy = buffer.getMessages(sub);
-        if( !copy.isEmpty() ) {
-            MessageEvaluationContext msgContext = context.getMessageEvaluationContext();
-            try {
-                for (Iterator iter = copy.iterator(); iter.hasNext();) {
-                    MessageReference node = (MessageReference) iter.next();
-                    msgContext.setDestination(node.getRegionDestination().getActiveMQDestination());
-                    msgContext.setMessageReference(node);
-                    if (sub.matches(node, msgContext) ) {
-                        sub.add(node);
-                    }
-                }
-            } finally {
-                msgContext.clear();
+        List copy=buffer.getMessages(sub.getActiveMQDestination());
+        if(!copy.isEmpty()){
+            for(Iterator iter=copy.iterator();iter.hasNext();){
+                MessageReference node=(MessageReference)iter.next();
+                sub.addRecoveredMessage(context,node);
             }
         }
     }

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/LastImageSubscriptionRecoveryPolicy.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/LastImageSubscriptionRecoveryPolicy.java?view=diff&rev=479614&r1=479613&r2=479614
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/LastImageSubscriptionRecoveryPolicy.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/LastImageSubscriptionRecoveryPolicy.java Mon Nov 27 05:40:11 2006
@@ -19,15 +19,13 @@
 
 import java.util.ArrayList;
 import java.util.List;
-
 import org.apache.activemq.broker.ConnectionContext;
 import org.apache.activemq.broker.region.MessageReference;
-import org.apache.activemq.broker.region.Subscription;
+import org.apache.activemq.broker.region.SubscriptionRecovery;
 import org.apache.activemq.broker.region.Topic;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.Message;
 import org.apache.activemq.filter.DestinationFilter;
-import org.apache.activemq.filter.MessageEvaluationContext;
 
 /**
  * This implementation of {@link SubscriptionRecoveryPolicy} will only keep 
@@ -46,20 +44,11 @@
         return true;
     }
 
-    public void recover(ConnectionContext context, Topic topic, Subscription sub) throws Exception {
+    public void recover(ConnectionContext context,Topic topic,SubscriptionRecovery sub) throws Exception{
         // Re-dispatch the last message seen.
-        MessageReference node = lastImage;
-        if( node != null ){
-            MessageEvaluationContext msgContext = context.getMessageEvaluationContext();
-            try {
-                msgContext.setDestination(node.getRegionDestination().getActiveMQDestination());
-                msgContext.setMessageReference(node);                        
-                if (sub.matches(node, msgContext)) {
-                    sub.add(node);
-                }
-            } finally {
-                msgContext.clear();
-            }
+        MessageReference node=lastImage;
+        if(node!=null){
+            sub.addRecoveredMessage(context,node);
         }
     }
 

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/NoSubscriptionRecoveryPolicy.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/NoSubscriptionRecoveryPolicy.java?view=diff&rev=479614&r1=479613&r2=479614
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/NoSubscriptionRecoveryPolicy.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/NoSubscriptionRecoveryPolicy.java Mon Nov 27 05:40:11 2006
@@ -20,6 +20,7 @@
 import org.apache.activemq.broker.ConnectionContext;
 import org.apache.activemq.broker.region.MessageReference;
 import org.apache.activemq.broker.region.Subscription;
+import org.apache.activemq.broker.region.SubscriptionRecovery;
 import org.apache.activemq.broker.region.Topic;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.Message;
@@ -43,7 +44,7 @@
         return true;
     }
 
-    public void recover(ConnectionContext context, Topic topic, Subscription sub) throws Exception {
+    public void recover(ConnectionContext context, Topic topic, SubscriptionRecovery sub) throws Exception {
     }
 
     public void start() throws Exception {

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/QueryBasedSubscriptionRecoveryPolicy.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/QueryBasedSubscriptionRecoveryPolicy.java?view=diff&rev=479614&r1=479613&r2=479614
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/QueryBasedSubscriptionRecoveryPolicy.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/QueryBasedSubscriptionRecoveryPolicy.java Mon Nov 27 05:40:11 2006
@@ -24,6 +24,7 @@
 import org.apache.activemq.broker.region.Destination;
 import org.apache.activemq.broker.region.MessageReference;
 import org.apache.activemq.broker.region.Subscription;
+import org.apache.activemq.broker.region.SubscriptionRecovery;
 import org.apache.activemq.broker.region.Topic;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.ActiveMQMessage;
@@ -66,20 +67,16 @@
         return query.validateUpdate(message.getMessage());
     }
 
-    public void recover(ConnectionContext context, final Topic topic, final Subscription sub) throws Exception {
-        if (query != null) {
-            final MessageEvaluationContext msgContext = context.getMessageEvaluationContext();
-            try {
-                ActiveMQDestination destination = sub.getConsumerInfo().getDestination();
-                query.execute(destination, new MessageListener() {
-                    public void onMessage(Message message) {
-                        dispatchInitialMessage(message, topic, msgContext, sub);
-                    }
-                });
-            }
-            finally {
-                msgContext.clear();
-            }
+    public void recover(final ConnectionContext context,final Topic topic,final SubscriptionRecovery sub)
+            throws Exception{
+        if(query!=null){
+            ActiveMQDestination destination=sub.getActiveMQDestination();
+            query.execute(destination,new MessageListener(){
+
+                public void onMessage(Message message){
+                    dispatchInitialMessage(message,topic,context,sub);
+                }
+            });
         }
     }
 
@@ -107,21 +104,17 @@
         return new org.apache.activemq.command.Message[0];
     }
 
-    protected void dispatchInitialMessage(Message message,  Destination regionDestination, MessageEvaluationContext msgContext, Subscription sub) {
+    protected void dispatchInitialMessage(Message message,  Destination regionDestination, ConnectionContext context, SubscriptionRecovery sub) {
         try {
             ActiveMQMessage activeMessage = ActiveMQMessageTransformation.transformMessage(message, null);
             ActiveMQDestination destination = activeMessage.getDestination();
             if (destination == null) {
-                destination = sub.getConsumerInfo().getDestination();
+                destination = sub.getActiveMQDestination();
                 activeMessage.setDestination(destination);
             }
             activeMessage.setRegionDestination(regionDestination);
             configure(activeMessage);
-            msgContext.setDestination(destination);
-            msgContext.setMessageReference(activeMessage);
-            if (sub.matches(activeMessage, msgContext)) {
-                sub.add(activeMessage);
-            }
+            sub.addRecoveredMessage(context,activeMessage);
         }
         catch (Throwable e) {
             log.warn("Failed to dispatch initial message: " + message + " into subscription. Reason: " + e, e);

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/SubscriptionRecoveryPolicy.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/SubscriptionRecoveryPolicy.java?view=diff&rev=479614&r1=479613&r2=479614
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/SubscriptionRecoveryPolicy.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/SubscriptionRecoveryPolicy.java Mon Nov 27 05:40:11 2006
@@ -22,6 +22,7 @@
 import org.apache.activemq.broker.ConnectionContext;
 import org.apache.activemq.broker.region.MessageReference;
 import org.apache.activemq.broker.region.Subscription;
+import org.apache.activemq.broker.region.SubscriptionRecovery;
 import org.apache.activemq.broker.region.Topic;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.Message;
@@ -55,7 +56,7 @@
      * @param node
      * @throws Exception
      */
-    void recover(ConnectionContext context, Topic topic, Subscription sub) throws Exception;
+    void recover(ConnectionContext context, Topic topic, SubscriptionRecovery sub) throws Exception;
     
     
     /**
@@ -67,6 +68,7 @@
 
     /**
      * Used to copy the policy object.
+     * @return the copy
      */
     SubscriptionRecoveryPolicy copy();
 }

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/TimedSubscriptionRecoveryPolicy.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/TimedSubscriptionRecoveryPolicy.java?view=diff&rev=479614&r1=479613&r2=479614
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/TimedSubscriptionRecoveryPolicy.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/TimedSubscriptionRecoveryPolicy.java Mon Nov 27 05:40:11 2006
@@ -22,10 +22,9 @@
 import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
-
 import org.apache.activemq.broker.ConnectionContext;
 import org.apache.activemq.broker.region.MessageReference;
-import org.apache.activemq.broker.region.Subscription;
+import org.apache.activemq.broker.region.SubscriptionRecovery;
 import org.apache.activemq.broker.region.Topic;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.Message;
@@ -80,25 +79,15 @@
         return true;
     }
 
-    public void recover(ConnectionContext context, Topic topic, Subscription sub) throws Exception {
-        
+    public void recover(ConnectionContext context,Topic topic,SubscriptionRecovery sub) throws Exception{
         // Re-dispatch the messages from the buffer.
-        ArrayList copy = new ArrayList(buffer);
-
-        if (!copy.isEmpty()) {
-            MessageEvaluationContext msgContext = context.getMessageEvaluationContext();
-            try {
-                for (Iterator iter = copy.iterator(); iter.hasNext();) {
-                    TimestampWrapper timestampWrapper = (TimestampWrapper) iter.next();
-                    MessageReference message = timestampWrapper.message;
-                    msgContext.setDestination(message.getRegionDestination().getActiveMQDestination());
-                    msgContext.setMessageReference(message);
-                    if (sub.matches(message, msgContext)) {
-                        sub.add(timestampWrapper.message);
-                    }
-                }
-            }finally {
-                msgContext.clear();
+        ArrayList copy=new ArrayList(buffer);
+        if(!copy.isEmpty()){
+            MessageEvaluationContext msgContext=context.getMessageEvaluationContext();
+            for(Iterator iter=copy.iterator();iter.hasNext();){
+                TimestampWrapper timestampWrapper=(TimestampWrapper)iter.next();
+                MessageReference message=timestampWrapper.message;
+                sub.addRecoveredMessage(context,message);
             }
         }
     }

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/memory/list/DestinationBasedMessageList.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/memory/list/DestinationBasedMessageList.java?view=diff&rev=479614&r1=479613&r2=479614
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/memory/list/DestinationBasedMessageList.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/memory/list/DestinationBasedMessageList.java Mon Nov 27 05:40:11 2006
@@ -74,7 +74,7 @@
         return getMessages(sub.getConsumerInfo().getDestination());
     }
     
-    protected  List getMessages(ActiveMQDestination destination) {
+    public  List getMessages(ActiveMQDestination destination) {
         Set set = null;
         synchronized (lock) {
             set = subscriptionIndex.get(destination);

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/memory/list/MessageList.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/memory/list/MessageList.java?view=diff&rev=479614&r1=479613&r2=479614
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/memory/list/MessageList.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/memory/list/MessageList.java Mon Nov 27 05:40:11 2006
@@ -17,13 +17,11 @@
  */
 package org.apache.activemq.memory.list;
 
+import java.util.List;
 import org.apache.activemq.broker.region.MessageReference;
-import org.apache.activemq.broker.region.Subscription;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.Message;
 
-import java.util.List;
-
 /**
  * A container of messages which is used to store messages and then 
  * replay them later for a given subscription.
@@ -37,7 +35,7 @@
     /**
      * Returns the current list of MessageReference objects for the given subscription
      */
-    List getMessages(Subscription sub);
+    List getMessages(ActiveMQDestination destination);
     
     /**
      * @param destination

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/memory/list/SimpleMessageList.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/memory/list/SimpleMessageList.java?view=diff&rev=479614&r1=479613&r2=479614
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/memory/list/SimpleMessageList.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/memory/list/SimpleMessageList.java Mon Nov 27 05:40:11 2006
@@ -17,21 +17,18 @@
  */
 package org.apache.activemq.memory.list;
 
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
 import org.apache.activemq.broker.region.MessageReference;
-import org.apache.activemq.broker.region.Subscription;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.Message;
 import org.apache.activemq.filter.DestinationFilter;
-import org.apache.activemq.network.DemandForwardingBridge;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.List;
-
 /**
  * A simple fixed size {@link MessageList} where there is a single, fixed size
  * list that all messages are added to for simplicity. Though this
@@ -66,7 +63,7 @@
         }
     }
 
-    public List getMessages(Subscription sub) {
+    public List getMessages(ActiveMQDestination destination) {
         return getList();
     }
     

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicMessageStore.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicMessageStore.java?view=diff&rev=479614&r1=479613&r2=479614
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicMessageStore.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicMessageStore.java Mon Nov 27 05:40:11 2006
@@ -112,7 +112,16 @@
         if(!subscriberContainer.containsKey(key)){
             subscriberContainer.put(key,info);
         }
-        addSubscriberMessageContainer(key);
+        ListContainer container=addSubscriberMessageContainer(key);
+        if(retroactive){
+            for(StoreEntry entry=ackContainer.getFirst();entry!=null;){
+                TopicSubAck tsa=(TopicSubAck)ackContainer.get(entry);
+                ConsumerMessageRef ref=new ConsumerMessageRef();
+                ref.setAckEntry(entry);
+                ref.setMessageEntry(tsa.getMessageEntry());
+                container.add(ref);
+            }
+        }
     }
 
     public synchronized void deleteSubscription(String clientId,String subscriptionName){
@@ -207,12 +216,13 @@
         return result;
     }
 
-    protected void addSubscriberMessageContainer(Object key) throws IOException{
+    protected ListContainer addSubscriberMessageContainer(Object key) throws IOException{
         ListContainer container=store.getListContainer(key,"topic-subs");
         Marshaller marshaller=new ConsumerMessageRefMarshaller();
         container.setMarshaller(marshaller);
         TopicSubContainer tsc=new TopicSubContainer(container);
         subscriberMessages.put(key,tsc);
+        return container;
     }
 
     public int getMessageCount(String clientId,String subscriberName) throws IOException{

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryMessageStore.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryMessageStore.java?view=diff&rev=479614&r1=479613&r2=479614
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryMessageStore.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryMessageStore.java Mon Nov 27 05:40:11 2006
@@ -19,6 +19,7 @@
 import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.Map;
+import java.util.Map.Entry;
 import org.apache.activemq.broker.ConnectionContext;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.Message;
@@ -37,6 +38,7 @@
 
     protected final ActiveMQDestination destination;
     protected final Map messageTable;
+    protected MessageId lastBatchId;
 
     public MemoryMessageStore(ActiveMQDestination destination){
         this(destination,new LinkedHashMap());
@@ -115,12 +117,32 @@
         return messageTable.size();
     }
 
-    public void resetBatching(MessageId nextToDispatch){
-    }
-
+    
     public void recoverNextMessages(int maxReturned,MessageRecoveryListener listener) throws Exception{
+        synchronized(messageTable){
+            
+            boolean pastLackBatch=lastBatchId==null;
+            int count = 0;
+            for(Iterator iter=messageTable.entrySet().iterator();iter.hasNext();){
+                Map.Entry entry=(Entry)iter.next();
+                if(pastLackBatch){
+                    count++;
+                    Object msg=entry.getValue();
+                    lastBatchId = (MessageId)entry.getKey();
+                    if(msg.getClass()==String.class){
+                        listener.recoverMessageReference((String)msg);
+                    }else{
+                        listener.recoverMessage((Message)msg);
+                    }
+                }else{
+                    pastLackBatch=entry.getKey().equals(lastBatchId);
+                }
+            }
+            listener.finished();
+        }
     }
 
     public void resetBatching(){
+        lastBatchId = null;
     }
 }

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryTopicMessageStore.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryTopicMessageStore.java?view=diff&rev=479614&r1=479613&r2=479614
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryTopicMessageStore.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryTopicMessageStore.java Mon Nov 27 05:40:11 2006
@@ -37,10 +37,11 @@
 
     private Map ackDatabase;
     private Map subscriberDatabase;
+    private Map batchDatabase;
     MessageId lastMessageId;
 
     public MemoryTopicMessageStore(ActiveMQDestination destination){
-        this(destination,new LinkedHashMap(),makeMap(),makeMap());
+        this(destination,new LinkedHashMap(),makeMap(),makeMap(),makeMap());
     }
 
     protected static Map makeMap(){
@@ -48,10 +49,11 @@
     }
 
     public MemoryTopicMessageStore(ActiveMQDestination destination,Map messageTable,Map subscriberDatabase,
-            Map ackDatabase){
+            Map ackDatabase, Map batchDatabase){
         super(destination,messageTable);
         this.subscriberDatabase=subscriberDatabase;
         this.ackDatabase=ackDatabase;
+        this.batchDatabase=batchDatabase;
     }
 
     public synchronized void addMessage(ConnectionContext context,Message message) throws IOException{
@@ -110,13 +112,10 @@
             }
             listener.finished();
         }
+       
     }
 
-    public void recoverNextMessages(String clientId,String subscriptionName,MessageId lastMessageId,int maxReturned,
-            MessageRecoveryListener listener) throws Exception{
-        listener.finished();
-    }
-
+   
     public void delete(){
         super.delete();
         ackDatabase.clear();
@@ -128,14 +127,6 @@
         return (SubscriptionInfo[])subscriberDatabase.values().toArray(new SubscriptionInfo[subscriberDatabase.size()]);
     }
 
-    public MessageId getNextMessageIdToDeliver(String clientId,String subscriptionName,MessageId id) throws IOException{
-        return null;
-    }
-
-    public MessageId getPreviousMessageIdToDeliver(String clientId,String subscriptionName,MessageId id)
-            throws IOException{
-        return null;
-    }
 
     public int getMessageCount(String clientId,String subscriberName) throws IOException{
         int result=0;
@@ -143,24 +134,56 @@
         // the message table is a synchronizedMap - so just have to synchronize here
         synchronized(messageTable){
             result=messageTable.size();
-            for(Iterator iter=messageTable.entrySet().iterator();iter.hasNext();){
-                Map.Entry entry=(Entry)iter.next();
-                if(entry.getKey().equals(lastAck)){
-                    break;
+            if(lastAck!=null){
+                for(Iterator iter=messageTable.entrySet().iterator();iter.hasNext();){
+                    Map.Entry entry=(Entry)iter.next();
+                    if(entry.getKey().equals(lastAck)){
+                        break;
+                    }
+                    result--;
                 }
-                result--;
             }
         }
         return result;
     }
 
-    public void resetBatching(String clientId,String subscriptionName,MessageId id){
-    }
-
+    
     public void recoverNextMessages(String clientId,String subscriptionName,int maxReturned,
             MessageRecoveryListener listener) throws Exception{
+        SubscriptionKey key = new SubscriptionKey(clientId,subscriptionName);
+        MessageId lastBatch = (MessageId)batchDatabase.get(key);
+        if (lastBatch==null) {
+            //if last batch null - start from last ack
+            lastBatch = (MessageId)ackDatabase.get(key);
+        }
+        boolean pastLackBatch=lastBatch==null;
+        MessageId lastId = null;
+        // the message table is a synchronizedMap - so just have to synchronize here
+        int count = 0;
+        synchronized(messageTable){
+            for(Iterator iter=messageTable.entrySet().iterator();iter.hasNext() &&count < maxReturned ;){
+                Map.Entry entry=(Entry)iter.next();
+                if(pastLackBatch){
+                    count++;
+                    Object msg=entry.getValue();
+                    lastId = (MessageId)entry.getKey();
+                    if(msg.getClass()==String.class){
+                        listener.recoverMessageReference((String)msg);
+                    }else{
+                        listener.recoverMessage((Message)msg);
+                    }
+                }else{
+                    pastLackBatch=entry.getKey().equals(lastBatch);
+                }
+            }
+            if (lastId != null) {
+                batchDatabase.put(key,lastId);
+            }
+            listener.finished();
+        }
     }
 
     public void resetBatching(String clientId,String subscriptionName){
+        batchDatabase.remove(new SubscriptionKey(clientId,subscriptionName));
     }
 }

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/rapid/RapidTopicMessageStore.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/rapid/RapidTopicMessageStore.java?view=diff&rev=479614&r1=479613&r2=479614
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/rapid/RapidTopicMessageStore.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/rapid/RapidTopicMessageStore.java Mon Nov 27 05:40:11 2006
@@ -120,7 +120,16 @@
         if(!subscriberContainer.containsKey(key)){
             subscriberContainer.put(key,info);
         }
-        addSubscriberMessageContainer(key);
+        ListContainer container=addSubscriberMessageContainer(key);
+        if(retroactive){
+            for(StoreEntry entry=ackContainer.getFirst();entry!=null;){
+                TopicSubAck tsa=(TopicSubAck)ackContainer.get(entry);
+                ConsumerMessageRef ref=new ConsumerMessageRef();
+                ref.setAckEntry(entry);
+                ref.setMessageEntry(tsa.getMessageEntry());
+                container.add(ref);
+            }
+        }
     }
 
     public synchronized void deleteSubscription(String clientId,String subscriptionName){
@@ -204,12 +213,13 @@
         return result;
     }
 
-    protected void addSubscriberMessageContainer(Object key) throws IOException{
+    protected ListContainer addSubscriberMessageContainer(Object key) throws IOException{
         ListContainer container=store.getListContainer(key,"topic-subs");
         Marshaller marshaller=new ConsumerMessageRefMarshaller();
         container.setMarshaller(marshaller);
         TopicSubContainer tsc=new TopicSubContainer(container);
         subscriberMessages.put(key,tsc);
+        return container;
     }
 
     public int getMessageCount(String clientId,String subscriberName) throws IOException{



Mime
View raw message