activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chir...@apache.org
Subject svn commit: r377936 - in /incubator/activemq/trunk/activemq-core: ./ src/main/java/org/apache/activemq/broker/jmx/ src/main/java/org/apache/activemq/broker/region/ src/main/java/org/apache/activemq/util/ src/test/java/org/apache/activemq/
Date Wed, 15 Feb 2006 04:35:14 GMT
Author: chirino
Date: Tue Feb 14 20:35:12 2006
New Revision: 377936

URL: http://svn.apache.org/viewcvs?rev=377936&view=rev
Log:
Fixed a few of the broken durable subscription tests.

Modified:
    incubator/activemq/trunk/activemq-core/project.xml
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedQueueRegion.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedTempQueueRegion.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedTempTopicRegion.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedTopicRegion.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueRegion.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TempQueueRegion.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TempTopicRegion.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicRegion.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/SubscriptionKey.java
    incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsRedeliveredTest.java

Modified: incubator/activemq/trunk/activemq-core/project.xml
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/project.xml?rev=377936&r1=377935&r2=377936&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/project.xml (original)
+++ incubator/activemq/trunk/activemq-core/project.xml Tue Feb 14 20:35:12 2006
@@ -347,6 +347,9 @@
                 
                 <!-- http://jira.activemq.org/jira/browse/AMQ-522 -->
                 <exclude>**/ProxyConnectorTest.*</exclude>
+                <!-- http://jira.activemq.org/jira/browse/AMQ-560 -->
+                <exclude>**/FanoutTransportBrokerTest.*</exclude>
+
 
             </excludes>
         </unitTest>

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedQueueRegion.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedQueueRegion.java?rev=377936&r1=377935&r2=377936&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedQueueRegion.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedQueueRegion.java Tue Feb 14 20:35:12 2006
@@ -50,8 +50,8 @@
         super.destroySubscription(sub);
     }
 
-    protected Destination createDestination(ActiveMQDestination destination) throws Throwable {
-        Destination rc = super.createDestination(destination);
+    protected Destination createDestination(ConnectionContext context, ActiveMQDestination destination) throws Throwable {
+        Destination rc = super.createDestination(context, destination);
         regionBroker.register(destination, rc);
         return rc;
     }

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedTempQueueRegion.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedTempQueueRegion.java?rev=377936&r1=377935&r2=377936&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedTempQueueRegion.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedTempQueueRegion.java Tue Feb 14 20:35:12 2006
@@ -48,8 +48,8 @@
         super.destroySubscription(sub);
     }
 
-    protected Destination createDestination(ActiveMQDestination destination) throws Throwable {
-        Destination rc = super.createDestination(destination);
+    protected Destination createDestination(ConnectionContext context, ActiveMQDestination destination) throws Throwable {
+        Destination rc = super.createDestination(context, destination);
         regionBroker.register(destination, rc);
         return rc;
     }

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedTempTopicRegion.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedTempTopicRegion.java?rev=377936&r1=377935&r2=377936&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedTempTopicRegion.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedTempTopicRegion.java Tue Feb 14 20:35:12 2006
@@ -48,8 +48,8 @@
         super.destroySubscription(sub);
     }
 
-    protected Destination createDestination(ActiveMQDestination destination) throws Throwable {
-        Destination rc = super.createDestination(destination);
+    protected Destination createDestination(ConnectionContext context, ActiveMQDestination destination) throws Throwable {
+        Destination rc = super.createDestination(context, destination);
         regionBroker.register(destination, rc);
         return rc;
     }

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedTopicRegion.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedTopicRegion.java?rev=377936&r1=377935&r2=377936&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedTopicRegion.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedTopicRegion.java Tue Feb 14 20:35:12 2006
@@ -50,8 +50,8 @@
         super.destroySubscription(sub);
     }
 
