activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rajdav...@apache.org
Subject svn commit: r480731 - in /incubator/activemq/trunk/activemq-core/src: main/java/org/apache/activemq/broker/ main/java/org/apache/activemq/broker/region/ main/java/org/apache/activemq/broker/region/cursors/ main/java/org/apache/activemq/store/jdbc/ main...
Date Wed, 29 Nov 2006 22:09:34 GMT
Author: rajdavies
Date: Wed Nov 29 14:09:33 2006
New Revision: 480731

URL: http://svn.apache.org/viewvc?view=rev&rev=480731
Log:
Setting the Store based cursor as the default for Durable Subscribers

Added:
    incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/KahaDurableSubscriptionTest.java
  (with props)
Modified:
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PendingMessageCursor.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCAdapter.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCTopicMessageStore.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/Statements.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicMessageStore.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/rapid/RapidTopicMessageStore.java

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java?view=diff&rev=480731&r1=480730&r2=480731
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java Wed Nov 29 14:09:33 2006
@@ -47,6 +47,7 @@
 import org.apache.activemq.broker.region.RegionBroker;
 import org.apache.activemq.broker.region.policy.PendingDurableSubscriberMessageStoragePolicy;
 import org.apache.activemq.broker.region.policy.PolicyMap;
+import org.apache.activemq.broker.region.policy.StorePendingDurableSubscriberMessageStoragePolicy;
 import org.apache.activemq.broker.region.policy.VMPendingDurableSubscriberMessageStoragePolicy;
 import org.apache.activemq.broker.region.virtual.VirtualDestination;
 import org.apache.activemq.broker.region.virtual.VirtualDestinationInterceptor;
@@ -140,7 +141,8 @@
     private ActiveMQDestination[] destinations;
     private Store tempDataStore;
     private int persistenceThreadPriority = Thread.MAX_PRIORITY;
