activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From gtu...@apache.org
Subject svn commit: r1448161 - in /activemq/trunk: activemq-broker/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java activemq-unit-tests/pom.xml
Date Wed, 20 Feb 2013 13:33:36 GMT
Author: gtully
Date: Wed Feb 20 13:33:36 2013
New Revision: 1448161

URL: http://svn.apache.org/r1448161
Log:
Revert "Fix for https://issues.apache.org/jira/browse/AMQ-4000 Durable subscription not getting
unregistered on networked broker, thanks torsten for the unit test!"
Added DurableSubInBrokerNetworkTest to the broken-tests profile until this is resolved.

This reverts commit b7c32d924af5ada1a2068c77f3bf8e44267edea4.

Conflicts:
	activemq-unit-tests/src/test/java/org/apache/activemq/usecases/DurableSubInBrokerNetworkTest.java

Modified:
    activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java
    activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
    activemq/trunk/activemq-unit-tests/pom.xml

Modified: activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java?rev=1448161&r1=1448160&r2=1448161&view=diff
==============================================================================
--- activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java (original)
+++ activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java Wed Feb 20 13:33:36 2013
@@ -29,13 +29,25 @@ import org.apache.activemq.broker.region
 import org.apache.activemq.broker.region.MessageReference;
 import org.apache.activemq.broker.region.Subscription;
 import org.apache.activemq.broker.region.TopicSubscription;
-import org.apache.activemq.command.*;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ActiveMQMessage;
+import org.apache.activemq.command.ActiveMQTopic;
+import org.apache.activemq.command.BrokerInfo;
+import org.apache.activemq.command.Command;
+import org.apache.activemq.command.ConnectionId;
+import org.apache.activemq.command.ConnectionInfo;
+import org.apache.activemq.command.ConsumerId;
+import org.apache.activemq.command.ConsumerInfo;
+import org.apache.activemq.command.DestinationInfo;
+import org.apache.activemq.command.Message;
+import org.apache.activemq.command.MessageId;
+import org.apache.activemq.command.ProducerId;
+import org.apache.activemq.command.ProducerInfo;
 import org.apache.activemq.security.SecurityContext;
 import org.apache.activemq.state.ProducerState;
 import org.apache.activemq.usage.Usage;
 import org.apache.activemq.util.IdGenerator;
 import org.apache.activemq.util.LongSequenceGenerator;
-import org.apache.activemq.util.SubscriptionKey;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -53,7 +65,6 @@ public class AdvisoryBroker extends Brok
     protected final ConcurrentHashMap<ProducerId, ProducerInfo> producers = new ConcurrentHashMap<ProducerId,
ProducerInfo>();
     protected final ConcurrentHashMap<ActiveMQDestination, DestinationInfo> destinations
= new ConcurrentHashMap<ActiveMQDestination, DestinationInfo>();
     protected final ConcurrentHashMap<BrokerInfo, ActiveMQMessage> networkBridges =
new ConcurrentHashMap<BrokerInfo, ActiveMQMessage>();
-    protected final ConcurrentHashMap<SubscriptionKey, ActiveMQTopic> durableSubscriptions
= new ConcurrentHashMap<SubscriptionKey, ActiveMQTopic>();
     protected final ProducerId advisoryProducerId = new ProducerId();
 
     private final LongSequenceGenerator messageIdGenerator = new LongSequenceGenerator();
@@ -81,12 +92,6 @@ public class AdvisoryBroker extends Brok
 
         // Don't advise advisory topics.
         if (!AdvisorySupport.isAdvisoryTopic(info.getDestination())) {
-            if (info.getDestination().isTopic() && info.isDurable()) {
-                SubscriptionKey key = new SubscriptionKey(context.getClientId(), info.getSubscriptionName());
-                if (!this.durableSubscriptions.contains(key)) {
-                    this.durableSubscriptions.put(key, (ActiveMQTopic)info.getDestination());
-                }
-            }
             ActiveMQTopic topic = AdvisorySupport.getConsumerAdvisoryTopic(info.getDestination());
             consumers.put(info.getConsumerId(), info);
             fireConsumerAdvisory(context, info.getDestination(), topic, info);
@@ -259,26 +264,6 @@ public class AdvisoryBroker extends Brok
     }
 
     @Override