-    protected Destination createDestination(ActiveMQDestination destination) throws Throwable {
-        Destination rc = super.createDestination(destination);
+    protected Destination createDestination(ConnectionContext context, ActiveMQDestination destination) throws Throwable {
+        Destination rc = super.createDestination(context, destination);
         regionBroker.register(destination, rc);
         return rc;
     }

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java?rev=377936&r1=377935&r2=377936&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java Tue Feb 14 20:35:12 2006
@@ -73,7 +73,7 @@
     
     public Destination addDestination(ConnectionContext context, ActiveMQDestination destination) throws Throwable {
         log.debug("Adding destination: "+destination);
-        Destination dest = createDestination(destination);
+        Destination dest = createDestination(context, destination);
         dest.start();
         synchronized(destinationsMutex){
             destinations.put(destination,dest);
@@ -241,7 +241,7 @@
     }
 
     protected abstract Subscription createSubscription(ConnectionContext context, ConsumerInfo info) throws Throwable;
-    abstract protected Destination createDestination(ActiveMQDestination destination) throws Throwable;
+    abstract protected Destination createDestination(ConnectionContext context, ActiveMQDestination destination) throws Throwable;
 
     public boolean isAutoCreateDestinations() {
         return autoCreateDestinations;

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java?rev=377936&r1=377935&r2=377936&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java Tue Feb 14 20:35:12 2006
@@ -27,76 +27,66 @@
 import org.apache.activemq.command.Message;
 import org.apache.activemq.command.MessageAck;
 import org.apache.activemq.command.MessageDispatch;
-import org.apache.activemq.command.SubscriptionInfo;
+import org.apache.activemq.util.SubscriptionKey;
 
 import edu.emory.mathcs.backport.java.util.concurrent.ConcurrentHashMap;
 
 public class DurableTopicSubscription extends PrefetchSubscription {
     
-    final protected String clientId;
-    final protected String subscriptionName;
-    final ConcurrentHashMap redeliveredMessages = new ConcurrentHashMap();
-    
-    boolean active=true;
-    boolean recovered=true;
+    private final ConcurrentHashMap redeliveredMessages = new ConcurrentHashMap();
+    private final ConcurrentHashMap destinations = new ConcurrentHashMap();
+    private final SubscriptionKey subscriptionKey;
+    private boolean active=false;
     
     public DurableTopicSubscription(Broker broker,ConnectionContext context, ConsumerInfo info) throws InvalidSelectorException {
         super(broker,context, info);
-        this.clientId = context.getClientId();
-        this.subscriptionName = info.getSubcriptionName();
+        subscriptionKey = new SubscriptionKey(context.getClientId(), info.getSubcriptionName());
     }
     
-    public DurableTopicSubscription(Broker broker,SubscriptionInfo info) throws InvalidSelectorException {
-        super(broker,null, createFakeConsumerInfo(info));
-        this.clientId = info.getClientId();
-        this.subscriptionName = info.getSubcriptionName();
-        active=false;
-        recovered=false;        
-    }
-
-    private static ConsumerInfo createFakeConsumerInfo(SubscriptionInfo info) {
-        ConsumerInfo rc = new ConsumerInfo();
-        rc.setSelector(info.getSelector());
-        rc.setSubcriptionName(info.getSubcriptionName());
-        rc.setDestination(info.getDestination());
-        return rc;
-    }
-
     synchronized public boolean isActive() {
         return active;
     }
-    synchronized public boolean isRecovered() {
-        return recovered;
-    }
     
     protected boolean isFull() {
         return !active || super.isFull();
     }
     
     synchronized public void gc() {
-        if( !active && recovered ) {
-            recovered = false;
-            
-            for (Iterator iter = dispatched.iterator(); iter.hasNext();) {
-                MessageReference node = (MessageReference) iter.next();
-                // node.decrementTargetCount();
-                iter.remove();
+    }
+
+    synchronized public void add(ConnectionContext context, Destination destination) throws Throwable {
+        super.add(context, destination);
+        destinations.put(destination.getActiveMQDestination(), destination);
+        if( active ) {
+            Topic topic = (Topic) destination;            
+            topic.activate(context, this);
+        }
+    }
+   
+    synchronized public void activate(ConnectionContext context, ConsumerInfo info) throws Throwable {
+        if( !active ) {
+            this.active = true;
+            this.context = context;
+            this.info = info;
+            for (Iterator iter = destinations.values().iterator(); iter.hasNext();) {
+                Topic topic = (Topic) iter.next();
+                topic.activate(context, this);
             }
-            
-            for (Iterator iter = matched.iterator(); iter.hasNext();) {
-                MessageReference node = (MessageReference) iter.next();
-                // node.decrementTargetCount();
-                iter.remove();
+            if( !isFull() ) {                            
+                dispatchMatched();
             }
-            
-            delivered=0;
         }
     }
 
-    synchronized public void deactivate() {        
+    synchronized public void deactivate() throws Throwable {        
         active=false;
+        for (Iterator iter = destinations.values().iterator(); iter.hasNext();) {
+            Topic topic = (Topic) iter.next();
+            topic.deactivate(context, this);
+        }
         for (Iterator iter = dispatched.iterator(); iter.hasNext();) {
-            
+
+            // Mark the dispatched messages as redelivered for next time.
             MessageReference node = (MessageReference) iter.next();
             Integer count = (Integer) redeliveredMessages.get(node.getMessageId());
             if( count !=null ) {
@@ -105,32 +95,16 @@
                 redeliveredMessages.put(node.getMessageId(), new Integer(1));
             }
             
-            // Undo the dispatch.
-            matched.addFirst(node);
             iter.remove();
         }
+        for (Iterator iter = matched.iterator(); iter.hasNext();) {
+            MessageReference node = (MessageReference) iter.next();
+            // node.decrementTargetCount();
+            iter.remove();
+        }        
         delivered=0;
     }
 
-    synchronized public void activate(ConnectionContext context, ConsumerInfo info) throws Throwable {
-        if( !active ) {
-            this.active = true;
-            this.context = context;
-            this.info = info;
-            if( !recovered ) {
-                recovered=true;
-                for (Iterator iter = destinations.iterator(); iter.hasNext();) {
-                    Topic topic = (Topic) iter.next();
-                    topic.recover(context, this, false);
-                }
-            } else {
-                if( !isFull() ) {                            
-                    dispatchMatched();
-                }
-            }
-        }
-    }
-
     protected MessageDispatch createMessageDispatch(MessageReference node, Message message) {
         MessageDispatch md = super.createMessageDispatch(node, message);
         Integer count = (Integer) redeliveredMessages.get(node.getMessageId());
@@ -141,7 +115,9 @@
     }
 
     synchronized public void add(MessageReference node) throws Throwable {
-        assert recovered;
+        if( !active ) {
+            return;
+        }
         node = new IndirectMessageReference(node.getRegionDestination(), (Message) node);
         super.add(node);
         node.decrementReferenceCount();
@@ -152,7 +128,6 @@
     }
     
     public synchronized void acknowledge(ConnectionContext context, MessageAck ack) throws Throwable {
-        assert recovered;
         super.acknowledge(context, ack);
     }
 
@@ -163,7 +138,7 @@
     }
     
     public String getSubscriptionName() {
-        return subscriptionName;
+        return subscriptionKey.getSubscriptionName();
     }
     
     public String toString() {
@@ -177,7 +152,11 @@
     }
 
     public String getClientId() {
-        return clientId;
+        return subscriptionKey.getClientId();
+    }
+
+    public SubscriptionKey getSubscriptionKey() {
+        return subscriptionKey;
     }
 
 }

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java?rev=377936&r1=377935&r2=377936&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java Tue Feb 14 20:35:12 2006
@@ -37,6 +37,7 @@
  * @version $Revision: 1.15 $
  */
 abstract public class PrefetchSubscription extends AbstractSubscription{
+    
     static private final Log log=LogFactory.getLog(PrefetchSubscription.class);
     final protected LinkedList matched=new LinkedList();
     final protected LinkedList dispatched=new LinkedList();
@@ -44,13 +45,17 @@
     int preLoadLimit=1024*100;
     int preLoadSize=0;
     boolean dispatching=false;
-
+    
+    long enqueueCounter;
+    long dispatchCounter;
+    
     public PrefetchSubscription(Broker broker,ConnectionContext context,ConsumerInfo info)
                     throws InvalidSelectorException{
         super(broker,context,info);
     }
 
     synchronized public void add(MessageReference node) throws Throwable{
+        enqueueCounter++;
         if(!isFull()&&!isSlaveBroker()){
             dispatch(node);
         }else{
@@ -244,8 +249,9 @@
         }
         // Make sure we can dispatch a message.
         if(canDispatch(node)&&!isSlaveBroker()){
+            dispatchCounter++;
             MessageDispatch md=createMessageDispatch(node,message);
-            dispatched.addLast(node);
+            dispatched.addLast(node);            
             incrementPreloadSize(node.getMessage().getSize());
             if(info.isDispatchAsync()){
                 md.setConsumer(new Runnable(){
@@ -325,4 +331,13 @@
      */
     protected void acknowledge(ConnectionContext context,final MessageAck ack,final MessageReference node)
                     throws IOException{}
+
+
+    public long getDispatchCounter() {
+        return dispatchCounter;
+    }
+
+    public long getEnqueueCounter() {
+        return enqueueCounter;
+    }
 }

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueRegion.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueRegion.java?rev=377936&r1=377935&r2=377936&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueRegion.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueRegion.java Tue Feb 14 20:35:12 2006
@@ -54,7 +54,7 @@
 
     // Implementation methods
     // -------------------------------------------------------------------------
-    protected Destination createDestination(ActiveMQDestination destination) throws Throwable {
+    protected Destination createDestination(ConnectionContext context, ActiveMQDestination destination) throws Throwable {
         MessageStore store = persistenceAdapter.createQueueMessageStore((ActiveMQQueue) destination);
         Queue queue = new Queue(destination, memoryManager, store, destinationStatistics, taskRunnerFactory);
         configureQueue(queue, destination);

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TempQueueRegion.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TempQueueRegion.java?rev=377936&r1=377935&r2=377936&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TempQueueRegion.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TempQueueRegion.java Tue Feb 14 20:35:12 2006
@@ -38,7 +38,7 @@
         setAutoCreateDestinations(false);
     }
 
-    protected Destination createDestination(ActiveMQDestination destination) throws Throwable {
+    protected Destination createDestination(ConnectionContext context, ActiveMQDestination destination) throws Throwable {
         final ActiveMQTempDestination tempDest = (ActiveMQTempDestination) destination;
         return new Queue(destination, memoryManager, null, destinationStatistics, taskRunnerFactory) {
             

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TempTopicRegion.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TempTopicRegion.java?rev=377936&r1=377935&r2=377936&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TempTopicRegion.java
(original)
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TempTopicRegion.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TempTopicRegion.java Tue Feb 14 20:35:12 2006
     }
 
-    protected Destination createDestination(ActiveMQDestination destination) throws Throwable {
+    protected Destination createDestination(ConnectionContext context, ActiveMQDestination destination) throws Throwable {
         final ActiveMQTempDestination tempDest = (ActiveMQTempDestination) destination;
         return new Topic(destination, null, memoryManager, destinationStatistics, taskRunnerFactory) {
             

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java?rev=377936&r1=377935&r2=377936&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java Tue Feb 14 20:35:12 2006
@@ -42,8 +42,8 @@
 import org.apache.activemq.transaction.Synchronization;
 import org.apache.activemq.util.SubscriptionKey;
 
+import edu.emory.mathcs.backport.java.util.concurrent.ConcurrentHashMap;
 import edu.emory.mathcs.backport.java.util.concurrent.CopyOnWriteArrayList;
-import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicInteger;
 
 /**
  * The Topic is a destination that sends a copy of a message to every active
@@ -64,7 +64,7 @@
     private SubscriptionRecoveryPolicy subscriptionRecoveryPolicy = new FixedCountSubscriptionRecoveryPolicy();
     private boolean sendAdvisoryIfNoConsumers;
     private DeadLetterStrategy deadLetterStrategy = new SharedDeadLetterStrategy();
-    private AtomicInteger durableSubscriberCounter = new AtomicInteger();
+    private final ConcurrentHashMap durableSubcribers = new ConcurrentHashMap();
     
     public Topic(ActiveMQDestination destination, TopicMessageStore store, UsageManager memoryManager, DestinationStatistics parentStats,
             TaskRunnerFactory taskFactory) {
@@ -72,15 +72,6 @@
         this.destination = destination;
         this.store = store;
         this.usageManager = memoryManager;
-
-        // TODO: switch back when cache is working again.
-        // this.cache = cache;
-        // destinationStatistics.setMessagesCached(cache.getMessagesCached());
-        // CacheEvictionUsageListener listener = new
-        // CacheEvictionUsageListener(memoryManager, 90, 50, taskFactory);
-        // listener.add(cache);
-        // this.memoryManager.addUsageListener(listener);
-
         this.destinationStatistics.setParent(parentStats);
     }
 
@@ -89,142 +80,147 @@
     }
 
     public void addSubscription(ConnectionContext context, final Subscription sub) throws Throwable {
-        destinationStatistics.getConsumers().increment();
+        
         sub.add(context, this);
-        if (sub.getConsumerInfo().isDurable()) {
-            recover(context, (DurableTopicSubscription) sub, true);
-        }
-        else {
-            recover(context, sub);
-        }
-    }
 
-    /**
-     * Used to recover the message list non durable subscriptions.  Recovery only happens if the consumer is
-     * retroactive.
-     * 
-     * @param context
-     * @param sub
-     * @throws Throwable
-     */
-    private void recover(ConnectionContext context, final Subscription sub) throws Throwable {
-        if (sub.getConsumerInfo().isRetroactive()) {
+        if ( !sub.getConsumerInfo().isDurable() ) {
+            
+            destinationStatistics.getConsumers().increment();
             
-            // synchronize with dispatch method so that no new messages are sent
-            // while we are recovering a subscription to avoid out of order messages.
-            dispatchValve.turnOff();
-            try {
+            // Do a retroactive recovery if needed.
+            if (sub.getConsumerInfo().isRetroactive()) {
                 
+                // synchronize with dispatch method so that no new messages are sent
+                // while we are recovering a subscription to avoid out of order messages.
+                dispatchValve.turnOff();
+                try {
+                    
+                    synchronized(consumers) {
+                        consumers.add(sub);
+                    }
+                    subscriptionRecoveryPolicy.recover(context, this, sub);
+                    
+                } finally {
+                    dispatchValve.turnOn();
+                }
+                
+            } else {
                 synchronized(consumers) {
                     consumers.add(sub);
                 }
-                subscriptionRecoveryPolicy.recover(context, this, sub);
-                
-            } finally {
-                dispatchValve.turnOn();
-            }
-            
+            }            
         } else {
+            DurableTopicSubscription dsub = (DurableTopicSubscription) sub;
+            durableSubcribers.put(dsub.getSubscriptionKey(), dsub);
+        }
+    }
+    
+    public void removeSubscription(ConnectionContext context, Subscription sub) throws Throwable {
+        if ( !sub.getConsumerInfo().isDurable() ) {
+            destinationStatistics.getConsumers().decrement();
             synchronized(consumers) {
-                consumers.add(sub);
+                consumers.remove(sub);
             }
         }
+        sub.remove(context, this);
     }
-
-    /**
-     * Used to recover the message list for a durable subscription.
-     * 
-     * @param context
-     * @param sub
-     * @param initialActivation
-     * @throws Throwable
-     */
-    public void recover(ConnectionContext context, final DurableTopicSubscription sub, boolean initialActivation) throws Throwable {
-
+    
+    public void addInactiveSubscription(ConnectionContext context, DurableTopicSubscription sub) throws Throwable {
+        sub.add(context, this);        
+        destinationStatistics.getConsumers().increment();
+        durableSubcribers.put(sub.getSubscriptionKey(), sub);
+    }
+   
+    public void deleteSubscription(ConnectionContext context, SubscriptionKey key) throws IOException {
+        if (store != null) {
+            store.deleteSubscription(key.clientId, key.subscriptionName);
+            durableSubcribers.remove(key);
+            destinationStatistics.getConsumers().decrement();
+        }
+    }
+    
+    public void activate(ConnectionContext context, final DurableTopicSubscription subscription) throws Throwable {
+        
         // synchronize with dispatch method so that no new messages are sent
         // while
         // we are recovering a subscription to avoid out of order messages.
         dispatchValve.turnOff();
         try {
-
-            boolean persistenceWasOptimized = canOptimizeOutPersistence();
-            if (initialActivation) {
-                synchronized(consumers) {           
-                    consumers.add(sub);
-                    durableSubscriberCounter.incrementAndGet();
+        
+            synchronized(consumers) {           
+                consumers.add(subscription);
+            }
+            
+            if (store == null )
+                return;
+            
+            // Recover the durable subscription.
+            String clientId = subscription.getClientId();
+            String subscriptionName = subscription.getSubscriptionName();
+            String selector = subscription.getConsumerInfo().getSelector();
+            SubscriptionInfo info = store.lookupSubscription(clientId, subscriptionName);
+            if (info != null) {
+                // Check to see if selector changed.
+                String s1 = info.getSelector();
+                if (s1 == null ^ selector == null || (s1 != null && !s1.equals(selector))) {
+                    // Need to delete the subscription
+                    store.deleteSubscription(clientId, subscriptionName);
+                    info = null;
                 }
             }
-
-            if (store != null) {
-                String clientId = sub.getClientId();
-                String subscriptionName = sub.getSubscriptionName();
-                String selector = sub.getConsumerInfo().getSelector();
-                SubscriptionInfo info = store.lookupSubscription(clientId, subscriptionName);
-                if (info != null) {
-                    // Check to see if selector changed.
-                    String s1 = info.getSelector();
-                    if (s1 == null ^ selector == null || (s1 != null && !s1.equals(selector))) {
-                        // Need to delete the subscription
-                        store.deleteSubscription(clientId, subscriptionName);
-                        info = null;
+            // Do we need to create the subscription?
+            if (info == null) {
+                store.addSubsciption(clientId, subscriptionName, selector, subscription.getConsumerInfo().isRetroactive());
+            }
+    
+            final MessageEvaluationContext msgContext = new MessageEvaluationContext();
+            msgContext.setDestination(destination);
+            store.recoverSubscription(clientId, subscriptionName, new MessageRecoveryListener() {
+                public void recoverMessage(Message message) throws Throwable {
+                    message.setRegionDestination(Topic.this);
+                    try {
+                        msgContext.setMessageReference(message);
+                        if (subscription.matches(message, msgContext)) {
+                            subscription.add(message);
+                        }
+                    }
+                    catch (InterruptedException e) {
+                        Thread.currentThread().interrupt();
+                    }
+                    catch (IOException e) {
+                        // TODO: Need to handle this better.
+                        e.printStackTrace();
                     }
                 }
-                // Do we need to create the subscription?
-                if (info == null) {
-                    store.addSubsciption(clientId, subscriptionName, selector, sub.getConsumerInfo().isRetroactive());
+    
+                public void recoverMessageReference(String messageReference) throws Throwable {
+                    throw new RuntimeException("Should not be called.");
                 }
-
-                if (sub.isRecovered()) {
-                    final MessageEvaluationContext msgContext = new MessageEvaluationContext();
-                    msgContext.setDestination(destination);
-                    store.recoverSubscription(clientId, subscriptionName, new MessageRecoveryListener() {
-                        public void recoverMessage(Message message) throws Throwable {
-                            message.setRegionDestination(Topic.this);
-                            try {
-                                msgContext.setMessageReference(message);
-                                if (sub.matches(message, msgContext)) {
-                                    sub.add(message);
-                                }
-                            }
-                            catch (InterruptedException e) {
-                                Thread.currentThread().interrupt();
-                            }
-                            catch (IOException e) {
-                                // TODO: Need to handle this better.
-                                e.printStackTrace();
-                            }
-                        }
-
-                        public void recoverMessageReference(String messageReference) throws Throwable {
-                            throw new RuntimeException("Should not be called.");
-                        }
-                    });
-                    
-                    if( initialActivation && sub.getConsumerInfo().isRetroactive() ) {
-                        // Then use the subscriptionRecoveryPolicy since there will not be any messages in the persistent store.
-                        if( persistenceWasOptimized ) {
-                            subscriptionRecoveryPolicy.recover(context, this, sub);
-                        } else {
-                            // TODO: implement something like
-                            // subscriptionRecoveryPolicy.recoverNonPersistent(context, this, sub);
-                        }
-                    }
+            });
+            
+            if( true && subscription.getConsumerInfo().isRetroactive() ) {
+                // If nothing was in the persistent store, then try to use the recovery policy.
+                if( subscription.getEnqueueCounter() == 0 ) {
+                    subscriptionRecoveryPolicy.recover(context, this, subscription);
+                } else {
+                    // TODO: implement something like
+                    // subscriptionRecoveryPolicy.recoverNonPersistent(context, this, sub);
                 }
             }
-
+        
         }
         finally {
             dispatchValve.turnOn();
         }
     }
 
-    public void removeSubscription(ConnectionContext context, Subscription sub) throws Throwable {
-        destinationStatistics.getConsumers().decrement();
-        synchronized(consumers) {
+    public void deactivate(ConnectionContext context, DurableTopicSubscription sub) throws Throwable {        
+        synchronized(consumers) {           
             consumers.remove(sub);
         }
         sub.remove(context, this);
-    }
+    }    
+
 
     public void send(final ConnectionContext context, final Message message) throws Throwable {
 
@@ -259,18 +255,7 @@
     }
 
     private boolean canOptimizeOutPersistence() {
-        return durableSubscriberCounter.get()==0;
-    }
-
-    public void createSubscription(SubscriptionKey key) {
-        durableSubscriberCounter.incrementAndGet();
-    }
-    
-    public void deleteSubscription(ConnectionContext context, SubscriptionKey key) throws IOException {
-        if (store != null) {
-            store.deleteSubscription(key.clientId, key.subscriptionName);
-            durableSubscriberCounter.decrementAndGet();
-        }
+        return durableSubcribers.size()==0;
     }
 
     public String toString() {
@@ -423,5 +408,6 @@
             }
         }
     }
+
 
 }

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicRegion.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicRegion.java?rev=377936&r1=377935&r2=377936&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicRegion.java
(original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicRegion.java
Tue Feb 14 20:35:12 2006
@@ -86,19 +86,21 @@
                     super.removeConsumer(context, sub.getConsumerInfo());
 
                     super.addConsumer(context, info);
-
+                    sub = (DurableTopicSubscription) durableSubscriptions.get(key);
                 }
                 else {
                     // Change the consumer id key of the durable sub.
                     if( sub.getConsumerInfo().getConsumerId()!=null )
                         subscriptions.remove(sub.getConsumerInfo().getConsumerId());
                     subscriptions.put(info.getConsumerId(), sub);
-                    sub.activate(context, info);
                 }
             }
             else {
                 super.addConsumer(context, info);
+                sub = (DurableTopicSubscription) durableSubscriptions.get(key);
             }
+            
+            sub.activate(context, info);
         }
         else {
             super.addConsumer(context, info);
@@ -145,7 +147,7 @@
 
     // Implementation methods
     // -------------------------------------------------------------------------
-    protected Destination createDestination(ActiveMQDestination destination) throws Throwable {
+    protected Destination createDestination(ConnectionContext context, ActiveMQDestination destination) throws Throwable {
         TopicMessageStore store = persistenceAdapter.createTopicMessageStore((ActiveMQTopic) destination);
         Topic topic = new Topic(destination, store, memoryManager, destinationStatistics, taskRunnerFactory);
         configureTopic(topic, destination);
@@ -154,13 +156,30 @@
         if (store != null) {            
             SubscriptionInfo[] infos = store.getAllSubscriptions();
             for (int i = 0; i < infos.length; i++) {
-                log.info("Restoring durable subscription: "+infos[i]);
-                createDurableSubscription(topic, infos[i]);
+                
+                SubscriptionInfo info = infos[i];
+                log.debug("Restoring durable subscription: "+infos);
+                SubscriptionKey key = new SubscriptionKey(info);
+                
+                // A single durable sub may be subscribing to multiple topics.  so it might exist already.
+                DurableTopicSubscription sub = (DurableTopicSubscription) durableSubscriptions.get(key);
+                if( sub == null ) {
+                    sub = (DurableTopicSubscription) createSubscription(context, createInactiveConsumerInfo(info));
+                }
+                topic.addInactiveSubscription(context, sub);
             }            
         }
         
         return topic;
     }
+    
+    private static ConsumerInfo createInactiveConsumerInfo(SubscriptionInfo info) {
+        ConsumerInfo rc = new ConsumerInfo();
+        rc.setSelector(info.getSelector());
+        rc.setSubcriptionName(info.getSubcriptionName());
+        rc.setDestination(info.getDestination());
+        return rc;
+    }
 
     protected void configureTopic(Topic topic, ActiveMQDestination destination) {
         if (policyMap != null) {
@@ -188,17 +207,6 @@
             return new TopicSubscription(broker,context, info, memoryManager);
         }
     }
-    
-    public Subscription createDurableSubscription(Topic topic, SubscriptionInfo info) throws Throwable {
-        SubscriptionKey key = new SubscriptionKey(info.getClientId(), info.getSubcriptionName());
-        topic.createSubscription(key);
-        DurableTopicSubscription sub = (DurableTopicSubscription) durableSubscriptions.get(key);
-        sub = new DurableTopicSubscription(broker,info);
-        sub.add(null, topic);
-        durableSubscriptions.put(key, sub);
-        return sub;
-    }
-    
 
     /**
      */

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/SubscriptionKey.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/SubscriptionKey.java?rev=377936&r1=377935&r2=377936&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/SubscriptionKey.java
(original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/SubscriptionKey.java
Tue Feb 14 20:35:12 2006
@@ -16,17 +16,26 @@
  */
 package org.apache.activemq.util;
 
+import org.apache.activemq.command.SubscriptionInfo;
+
 public class SubscriptionKey {
+    
     public final String clientId;
     public final String subscriptionName;
     private final int hashValue;
 
+
+    public SubscriptionKey(SubscriptionInfo info) {
+        this(info.getClientId(), info.getSubcriptionName());
+    }
+
     public SubscriptionKey(String clientId, String subscriptionName) {
         this.clientId = clientId;
         this.subscriptionName = subscriptionName;
         hashValue = clientId.hashCode()^subscriptionName.hashCode();
     }
-    
+
+
     public int hashCode() {
         return hashValue;
     }
@@ -42,5 +51,13 @@
     
     public String toString() {
         return clientId+":"+subscriptionName;
+    }
+
+    public String getClientId() {
+        return clientId;
+    }
+
+    public String getSubscriptionName() {
+        return subscriptionName;
     }
 }

Modified: incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsRedeliveredTest.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsRedeliveredTest.java?rev=377936&r1=377935&r2=377936&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsRedeliveredTest.java
(original)
+++ incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsRedeliveredTest.java
Tue Feb 14 20:35:12 2006
@@ -185,7 +185,10 @@
         Topic topic = session.createTopic("topic-"+getName());
         MessageConsumer consumer = session.createDurableSubscriber(topic, "sub1");
 
-        MessageProducer producer = createProducer(session, topic);
+        // This case only works with persistent messages since transient messages
+        // are dropped when the consumer goes offline.
+        MessageProducer producer = session.createProducer(topic);
+        producer.setDeliveryMode(DeliveryMode.PERSISTENT);
         producer.send(createTextMessage(session));
 
         // Consume the message...



Mime
View raw message