-    private PendingDurableSubscriberMessageStoragePolicy pendingDurableSubscriberPolicy = new VMPendingDurableSubscriberMessageStoragePolicy();
+    //private PendingDurableSubscriberMessageStoragePolicy pendingDurableSubscriberPolicy = new VMPendingDurableSubscriberMessageStoragePolicy();
+    private PendingDurableSubscriberMessageStoragePolicy pendingDurableSubscriberPolicy = new StorePendingDurableSubscriberMessageStoragePolicy();
    
 
     /**
@@ -383,6 +385,7 @@
             startDestinations();
             
             addShutdownHook();
+            log.info("Using Persistence Adaptor " + getPersistenceAdapter());
             if (deleteAllMessagesOnStartup) {
                 deleteAllMessages();
             }

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java?view=diff&rev=480731&r1=480730&r2=480731
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java Wed Nov 29 14:09:33 2006
@@ -64,6 +64,9 @@
         if( active || keepDurableSubsActive ) {
             Topic topic = (Topic) destination;            
             topic.activate(context, this);
+            if (pending.isEmpty(topic)) {
+                topic.recoverRetroactiveMessages(context, this);
+            }
         }
         dispatchMatched();
     }
@@ -81,6 +84,13 @@
             }
             synchronized(pending) {
                 pending.start();
+            }
+            //If nothing was in the persistent store, then try to use the recovery policy.
+            if (pending.isEmpty()) {
+                for (Iterator iter = destinations.values().iterator(); iter.hasNext();) {
+                    Topic topic = (Topic) iter.next();
+                    topic.recoverRetroactiveMessages(context, this);
+                }
             }
             dispatchMatched();
         }

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java?view=diff&rev=480731&r1=480730&r2=480731
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java Wed Nov 29 14:09:33 2006
@@ -146,7 +146,6 @@
     }
     
     public void activate(ConnectionContext context, final DurableTopicSubscription subscription) throws Exception {
-        
         // synchronize with dispatch method so that no new messages are sent
         // while
         // we are recovering a subscription to avoid out of order messages.
@@ -210,15 +209,7 @@
                 });
             }
             
-            if( true && subscription.getConsumerInfo().isRetroactive() ) {
-                // If nothing was in the persistent store, then try to use the recovery policy.
-                if( subscription.getEnqueueCounter() == 0 ) {
-                    subscriptionRecoveryPolicy.recover(context, this, subscription);
-                } else {
-                    // TODO: implement something like
-                    // subscriptionRecoveryPolicy.recoverNonPersistent(context, this, sub);
-                }
-            }
+            
         
         }
         finally {
@@ -231,7 +222,15 @@
             consumers.remove(sub);
         }
         sub.remove(context, this);
-    }    
+    } 
+    
+    
+    protected void recoverRetroactiveMessages(ConnectionContext context,Subscription subscription) throws Exception{
+        if(subscription.getConsumerInfo().isRetroactive()){
+            subscriptionRecoveryPolicy.recover(context,this,subscription);
+        }
+    }
+    
 
 
     public void send(final ConnectionContext context, final Message message) throws Exception {

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java?view=diff&rev=480731&r1=480730&r2=480731
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java Wed Nov 29 14:09:33 2006
@@ -66,6 +66,10 @@
     public boolean isEmpty(){
         return false;
     }
+    
+    public boolean isEmpty(Destination destination) {
+        return isEmpty();
+    }
 
     public MessageReference next(){
         return null;

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PendingMessageCursor.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PendingMessageCursor.java?view=diff&rev=480731&r1=480730&r2=480731
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PendingMessageCursor.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PendingMessageCursor.java Wed Nov 29 14:09:33 2006
@@ -50,6 +50,13 @@
     public boolean isEmpty();
     
     /**
+     * check if a Destination is Empty for this cursor
+     * @param destination
+     * @return true id the Destination is empty
+     */
+    public boolean isEmpty(Destination destination);
+    
+    /**
      * reset the cursor
      *
      */

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java?view=diff&rev=480731&r1=480730&r2=480731
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java Wed Nov 29 14:09:33 2006
@@ -19,6 +19,7 @@
 import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.Map;
+import org.apache.activemq.advisory.AdvisorySupport;
 import org.apache.activemq.broker.ConnectionContext;
 import org.apache.activemq.broker.region.Destination;
 import org.apache.activemq.broker.region.MessageReference;