-    public void removeSubscription(ConnectionContext context, RemoveSubscriptionInfo info)
throws Exception {
-        super.removeSubscription(context, info);
-
-        SubscriptionKey key = new SubscriptionKey(context.getClientId(), info.getSubscriptionName());
-
-        ActiveMQTopic dest = durableSubscriptions.get(key);
-        if (dest == null) {
-            LOG.warn("We cannot send an advisory message for a durable sub removal when we
don't know about the durable sub");
-        }
-
-        // Don't advise advisory topics.
-        if (!AdvisorySupport.isAdvisoryTopic(dest)) {
-            ActiveMQTopic topic = AdvisorySupport.getConsumerAdvisoryTopic(dest);
-            durableSubscriptions.remove(key);
-            fireConsumerAdvisory(context,dest, topic, info);
-        }
-
-    }
-
-    @Override
     public void removeProducer(ConnectionContext context, ProducerInfo info) throws Exception
{
         super.removeProducer(context, info);
 

Modified: activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java?rev=1448161&r1=1448160&r2=1448161&view=diff
==============================================================================
--- activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java (original)
+++ activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java Wed Feb 20 13:33:36 2013
@@ -50,7 +50,32 @@ import org.apache.activemq.broker.region
 import org.apache.activemq.broker.region.RegionBroker;
 import org.apache.activemq.broker.region.Subscription;
 import org.apache.activemq.broker.region.policy.PolicyEntry;
-import org.apache.activemq.command.*;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ActiveMQMessage;
+import org.apache.activemq.command.ActiveMQTempDestination;
+import org.apache.activemq.command.ActiveMQTopic;
+import org.apache.activemq.command.BrokerId;
+import org.apache.activemq.command.BrokerInfo;
+import org.apache.activemq.command.Command;
+import org.apache.activemq.command.ConnectionError;
+import org.apache.activemq.command.ConnectionId;
+import org.apache.activemq.command.ConnectionInfo;
+import org.apache.activemq.command.ConsumerId;
+import org.apache.activemq.command.ConsumerInfo;
+import org.apache.activemq.command.DataStructure;
+import org.apache.activemq.command.DestinationInfo;
+import org.apache.activemq.command.ExceptionResponse;
+import org.apache.activemq.command.KeepAliveInfo;
+import org.apache.activemq.command.Message;
+import org.apache.activemq.command.MessageAck;
+import org.apache.activemq.command.MessageDispatch;
+import org.apache.activemq.command.NetworkBridgeFilter;
+import org.apache.activemq.command.ProducerInfo;
+import org.apache.activemq.command.RemoveInfo;
+import org.apache.activemq.command.Response;
+import org.apache.activemq.command.SessionInfo;
+import org.apache.activemq.command.ShutdownInfo;
+import org.apache.activemq.command.WireFormatInfo;
 import org.apache.activemq.filter.DestinationFilter;
 import org.apache.activemq.filter.MessageEvaluationContext;
 import org.apache.activemq.security.SecurityContext;
@@ -791,11 +816,6 @@ public abstract class DemandForwardingBr
         } else if (data.getClass() == RemoveInfo.class) {
             ConsumerId id = (ConsumerId) ((RemoveInfo) data).getObjectId();
             removeDemandSubscription(id);
-        } else if (data.getClass() == RemoveSubscriptionInfo.class) {
-            RemoveSubscriptionInfo durableSub = (RemoveSubscriptionInfo)data;
-            LOG.debug("Removing durable subscription: clientId: "  + durableSub.getClientId()
-                    + ", durableName: " + durableSub.getSubcriptionName());
-            localBroker.oneway(data);
         }
     }
 

Modified: activemq/trunk/activemq-unit-tests/pom.xml
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-unit-tests/pom.xml?rev=1448161&r1=1448160&r2=1448161&view=diff
==============================================================================
--- activemq/trunk/activemq-unit-tests/pom.xml (original)
+++ activemq/trunk/activemq-unit-tests/pom.xml Wed Feb 20 13:33:36 2013
@@ -570,6 +570,8 @@
                 <exclude>**/StoreQueueCursorJournalNoDuplicateTest.*</exclude>
                 <exclude>**/org.apache.activemq.usecases.ThreeBrokerVirtualTopicNetworkAMQPATest.*</exclude>
                 <exclude>**/LevelDBXARecoveryBrokerTest.*</exclude>
+                <!-- https://issues.apache.org/jira/browse/AMQ-4000 -->
+                <exclude>**/DurableSubInBrokerNetworkTest.*</exclude>
               </excludes>
             </configuration>
           </plugin>



Mime
View raw message