activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rajdav...@apache.org
Subject svn commit: r650143 - in /activemq/trunk/activemq-core/src: main/java/org/apache/activemq/broker/ main/java/org/apache/activemq/broker/region/ main/java/org/apache/activemq/broker/region/virtual/ test/java/org/apache/activemq/advisory/ test/java/org/ap...
Date Mon, 21 Apr 2008 13:41:25 GMT
Author: rajdavies
Date: Mon Apr 21 06:41:19 2008
New Revision: 650143

URL: http://svn.apache.org/viewvc?rev=650143&view=rev
Log:
Fix for https://issues.apache.org/activemq/browse/AMQ-1672

Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractSubscription.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/CompositeDestinationInterceptor.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationInterceptor.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueBrowserSubscription.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueRegion.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueSubscription.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TempQueueRegion.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TempTopicRegion.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicRegion.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/virtual/CompositeDestination.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/virtual/CompositeQueue.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/virtual/MirroredQueue.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/virtual/VirtualDestinationInterceptor.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/virtual/VirtualTopic.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/advisory/TempDestLoadTest.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/advisory/TempQueueMemoryTest.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/virtual/MirroredQueueTest.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java?rev=650143&r1=650142&r2=650143&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java
Mon Apr 21 06:41:19 2008
@@ -1235,6 +1235,10 @@
         return getBroker().addDestination(getAdminConnectionContext(), destination);
     }
     
+    public void removeDestination(ActiveMQDestination destination) throws Exception {
+        getBroker().removeDestination(getAdminConnectionContext(), destination,0);
+    }
+    
     public int getProducerSystemUsagePortion() {
         return producerSystemUsagePortion;
     }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java?rev=650143&r1=650142&r2=650143&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java
Mon Apr 21 06:41:19 2008
@@ -183,6 +183,10 @@
                 }
                 destinationMap.removeAll(destination);
                 dispose(context,dest);