@@ -85,14 +86,16 @@
      * @throws Exception
      */
     public synchronized void add(ConnectionContext context,Destination destination) throws Exception{
-        TopicStorePrefetch tsp=new TopicStorePrefetch((Topic)destination,clientId,subscriberName);
-        tsp.setMaxBatchSize(getMaxBatchSize());
-        tsp.setUsageManager(usageManager);
-        topics.put(destination,tsp);
-        storePrefetches.add(tsp);
-        if(started){
-            tsp.start();
-            pendingCount+=tsp.size();
+        if(destination!=null&&!AdvisorySupport.isAdvisoryTopic(destination.getActiveMQDestination())){
+            TopicStorePrefetch tsp=new TopicStorePrefetch((Topic)destination,clientId,subscriberName);
+            tsp.setMaxBatchSize(getMaxBatchSize());
+            tsp.setUsageManager(usageManager);
+            topics.put(destination,tsp);
+            storePrefetches.add(tsp);
+            if(started){
+                tsp.start();
+                pendingCount+=tsp.size();
+            }
         }
     }
 
@@ -115,6 +118,15 @@
      */
     public synchronized boolean isEmpty(){
         return pendingCount<=0;
+    }
+    
+    public boolean isEmpty(Destination destination) {
+        boolean result = true;
+        TopicStorePrefetch tsp=(TopicStorePrefetch)topics.get(destination);
+        if(tsp!=null){
+            result = tsp.size() <= 0;
+        }
+        return result;
     }
 
     /**

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCAdapter.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCAdapter.java?view=diff&rev=480731&r1=480730&r2=480731
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCAdapter.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCAdapter.java Wed Nov 29 14:09:33 2006
@@ -88,4 +88,6 @@
     
     public void doRecoverNextMessages(TransactionContext c,ActiveMQDestination destination,long nextSeq,int maxReturned,
             JDBCMessageRecoveryListener listener) throws Exception;
+    
+    public long doGetLastAckedDurableSubscriberMessageId(TransactionContext c,ActiveMQDestination destination,String clientId, String subscriberName) throws SQLException,IOException;
 }

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCTopicMessageStore.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCTopicMessageStore.java?view=diff&rev=480731&r1=480730&r2=480731
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCTopicMessageStore.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCTopicMessageStore.java Wed Nov 29 14:09:33 2006
@@ -99,7 +99,8 @@
         String subcriberId=getSubscriptionKey(clientId,subscriptionName);
         AtomicLong last=(AtomicLong)subscriberLastMessageMap.get(subcriberId);
         if(last==null){
-            last=new AtomicLong(-1);
+            long lastAcked = adapter.doGetLastAckedDurableSubscriberMessageId(c,destination,clientId,subscriptionName);
+            last=new AtomicLong(lastAcked);
             subscriberLastMessageMap.put(subcriberId,last);
         }
         final AtomicLong finalLast=last;

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/Statements.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/Statements.java?view=diff&rev=480731&r1=480730&r2=480731
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/Statements.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/Statements.java Wed Nov 29 14:09:33 2006
@@ -64,7 +64,7 @@
     private String lockUpdateStatement;
     private String nextDurableSubscriberMessageStatement;
     private String durableSubscriberMessageCountStatement;
-    private String nextDurableSubscriberMessageIdStatement;
+    private String lastAckedDurableSubscriberMessageStatement;
     private String destinationMessageCountStatement;
     private String findNextMessagesStatement;
     private boolean useLockCreateWhereClause;
@@ -322,6 +322,18 @@
         }
         return findNextMessagesStatement;
     }
+    
+    /**
+     * @return the lastAckedDurableSubscriberMessageStatement
+     */
+    public String getLastAckedDurableSubscriberMessageStatement(){
+        if(lastAckedDurableSubscriberMessageStatement==null) {
+            lastAckedDurableSubscriberMessageStatement = "SELECT MAX(LAST_ACKED_ID) FROM " + getFullAckTableName()
+            + " WHERE CONTAINER=? AND CLIENT_ID=? AND SUB_NAME=?";
+        }
+        return lastAckedDurableSubscriberMessageStatement;
+    }
+
 
 
     public String getFullMessageTableName() {
@@ -590,20 +602,7 @@
      */
     public void setDurableSubscriberMessageCountStatement(String durableSubscriberMessageCountStatement){
         this.durableSubscriberMessageCountStatement=durableSubscriberMessageCountStatement;
-    }
-
-    
-    
-
-    
-    /**
-     * @param nextDurableSubscriberMessageIdStatement the nextDurableSubscriberMessageIdStatement to set
-     */
-    public void setNextDurableSubscriberMessageIdStatement(String nextDurableSubscriberMessageIdStatement){
-        this.nextDurableSubscriberMessageIdStatement=nextDurableSubscriberMessageIdStatement;
-    }
-
-     
+    }    
     
     /**
      * @param findNextMessagesStatement the findNextMessagesStatement to set
@@ -617,6 +616,16 @@
      */
     public void setDestinationMessageCountStatement(String destinationMessageCountStatement){
         this.destinationMessageCountStatement=destinationMessageCountStatement;
+    }
+
+    
+    
+    
+    /**
+     * @param lastAckedDurableSubscriberMessageStatement the lastAckedDurableSubscriberMessageStatement to set
+     */
+    public void setLastAckedDurableSubscriberMessageStatement(String lastAckedDurableSubscriberMessageStatement){
+        this.lastAckedDurableSubscriberMessageStatement=lastAckedDurableSubscriberMessageStatement;
     }
     
     

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java?view=diff&rev=480731&r1=480730&r2=480731
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java Wed Nov 29 14:09:33 2006
@@ -544,6 +544,28 @@
             close(s);
         }
     }
+    
+    public long doGetLastAckedDurableSubscriberMessageId(TransactionContext c,ActiveMQDestination destination,String clientId, String subscriberName) throws SQLException,IOException{
+        PreparedStatement s=null;
+        ResultSet rs=null;
+        long result = -1;
+        try{
+            s=c.getConnection().prepareStatement(statements.getLastAckedDurableSubscriberMessageStatement());
+            s.setString(1,destination.getQualifiedName());
+            s.setString(2,clientId);
+            s.setString(3,subscriberName);
+            rs=s.executeQuery();
+            if(rs.next()){
+                result=rs.getLong(1);
+            }
+            rs.close();
+            s.close();
+        }finally{
+            close(rs);
+            close(s);
+        }
+        return result;
+    }
 
     static private void close(PreparedStatement s){
         try{

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicMessageStore.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicMessageStore.java?view=diff&rev=480731&r1=480730&r2=480731
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicMessageStore.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicMessageStore.java Wed Nov 29 14:09:33 2006
@@ -112,6 +112,7 @@
         if(!subscriberContainer.containsKey(key)){
             subscriberContainer.put(key,info);
         }
+        //add the subscriber
         ListContainer container=addSubscriberMessageContainer(key);
         if(retroactive){
             for(StoreEntry entry=ackContainer.getFirst();entry!=null;){
@@ -124,24 +125,9 @@
         }
     }
 
-    public synchronized void deleteSubscription(String clientId,String subscriptionName){
+    public synchronized void deleteSubscription(String clientId,String subscriptionName) throws IOException{
         String key=getSubscriptionKey(clientId,subscriptionName);
-        subscriberContainer.remove(key);
-        TopicSubContainer container=(TopicSubContainer)subscriberMessages.get(key);
-        for(Iterator i=container.getListContainer().iterator();i.hasNext();){
-            ConsumerMessageRef ref=(ConsumerMessageRef)i.next();
-            if(ref!=null){
-                TopicSubAck tsa=(TopicSubAck)ackContainer.get(ref.getAckEntry());
-                if(tsa!=null){
-                    if(tsa.decrementCount()<=0){
-                        ackContainer.remove(ref.getAckEntry());
-                        messageContainer.remove(tsa.getMessageEntry());
-                    }else{
-                        ackContainer.update(ref.getAckEntry(),tsa);
-                    }
-                }
-            }
-        }
+        removeSubscriberMessageContainer(key);
     }
 
     public void recoverSubscription(String clientId,String subscriptionName,MessageRecoveryListener listener)
@@ -223,6 +209,26 @@
         TopicSubContainer tsc=new TopicSubContainer(container);
         subscriberMessages.put(key,tsc);
         return container;
+    }
+    
+    protected void removeSubscriberMessageContainer(Object key) throws IOException {
+        subscriberContainer.remove(key);
+        TopicSubContainer container=(TopicSubContainer)subscriberMessages.remove(key);
+        for(Iterator i=container.getListContainer().iterator();i.hasNext();){
+            ConsumerMessageRef ref=(ConsumerMessageRef)i.next();
+            if(ref!=null){
+                TopicSubAck tsa=(TopicSubAck)ackContainer.get(ref.getAckEntry());
+                if(tsa!=null){
+                    if(tsa.decrementCount()<=0){
+                        ackContainer.remove(ref.getAckEntry());
+                        messageContainer.remove(tsa.getMessageEntry());
+                    }else{
+                        ackContainer.update(ref.getAckEntry(),tsa);
+                    }
+                }
+            }
+        }
+        store.deleteListContainer(key,"topic-subs");
     }
 
     public int getMessageCount(String clientId,String subscriberName) throws IOException{

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/rapid/RapidTopicMessageStore.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/rapid/RapidTopicMessageStore.java?view=diff&rev=480731&r1=480730&r2=480731
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/rapid/RapidTopicMessageStore.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/rapid/RapidTopicMessageStore.java Wed Nov 29 14:09:33 2006
@@ -132,24 +132,9 @@
         }
     }
 
-    public synchronized void deleteSubscription(String clientId,String subscriptionName){
+    public synchronized void deleteSubscription(String clientId,String subscriptionName) throws IOException{
         String key=getSubscriptionKey(clientId,subscriptionName);
-        subscriberContainer.remove(key);
-        TopicSubContainer container=(TopicSubContainer)subscriberMessages.get(key);
-        for(Iterator i=container.getListContainer().iterator();i.hasNext();){
-            ConsumerMessageRef ref=(ConsumerMessageRef)i.next();
-            if(ref!=null){
-                TopicSubAck tsa=(TopicSubAck)ackContainer.get(ref.getAckEntry());
-                if(tsa!=null){
-                    if(tsa.decrementCount()<=0){
-                        ackContainer.remove(ref.getAckEntry());
-                        messageContainer.remove(tsa.getMessageEntry());
-                    }else{
-                        ackContainer.update(ref.getAckEntry(),tsa);
-                    }
-                }
-            }
-        }
+        removeSubscriberMessageContainer(key);
     }
 
     public void recoverSubscription(String clientId,String subscriptionName,MessageRecoveryListener listener)
@@ -220,6 +205,26 @@
         TopicSubContainer tsc=new TopicSubContainer(container);
         subscriberMessages.put(key,tsc);
         return container;
+    }
+    
+    protected void removeSubscriberMessageContainer(Object key) throws IOException {
+        subscriberContainer.remove(key);
+        TopicSubContainer container=(TopicSubContainer)subscriberMessages.remove(key);
+        for(Iterator i=container.getListContainer().iterator();i.hasNext();){
+            ConsumerMessageRef ref=(ConsumerMessageRef)i.next();
+            if(ref!=null){
+                TopicSubAck tsa=(TopicSubAck)ackContainer.get(ref.getAckEntry());
+                if(tsa!=null){
+                    if(tsa.decrementCount()<=0){
+                        ackContainer.remove(ref.getAckEntry());
+                        messageContainer.remove(tsa.getMessageEntry());
+                    }else{
+                        ackContainer.update(ref.getAckEntry(),tsa);
+                    }
+                }
+            }
+        }
+        store.deleteListContainer(key,"topic-subs");
     }
 
     public int getMessageCount(String clientId,String subscriberName) throws IOException{

Added: incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/KahaDurableSubscriptionTest.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/KahaDurableSubscriptionTest.java?view=auto&rev=480731
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/KahaDurableSubscriptionTest.java (added)
+++ incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/KahaDurableSubscriptionTest.java Wed Nov 29 14:09:33 2006
@@ -0,0 +1,32 @@
+/**
+ * 
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+
+package org.apache.activemq.usecases;
+
+import java.io.File;
+import java.io.IOException;
+import org.apache.activemq.store.PersistenceAdapter;
+import org.apache.activemq.store.kahadaptor.KahaPersistenceAdapter;
+
+/**
+ * @version $Revision: 1.1.1.1 $
+ */
+public class KahaDurableSubscriptionTest extends DurableSubscriptionTestSupport{
+
+    protected PersistenceAdapter createPersistenceAdapter() throws IOException{
+        File dataDir=new File("target/test-data/durableKaha");
+        KahaPersistenceAdapter adaptor=new KahaPersistenceAdapter(dataDir);
+        return adaptor;
+    }
+}

Propchange: incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/KahaDurableSubscriptionTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/KahaDurableSubscriptionTest.java
------------------------------------------------------------------------------
    svn:executable = *



Mime
View raw message