+                DestinationInterceptor destinationInterceptor = broker.getDestinationInterceptor();
+                if (destinationInterceptor != null) {
+                    destinationInterceptor.remove(dest);
+                }
 
             } else {   
                 LOG.debug("Destination doesn't exist: " + dest);

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractSubscription.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractSubscription.java?rev=650143&r1=650142&r2=650143&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractSubscription.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractSubscription.java
Mon Apr 21 06:41:19 2008
@@ -17,15 +17,12 @@
 package org.apache.activemq.broker.region;
 
 import java.io.IOException;
-import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.CopyOnWriteArrayList;
-
 import javax.jms.InvalidSelectorException;
 import javax.jms.JMSException;
 import javax.management.ObjectName;
-
 import org.apache.activemq.broker.Broker;
 import org.apache.activemq.broker.ConnectionContext;
 import org.apache.activemq.command.ActiveMQDestination;
@@ -44,7 +41,6 @@
 
     private static final Log LOG = LogFactory.getLog(AbstractSubscription.class);
     protected Broker broker;
-    protected Destination destination;
     protected ConnectionContext context;
     protected ConsumerInfo info;
     protected final DestinationFilter destinationFilter;
@@ -53,9 +49,8 @@
     private ObjectName objectName;
 
 
-    public AbstractSubscription(Broker broker, Destination destination,ConnectionContext
context, ConsumerInfo info) throws InvalidSelectorException {
+    public AbstractSubscription(Broker broker,ConnectionContext context, ConsumerInfo info)
throws InvalidSelectorException {
         this.broker = broker;
-        this.destination=destination;
         this.context = context;
         this.info = info;
         this.destinationFilter = DestinationFilter.parseFilter(info.getDestination());

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/CompositeDestinationInterceptor.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/CompositeDestinationInterceptor.java?rev=650143&r1=650142&r2=650143&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/CompositeDestinationInterceptor.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/CompositeDestinationInterceptor.java
Mon Apr 21 06:41:19 2008
@@ -35,5 +35,12 @@
         }
         return destination;
     }
+
+   
+    public void remove(Destination destination) {
+        for (int i = 0; i < interceptors.length; i++) {
+            interceptors[i].remove(destination);
+        } 
+    }
     
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java?rev=650143&r1=650142&r2=650143&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java
Mon Apr 21 06:41:19 2008
@@ -19,7 +19,6 @@
 import java.io.IOException;
 import java.util.Iterator;
 import java.util.Set;
-
 import org.apache.activemq.broker.Broker;
 import org.apache.activemq.broker.ConnectionContext;
 import org.apache.activemq.broker.ProducerBrokerExchange;
@@ -30,7 +29,6 @@
 import org.apache.activemq.command.ProducerInfo;
 import org.apache.activemq.store.MessageStore;
 import org.apache.activemq.usage.MemoryUsage;
-import org.apache.activemq.usage.SystemUsage;
 
 /**
  * 

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationInterceptor.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationInterceptor.java?rev=650143&r1=650142&r2=650143&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationInterceptor.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationInterceptor.java
Mon Apr 21 06:41:19 2008
@@ -25,5 +25,7 @@
 public interface DestinationInterceptor {
 
     Destination intercept(Destination destination);
+    
+    void remove(Destination destination);
 
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java?rev=650143&r1=650142&r2=650143&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java
Mon Apr 21 06:41:19 2008
@@ -49,23 +49,14 @@
     private final boolean keepDurableSubsActive;
     private boolean active;
 
-    public DurableTopicSubscription(Broker broker, Destination dest,SystemUsage usageManager,
ConnectionContext context, ConsumerInfo info, boolean keepDurableSubsActive)
+    public DurableTopicSubscription(Broker broker, SystemUsage usageManager, ConnectionContext
context, ConsumerInfo info, boolean keepDurableSubsActive)
         throws JMSException {
-        super(broker,dest,usageManager, context, info);
+        super(broker,usageManager, context, info);
         this.pending = new StoreDurableSubscriberCursor(broker,context.getClientId(), info.getSubscriptionName(),
info.getPrefetchSize(), this);
         this.pending.setSystemUsage(usageManager);
         this.keepDurableSubsActive = keepDurableSubsActive;
         subscriptionKey = new SubscriptionKey(context.getClientId(), info.getSubscriptionName());
-        if (dest != null && dest.getMessageStore() != null) {
-            TopicMessageStore store = (TopicMessageStore)dest.getMessageStore();
-            try {
-                this.enqueueCounter=store.getMessageCount(subscriptionKey.getClientId(),subscriptionKey.getSubscriptionName());
-            } catch (IOException e) {
-                JMSException jmsEx = new JMSException("Failed to retrieve eunqueueCount from
store "+ e);
-                jmsEx.setLinkedException(e);
-                throw jmsEx;
-            }
-        }
+        
     }
 
     public boolean isActive() {
@@ -82,6 +73,16 @@
     public void add(ConnectionContext context, Destination destination) throws Exception
{
         super.add(context, destination);
         destinations.put(destination.getActiveMQDestination(), destination);
+        if (destination.getMessageStore() != null) {
+            TopicMessageStore store = (TopicMessageStore)destination.getMessageStore();
+            try {
+                this.enqueueCounter+=store.getMessageCount(subscriptionKey.getClientId(),subscriptionKey.getSubscriptionName());
+            } catch (IOException e) {
+                JMSException jmsEx = new JMSException("Failed to retrieve eunqueueCount from
store "+ e);
+                jmsEx.setLinkedException(e);
+                throw jmsEx;
+            }
+        }
         if (active || keepDurableSubsActive) {
             Topic topic = (Topic)destination;
             topic.activate(context, this);

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java?rev=650143&r1=650142&r2=650143&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
Mon Apr 21 06:41:19 2008
@@ -66,14 +66,14 @@
     private final Object dispatchLock = new Object();
     protected ActiveMQMessageAudit audit = new ActiveMQMessageAudit();
 
-    public PrefetchSubscription(Broker broker,Destination destination, SystemUsage usageManager,
ConnectionContext context, ConsumerInfo info, PendingMessageCursor cursor) throws InvalidSelectorException
{
-        super(broker,destination, context, info);
+    public PrefetchSubscription(Broker broker, SystemUsage usageManager, ConnectionContext
context, ConsumerInfo info, PendingMessageCursor cursor) throws InvalidSelectorException {
+        super(broker,context, info);
         this.usageManager=usageManager;
         pending = cursor;
     }
 
-    public PrefetchSubscription(Broker broker,Destination destination, SystemUsage usageManager,
ConnectionContext context, ConsumerInfo info) throws InvalidSelectorException {
-        this(broker,destination,usageManager,context, info, new VMPendingMessageCursor());
+    public PrefetchSubscription(Broker broker,SystemUsage usageManager, ConnectionContext
context, ConsumerInfo info) throws InvalidSelectorException {
+        this(broker,usageManager,context, info, new VMPendingMessageCursor());
     }
 
     /**
@@ -168,9 +168,10 @@
                         + mdn.getMessageId() + ") was not in the pending list");
     }
 
-    public  void acknowledge(final ConnectionContext context,final MessageAck ack) throws
Exception {
+    public final void acknowledge(final ConnectionContext context,final MessageAck ack) throws
Exception {
         // Handle the standard acknowledgment case.
         boolean callDispatchMatched = false;
+        Destination destination = null;
         synchronized(dispatchLock) {
             if (ack.isStandardAck()) {
                 // Acknowledge all dispatched messages up till the message id of
@@ -233,6 +234,7 @@
                                 prefetchExtension = Math.max(0,
                                         prefetchExtension - (index + 1));
                             }
+                            destination = node.getRegionDestination();
                             callDispatchMatched = true;
                             break;
                         }
@@ -268,6 +270,7 @@
                     }
                     if (ack.getLastMessageId().equals(node.getMessageId())) {
                         prefetchExtension = Math.max(prefetchExtension, index + 1);
+                        destination = node.getRegionDestination();
                         callDispatchMatched = true;
                         break;
                     }
@@ -294,6 +297,7 @@
                     if (inAckRange) {
                         node.incrementRedeliveryCounter();
                         if (ack.getLastMessageId().equals(messageId)) {
+                            destination = node.getRegionDestination();
                             callDispatchMatched = true;
                             break;
                         }
@@ -335,6 +339,7 @@
                         if (ack.getLastMessageId().equals(messageId)) {
                             prefetchExtension = Math.max(0, prefetchExtension
                                     - (index + 1));
+                            destination = node.getRegionDestination();
                             callDispatchMatched = true;
                             break;
                         }
@@ -350,7 +355,7 @@
                 }
             }
         }
-        if (callDispatchMatched) {
+        if (callDispatchMatched && destination != null) {
             if (destination.isLazyDispatch()) {
                 destination.wakeup();
             }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueBrowserSubscription.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueBrowserSubscription.java?rev=650143&r1=650142&r2=650143&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueBrowserSubscription.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueBrowserSubscription.java
Mon Apr 21 06:41:19 2008
@@ -17,9 +17,7 @@
 package org.apache.activemq.broker.region;
 
 import java.io.IOException;
-
 import javax.jms.InvalidSelectorException;
-
 import org.apache.activemq.broker.Broker;
 import org.apache.activemq.broker.ConnectionContext;
 import org.apache.activemq.command.ConsumerInfo;
@@ -33,9 +31,9 @@
     boolean browseDone;
     boolean destinationsAdded;
 
-    public QueueBrowserSubscription(Broker broker,Destination destination, SystemUsage usageManager,
ConnectionContext context, ConsumerInfo info)
+    public QueueBrowserSubscription(Broker broker,SystemUsage usageManager, ConnectionContext
context, ConsumerInfo info)
         throws InvalidSelectorException {
-        super(broker,destination,usageManager, context, info);
+        super(broker,usageManager, context, info);
     }
 
     protected boolean canDispatch(MessageReference node) {

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueRegion.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueRegion.java?rev=650143&r1=650142&r2=650143&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueRegion.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueRegion.java
Mon Apr 21 06:41:19 2008
@@ -46,18 +46,11 @@
 
     protected Subscription createSubscription(ConnectionContext context, ConsumerInfo info)
         throws JMSException {
-        Destination dest = null;
-        try {
-            dest = lookup(context, info.getDestination());
-        } catch (Exception e) {
-            JMSException jmsEx = new JMSException("Failed to retrieve destination from region
"+ e);
-            jmsEx.setLinkedException(e);
-            throw jmsEx;
-        }
+        
         if (info.isBrowser()) {
-            return new QueueBrowserSubscription(broker,dest,usageManager, context, info);
+            return new QueueBrowserSubscription(broker,usageManager, context, info);
         } else {
-            return new QueueSubscription(broker, dest,usageManager,context, info);
+            return new QueueSubscription(broker, usageManager,context, info);
         }
     }
 

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueSubscription.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueSubscription.java?rev=650143&r1=650142&r2=650143&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueSubscription.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueSubscription.java
Mon Apr 21 06:41:19 2008
@@ -17,10 +17,8 @@
 package org.apache.activemq.broker.region;
 
 import java.io.IOException;
-
 import javax.jms.InvalidSelectorException;
 import javax.jms.JMSException;
-
 import org.apache.activemq.broker.Broker;
 import org.apache.activemq.broker.ConnectionContext;
 import org.apache.activemq.broker.region.group.MessageGroupMap;
@@ -28,7 +26,6 @@
 import org.apache.activemq.command.ConsumerInfo;
 import org.apache.activemq.command.Message;
 import org.apache.activemq.command.MessageAck;
-import org.apache.activemq.transaction.Synchronization;
 import org.apache.activemq.usage.SystemUsage;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -37,8 +34,8 @@
 
     private static final Log LOG = LogFactory.getLog(QueueSubscription.class);
 
-    public QueueSubscription(Broker broker, Destination destination,SystemUsage usageManager,
ConnectionContext context, ConsumerInfo info) throws InvalidSelectorException {
-        super(broker,destination,usageManager, context, info);
+    public QueueSubscription(Broker broker, SystemUsage usageManager, ConnectionContext context,
ConsumerInfo info) throws InvalidSelectorException {
+        super(broker,usageManager, context, info);
     }
 
     /**

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TempQueueRegion.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TempQueueRegion.java?rev=650143&r1=650142&r2=650143&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TempQueueRegion.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TempQueueRegion.java
Mon Apr 21 06:41:19 2008
@@ -50,18 +50,10 @@
     }
 
     protected Subscription createSubscription(ConnectionContext context, ConsumerInfo info)
throws JMSException {
-        Destination dest=null;
-        try {
-            dest = lookup(context, info.getDestination());
-        } catch (Exception e) {
-            JMSException jmsEx = new JMSException("Failed to retrieve destination from region
"+ e);
-            jmsEx.setLinkedException(e);
-            throw jmsEx;
-        }
         if (info.isBrowser()) {
-            return new QueueBrowserSubscription(broker,dest,usageManager,context, info);
+            return new QueueBrowserSubscription(broker,usageManager,context, info);
         } else {
-            return new QueueSubscription(broker,dest, usageManager,context, info);
+            return new QueueSubscription(broker,usageManager,context, info);
         }
     }
 

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TempTopicRegion.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TempTopicRegion.java?rev=650143&r1=650142&r2=650143&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TempTopicRegion.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TempTopicRegion.java
Mon Apr 21 06:41:19 2008
@@ -47,9 +47,7 @@
             throw new JMSException("A durable subscription cannot be created for a temporary
topic.");
         }
         try {
-           
-            Destination dest = lookup(context, info.getDestination());
-            TopicSubscription answer = new TopicSubscription(broker, dest,context, info,
usageManager);
+            TopicSubscription answer = new TopicSubscription(broker, context, info, usageManager);
             // lets configure the subscription depending on the destination
             ActiveMQDestination destination = info.getDestination();
             if (destination != null && broker.getDestinationPolicy() != null) {

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicRegion.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicRegion.java?rev=650143&r1=650142&r2=650143&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicRegion.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicRegion.java
Mon Apr 21 06:41:19 2008
@@ -228,14 +228,7 @@
 
     protected Subscription createSubscription(ConnectionContext context, ConsumerInfo info)
throws JMSException {
         ActiveMQDestination destination = info.getDestination();
-        Destination dest=null;
-        try {
-            dest = lookup(context, destination);
-        } catch (Exception e) {
-            JMSException jmsEx = new JMSException("Failed to retrieve destination from region
"+ e);
-            jmsEx.setLinkedException(e);
-            throw jmsEx;
-        }
+        
         if (info.isDurable()) {
             if (AdvisorySupport.isAdvisoryTopic(info.getDestination())) {
                 throw new JMSException("Cannot create a durable subscription for an advisory
Topic");
@@ -245,7 +238,7 @@
             
             if (sub == null) {
                 
-                sub = new DurableTopicSubscription(broker,dest, usageManager, context, info,
keepDurableSubsActive);
+                sub = new DurableTopicSubscription(broker, usageManager, context, info, keepDurableSubsActive);
                 if (destination != null && broker.getDestinationPolicy() != null)
{
                     PolicyEntry entry = broker.getDestinationPolicy().getEntryFor(destination);
                     if (entry != null) {
@@ -259,7 +252,7 @@
             return sub;
         }
         try {
-            TopicSubscription answer = new TopicSubscription(broker, dest,context, info,
usageManager);
+            TopicSubscription answer = new TopicSubscription(broker, context, info, usageManager);
             // lets configure the subscription depending on the destination
             if (destination != null && broker.getDestinationPolicy() != null) {
                 PolicyEntry entry = broker.getDestinationPolicy().getEntryFor(destination);

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java?rev=650143&r1=650142&r2=650143&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
Mon Apr 21 06:41:19 2008
@@ -20,11 +20,8 @@
 import java.util.ArrayList;
 import java.util.LinkedList;
 import java.util.List;
-import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
-
 import javax.jms.JMSException;
-
 import org.apache.activemq.broker.Broker;
 import org.apache.activemq.broker.ConnectionContext;
 import org.apache.activemq.broker.region.cursors.FilePendingMessageCursor;
@@ -65,8 +62,8 @@
     private final AtomicLong dequeueCounter = new AtomicLong(0);
     private int memoryUsageHighWaterMark = 95;
 
-    public TopicSubscription(Broker broker, Destination destination,ConnectionContext context,
ConsumerInfo info, SystemUsage usageManager) throws Exception {
-        super(broker, destination,context, info);
+    public TopicSubscription(Broker broker,ConnectionContext context, ConsumerInfo info,
SystemUsage usageManager) throws Exception {
+        super(broker, context, info);
         this.usageManager = usageManager;
         String matchedName = "TopicSubscription:" + CURSOR_NAME_COUNTER.getAndIncrement()
+ "[" + info.getConsumerId().toString() + "]";
         if (info.getDestination().isTemporary() || broker == null || broker.getTempDataStore()==null
) {

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/virtual/CompositeDestination.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/virtual/CompositeDestination.java?rev=650143&r1=650142&r2=650143&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/virtual/CompositeDestination.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/virtual/CompositeDestination.java
Mon Apr 21 06:41:19 2008
@@ -34,6 +34,10 @@
     public Destination intercept(Destination destination) {
         return new CompositeDestinationFilter(destination, getForwardTo(), isForwardOnly(),
isCopyMessage());
     }
+    
+
+    public void remove(Destination destination) {        
+    }
 
     public String getName() {
         return name;

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/virtual/CompositeQueue.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/virtual/CompositeQueue.java?rev=650143&r1=650142&r2=650143&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/virtual/CompositeQueue.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/virtual/CompositeQueue.java
Mon Apr 21 06:41:19 2008
@@ -16,6 +16,7 @@
  */
 package org.apache.activemq.broker.region.virtual;
 
+import org.apache.activemq.broker.region.Destination;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.ActiveMQQueue;
 

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/virtual/MirroredQueue.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/virtual/MirroredQueue.java?rev=650143&r1=650142&r2=650143&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/virtual/MirroredQueue.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/virtual/MirroredQueue.java
Mon Apr 21 06:41:19 2008
@@ -67,6 +67,22 @@
         }
         return destination;
     }
+    
+
+    public void remove(Destination destination) {
+        if (brokerService == null) {
+            throw new IllegalArgumentException("No brokerService injected!");
+        }
+        ActiveMQDestination topic = getMirrorTopic(destination.getActiveMQDestination());
+        if (topic != null) {
+            try {
+                brokerService.removeDestination(topic);
+            } catch (Exception e) {
+                LOG.error("Failed to remove mirror destination for " + destination + ". Reason:
" + e,e);
+            }
+        }
+        
+    }
 
     // Properties
     // -------------------------------------------------------------------------
@@ -124,4 +140,5 @@
     protected ActiveMQDestination getMirrorTopic(ActiveMQDestination original) {
         return new ActiveMQTopic(prefix + original.getPhysicalName() + postfix);
     }
+
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/virtual/VirtualDestinationInterceptor.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/virtual/VirtualDestinationInterceptor.java?rev=650143&r1=650142&r2=650143&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/virtual/VirtualDestinationInterceptor.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/virtual/VirtualDestinationInterceptor.java
Mon Apr 21 06:41:19 2008
@@ -40,7 +40,7 @@
     private DestinationMap destinationMap = new DestinationMap();
     private VirtualDestination[] virtualDestinations;
 
-    public Destination intercept(Destination destination) {
+    public synchronized Destination intercept(Destination destination) {
         Set virtualDestinations = destinationMap.get(destination.getActiveMQDestination());
         List<Destination> destinations = new ArrayList<Destination>();
         for (Iterator iter = virtualDestinations.iterator(); iter.hasNext();) {
@@ -57,6 +57,10 @@
             }
         }
         return destination;
+    }
+    
+
+    public synchronized void remove(Destination destination) {     
     }
 
     public VirtualDestination[] getVirtualDestinations() {

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/virtual/VirtualTopic.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/virtual/VirtualTopic.java?rev=650143&r1=650142&r2=650143&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/virtual/VirtualTopic.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/virtual/VirtualTopic.java
Mon Apr 21 06:41:19 2008
@@ -46,6 +46,10 @@
         return new VirtualTopicInterceptor(destination, getPrefix(), getPostfix());
     }
     
+
+    public void remove(Destination destination) {        
+    }
+    
     // Properties
     // -------------------------------------------------------------------------
 

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/advisory/TempDestLoadTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/advisory/TempDestLoadTest.java?rev=650143&r1=650142&r2=650143&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/advisory/TempDestLoadTest.java
(original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/advisory/TempDestLoadTest.java
Mon Apr 21 06:41:19 2008
@@ -25,6 +25,7 @@
 
 import org.apache.activemq.EmbeddedBrokerTestSupport;
 import org.apache.activemq.broker.region.RegionBroker;
+import org.apache.activemq.command.ActiveMQDestination;
 
 /**
  * @version $Revision: 397249 $
@@ -56,9 +57,9 @@
                
         RegionBroker rb = (RegionBroker) broker.getBroker().getAdaptor(
                 RegionBroker.class);
-        //there should be 3 destinations - advisories - 
-        //1 for the connection + 2 generic ones
-        assertTrue(rb.getDestinationMap().size()==3);            
+        //there should be 2 destinations - advisories - 
+        //1 for the connection + 1 generic ones
+        assertTrue(rb.getDestinationMap().size()==2);            
     }
     
     public void testLoadTempAdvisoryTopics() throws Exception {
@@ -78,9 +79,9 @@
         assertTrue(ab.getAdvisoryProducers().size() == 0);
         RegionBroker rb = (RegionBroker) broker.getBroker().getAdaptor(
                 RegionBroker.class);
-        //there should be 3 destinations - advisories - 
-        //1 for the connection + 2 generic ones
-        assertTrue(rb.getDestinationMap().size()==3);        
+        //there should be 2 destinations - advisories - 
+        //1 for the connection + 1 generic ones
+        assertTrue(rb.getDestinationMap().size()==2);        
         
     }
 

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/advisory/TempQueueMemoryTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/advisory/TempQueueMemoryTest.java?rev=650143&r1=650142&r2=650143&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/advisory/TempQueueMemoryTest.java
(original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/advisory/TempQueueMemoryTest.java
Mon Apr 21 06:41:19 2008
@@ -86,7 +86,7 @@
         
                
         //serverDestination + 
-        assertTrue(rb.getDestinationMap().size()==7);          
+        assertTrue(rb.getDestinationMap().size()==6);          
     }
 
     protected void setUp() throws Exception {

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/virtual/MirroredQueueTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/virtual/MirroredQueueTest.java?rev=650143&r1=650142&r2=650143&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/virtual/MirroredQueueTest.java
(original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/virtual/MirroredQueueTest.java
Mon Apr 21 06:41:19 2008
@@ -21,9 +21,11 @@
 import javax.jms.MessageConsumer;
 import javax.jms.MessageProducer;
 import javax.jms.Session;
-
+import javax.jms.TemporaryQueue;
 import org.apache.activemq.EmbeddedBrokerTestSupport;
 import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.region.RegionBroker;
+import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.ActiveMQQueue;
 import org.apache.activemq.command.ActiveMQTopic;
 import org.apache.activemq.spring.ConsumerBean;
@@ -71,6 +73,20 @@
         messageList.assertMessagesArrived(total);
 
         LOG.info("Received: " + messageList);
+    }
+    
+    public void testTempMirroredQueuesClearDown() throws Exception{
+        if (connection == null) {
+            connection = createConnection();
+        }
+        connection.start();
+        Session session = connection.createSession(false, 0);
+        TemporaryQueue tempQueue = session.createTemporaryQueue();
+        RegionBroker rb = (RegionBroker) broker.getBroker().getAdaptor(
+                RegionBroker.class);
+        assertTrue(rb.getDestinationMap().size()==4);
+        tempQueue.delete();
+        assertTrue(rb.getDestinationMap().size()==3);        
     }
 
     protected Destination createConsumeDestination() {



Mime
View raw message