activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From gtu...@apache.org
Subject svn commit: r1213208 - in /activemq/trunk/activemq-core/src: main/java/org/apache/activemq/advisory/ main/java/org/apache/activemq/broker/ main/java/org/apache/activemq/broker/region/ main/java/org/apache/activemq/command/ main/java/org/apache/activemq...
Date Mon, 12 Dec 2011 12:19:44 GMT
Author: gtully
Date: Mon Dec 12 12:19:43 2011
New Revision: 1213208

URL: http://svn.apache.org/viewvc?rev=1213208&view=rev
Log:
https://issues.apache.org/jira/browse/AMQ-3253 - rework to make recreation of temp destination
more specific to use case, result is no advisories like https://issues.apache.org/jira/browse/AMQ-2571.
This avoids the possibility of looping in a network. Additional test. Resolve race condition
on destination advisories, a remove could overwrite an inflight add command. A copy is now
in place for the remove

Added:
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/TwoBrokerTempQueueAdvisoryTest.java
  (with props)
Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ConnectionContext.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/DestinationInfo.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsMultipleBrokersTestSupport.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java?rev=1213208&r1=1213207&r2=1213208&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java
Mon Dec 12 12:19:43 2011
@@ -183,6 +183,8 @@ public class AdvisoryBroker extends Brok
         super.removeDestination(context, destination, timeout);
         DestinationInfo info = destinations.remove(destination);
         if (info != null) {
+            // ensure we don't modify (and loose/overwrite) an in-flight add advisory, so
duplicate
+            info = info.copy();
             info.setDestination(destination);
             info.setOperationType(DestinationInfo.REMOVE_OPERATION_TYPE);
             ActiveMQTopic topic = AdvisorySupport.getDestinationAdvisoryTopic(destination);
@@ -204,6 +206,8 @@ public class AdvisoryBroker extends Brok
         super.removeDestinationInfo(context, destInfo);   
         DestinationInfo info = destinations.remove(destInfo.getDestination());
         if (info != null) {
+            // ensure we don't modify (and loose/overwrite) an in-flight add advisory, so
duplicate
+            info = info.copy();
             info.setDestination(destInfo.getDestination());
             info.setOperationType(DestinationInfo.REMOVE_OPERATION_TYPE);
             ActiveMQTopic topic = AdvisorySupport.getDestinationAdvisoryTopic(destInfo.getDestination());
@@ -214,7 +218,6 @@ public class AdvisoryBroker extends Brok
             }
             try {
                 next.removeDestination(context, AdvisorySupport.getProducerAdvisoryTopic(info.getDestination()),
-1);
-            
             } catch (Exception expectedIfDestinationDidNotExistYet) {
             }
         }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ConnectionContext.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ConnectionContext.java?rev=1213208&r1=1213207&r2=1213208&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ConnectionContext.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ConnectionContext.java
Mon Dec 12 12:19:43 2011
@@ -28,6 +28,7 @@ import org.apache.activemq.command.WireF
 import org.apache.activemq.filter.MessageEvaluationContext;
 import org.apache.activemq.security.MessageAuthorizationPolicy;
 import org.apache.activemq.security.SecurityContext;
+import org.apache.activemq.state.ConnectionState;
 import org.apache.activemq.transaction.Transaction;
 
 /**
@@ -58,6 +59,7 @@ public class ConnectionContext {
     private final MessageEvaluationContext messageEvaluationContext;
     private boolean dontSendReponse;
     private boolean clientMaster = true;
+    private ConnectionState connectionState;
 
     public ConnectionContext() {
     	this.messageEvaluationContext = new MessageEvaluationContext();
@@ -320,4 +322,11 @@ public class ConnectionContext {
         this.faultTolerant = faultTolerant;
     }
 
+    public void setConnectionState(ConnectionState connectionState) {
+        this.connectionState = connectionState;
+    }
+
+    public ConnectionState getConnectionState() {
+        return this.connectionState;
+    }
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java?rev=1213208&r1=1213207&r2=1213208&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java
Mon Dec 12 12:19:43 2011
@@ -668,6 +668,7 @@ public class TransportConnection impleme
         context.setWireFormatInfo(wireFormatInfo);
         context.setReconnect(info.isFailoverReconnect());
         this.manageable = info.isManageable();
+        context.setConnectionState(state);
         state.setContext(context);
         state.setConnection(this);
         if (info.getClientIp() == null) {

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java?rev=1213208&r1=1213207&r2=1213208&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java
Mon Dec 12 12:19:43 2011
@@ -125,7 +125,9 @@ public abstract class AbstractRegion imp
 
     public Destination addDestination(ConnectionContext context, ActiveMQDestination destination,
             boolean createIfTemporary) throws Exception {
-        LOG.debug(broker.getBrokerName() + " adding destination: " + destination);
+        if (LOG.isDebugEnabled()) {
+            LOG.debug(broker.getBrokerName() + " adding destination: " + destination);
+        }
 
         destinationsLock.writeLock().lock();
         try {
@@ -142,18 +144,6 @@ public abstract class AbstractRegion imp
                     destinations.put(destination, dest);
                     destinationMap.put(destination, dest);
                     addSubscriptionsForDestination(context, dest);
-                    if (destination.isTemporary()) {
-                        // need to associate with the connection so it can get removed
-                        if (context.getConnection() instanceof TransportConnection) {
-                            TransportConnection transportConnection = (TransportConnection)
context.getConnection();
-                            DestinationInfo info = new DestinationInfo(context.getConnectionId(),
-                                    DestinationInfo.ADD_OPERATION_TYPE,
-                                    destination);
-                            transportConnection.processAddDestination(info);
-                            LOG.debug("assigning ownership of auto created temp : " + destination
+ " to connection:"
-                                    + context.getConnectionId());
-                        }
-                    }
                 }
                 if (dest == null) {
                     throw new JMSException("The destination " + destination + " does not
exist.");
@@ -207,7 +197,9 @@ public abstract class AbstractRegion imp
             // dropping the subscription.
         }
 
-        LOG.debug("Removing destination: " + destination);
+        if (LOG.isDebugEnabled()) {
+            LOG.debug(broker.getBrokerName() + " removing destination: " + destination);
+        }
 
         destinationsLock.writeLock().lock();
         try {
@@ -229,7 +221,9 @@ public abstract class AbstractRegion imp
                 }
 
             } else {
-                LOG.debug("Destination doesn't exist: " + dest);
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("Destination doesn't exist: " + dest);
+                }
             }
         } finally {
             destinationsLock.writeLock().unlock();
@@ -262,8 +256,10 @@ public abstract class AbstractRegion imp
 
     @SuppressWarnings("unchecked")
     public Subscription addConsumer(ConnectionContext context, ConsumerInfo info) throws
Exception {
-        LOG.debug(broker.getBrokerName() + " adding consumer: " + info.getConsumerId() +
" for destination: "
-                + info.getDestination());
+        if (LOG.isDebugEnabled()) {
+            LOG.debug(broker.getBrokerName() + " adding consumer: " + info.getConsumerId()
+ " for destination: "
+                    + info.getDestination());
+        }
         ActiveMQDestination destination = info.getDestination();
         if (destination != null && !destination.isPattern() && !destination.isComposite())
{
             // lets auto-create the destination
@@ -362,8 +358,10 @@ public abstract class AbstractRegion imp
 
     @SuppressWarnings("unchecked")
     public void removeConsumer(ConnectionContext context, ConsumerInfo info) throws Exception
{
-        LOG.debug(broker.getBrokerName() + " removing consumer: " + info.getConsumerId()
+ " for destination: "
-                + info.getDestination());
+        if (LOG.isDebugEnabled()) {
+            LOG.debug(broker.getBrokerName() + " removing consumer: " + info.getConsumerId()
+ " for destination: "
+                    + info.getDestination());
+        }
 
         Subscription sub = subscriptions.remove(info.getConsumerId());
         // The sub could be removed elsewhere - see ConnectionSplitBroker
@@ -418,7 +416,9 @@ public abstract class AbstractRegion imp
                     LOG.warn("Ack for non existent subscription, ack:" + ack);
                     throw new IllegalArgumentException("The subscription does not exist:
" + ack.getConsumerId());
                 } else {
-                    LOG.debug("Ack for non existent subscription in recovery, ack:" + ack);
+                    if (LOG.isDebugEnabled()) {
+                        LOG.debug("Ack for non existent subscription in recovery, ack:" +
ack);
+                    }
                     return;
                 }
             }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java?rev=1213208&r1=1213207&r2=1213208&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java
Mon Dec 12 12:19:43 2011
@@ -37,6 +37,7 @@ import org.apache.activemq.broker.Connec
 import org.apache.activemq.broker.ConsumerBrokerExchange;
 import org.apache.activemq.broker.EmptyBroker;
 import org.apache.activemq.broker.ProducerBrokerExchange;
+import org.apache.activemq.broker.TransportConnection;
 import org.apache.activemq.broker.TransportConnector;
 import org.apache.activemq.broker.region.policy.DeadLetterStrategy;
 import org.apache.activemq.broker.region.policy.PolicyMap;
@@ -373,9 +374,22 @@ public class RegionBroker extends EmptyB
         if (destination != null) {
             inactiveDestinationsPurgeLock.readLock().lock();
             try {
-                // This seems to cause the destination to be added but without
-                // advisories firing...
-                context.getBroker().addDestination(context, destination, true);
+                if (!destinations.containsKey(destination)) {
+                    // This seems to cause the destination to be added but without
+                    // advisories firing...
+                    context.getBroker().addDestination(context, destination, true);
+                    // associate it with the connection so that it can get deleted
+                    if (destination.isTemporary() && context.getConnectionState()
!= null) {
+                        DestinationInfo destinationInfo = new DestinationInfo(context.getConnectionId(),
+                                DestinationInfo.ADD_OPERATION_TYPE,
+                                destination);
+                        context.getConnectionState().addTempDestination(destinationInfo);
+                        if (LOG.isDebugEnabled()) {
+                            LOG.debug("assigning ownership of auto created temp : " + destination
+ " to connection:"
+                                    + context.getConnectionId());
+                        }
+                    }
+                }
                 switch (destination.getDestinationType()) {
                 case ActiveMQDestination.QUEUE_TYPE:
                     queueRegion.addProducer(context, info);

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/DestinationInfo.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/DestinationInfo.java?rev=1213208&r1=1213207&r2=1213208&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/DestinationInfo.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/DestinationInfo.java
Mon Dec 12 12:19:43 2011
@@ -126,4 +126,13 @@ public class DestinationInfo extends Bas
         throw new IOException("Unknown operation type: " + getOperationType());
     }
 
+    public DestinationInfo copy() {
+        DestinationInfo result = new DestinationInfo();
+        super.copy(result);
+        result.connectionId = connectionId;
+        result.destination = destination;
+        result.operationType = operationType;
+        result.brokerPath = brokerPath;
+        return result;
+    }
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java?rev=1213208&r1=1213207&r2=1213208&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
Mon Dec 12 12:19:43 2011
@@ -553,7 +553,7 @@ public abstract class DemandForwardingBr
             }
             destInfo.setBrokerPath(appendToBrokerPath(destInfo.getBrokerPath(), getRemoteBrokerPath()));
             if (LOG.isTraceEnabled()) {
-                LOG.trace("bridging destination control command: " + destInfo);
+                LOG.trace(configuration.getBrokerName() +" bridging destination control command:
" + destInfo);
             }
             localBroker.oneway(destInfo);
         } else if (data.getClass() == RemoveInfo.class) {

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsMultipleBrokersTestSupport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsMultipleBrokersTestSupport.java?rev=1213208&r1=1213207&r2=1213208&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsMultipleBrokersTestSupport.java
(original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsMultipleBrokersTestSupport.java
Mon Dec 12 12:19:43 2011
@@ -48,6 +48,8 @@ import org.apache.activemq.advisory.Cons
 import org.apache.activemq.broker.BrokerFactory;
 import org.apache.activemq.broker.BrokerService;
 import org.apache.activemq.broker.TransportConnector;
+import org.apache.activemq.broker.region.RegionBroker;
+import org.apache.activemq.broker.region.TopicRegion;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.ActiveMQQueue;
 import org.apache.activemq.command.ActiveMQTopic;
@@ -187,6 +189,18 @@ public class JmsMultipleBrokersTestSuppo
         return result;
     }
 
+    protected void waitForMinTopicRegionConsumerCount(final String name, final int count)
throws Exception {
+        final BrokerService broker = brokers.get(name).broker;
+        final TopicRegion topicRegion =  (TopicRegion) ((RegionBroker) broker.getRegionBroker()).getTopicRegion();
+        assertTrue("found expected consumers in topic region of" + name, Wait.waitFor(new
Wait.Condition() {
+            @Override
+            public boolean isSatisified() throws Exception {
+                LOG.info("topic consumers: " + name +", " +  topicRegion.getSubscriptions().toString());
+                return topicRegion.getSubscriptions().size()  >= count;
+            }
+        }));
+    }
+
     protected void waitForBridgeFormation() throws Exception {
         waitForBridgeFormation(1);
     }

Added: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/TwoBrokerTempQueueAdvisoryTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/TwoBrokerTempQueueAdvisoryTest.java?rev=1213208&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/TwoBrokerTempQueueAdvisoryTest.java
(added)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/TwoBrokerTempQueueAdvisoryTest.java
Mon Dec 12 12:19:43 2011
@@ -0,0 +1,120 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.usecases;
+
+import java.net.URI;
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.Destination;
+import javax.jms.Session;
+import javax.management.ObjectName;
+import org.apache.activemq.JmsMultipleBrokersTestSupport;
+import org.apache.activemq.broker.jmx.DestinationViewMBean;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.util.Wait;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class TwoBrokerTempQueueAdvisoryTest extends JmsMultipleBrokersTestSupport {
+    private static final Logger LOG = LoggerFactory.getLogger(TwoBrokerTempQueueAdvisoryTest.class);
+
+    private void sendReceiveTempQueueMessage(String broker) throws Exception {
+
+    	ConnectionFactory factory = getConnectionFactory(broker);
+    	Connection conn = factory.createConnection();
+    	Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+    	Destination dest = session.createTemporaryQueue();
+    	conn.close();
+    }
+
+    public void testTemporaryQueueAdvisory() throws Exception {
+    	LOG.info("Running testTemporaryQueueAdvisory()");
+
+    	startAllBrokers();
+        waitForBridgeFormation();
+        waitForMinTopicRegionConsumerCount("BrokerB", 1);
+        waitForMinTopicRegionConsumerCount("BrokerA", 1);
+
+        final int iterations = 30;
+        for (int i = 0; i < iterations; i++) {
+	        sendReceiveTempQueueMessage("BrokerA");
+        }
+
+        waitForMinTopicRegionConsumerCount("BrokerB", 1);
+        waitForMinTopicRegionConsumerCount("BrokerA", 1);
+
+        final DestinationViewMBean brokerAView = createView("BrokerA", "ActiveMQ.Advisory.TempQueue",
ActiveMQDestination.TOPIC_TYPE);
+        assertTrue("exact amount of advisories created on A, one each for creation/deletion",
Wait.waitFor(new Wait.Condition() {
+            @Override
+            public boolean isSatisified() throws Exception {
+                LOG.info("BrokerA temp advisory enque count: " + brokerAView.getEnqueueCount());
+                return iterations * 2 == brokerAView.getEnqueueCount();
+            }
+        }));
+
+        final DestinationViewMBean brokerBView = createView("BrokerB", "ActiveMQ.Advisory.TempQueue",
ActiveMQDestination.TOPIC_TYPE);
+        assertTrue("exact amount of advisories created on B, one each for creation/deletion",
Wait.waitFor(new Wait.Condition() {
+            @Override
+            public boolean isSatisified() throws Exception {
+                LOG.info("BrokerB temp advisory enque count: " + brokerBView.getEnqueueCount());
+                return iterations * 2 == brokerBView.getEnqueueCount();
+            }
+        }));
+    }
+
+
+    protected DestinationViewMBean createView(String broker, String destination, byte type)
throws Exception {
+        String domain = "org.apache.activemq";
+        ObjectName name;
+        if (type == ActiveMQDestination.QUEUE_TYPE) {
+            name = new ObjectName(domain + ":BrokerName=" + broker + ",Type=Queue,Destination="
+ destination);
+        } else {
+            name = new ObjectName(domain + ":BrokerName=" + broker + ",Type=Topic,Destination="
+ destination);
+        }
+        return (DestinationViewMBean) brokers.get(broker).broker.getManagementContext().newProxyInstance(name,
DestinationViewMBean.class,
+                true);
+    }
+
+    public void setUp() throws Exception {
+        super.setAutoFail(true);
+        super.setUp();
+
+        String options = new String("?persistent=false");
+        createBroker(new URI("broker:(tcp://localhost:0)/BrokerA" + options));
+        createBroker(new URI("broker:(tcp://localhost:0)/BrokerB" + options));
+
+        bridgeBrokers("BrokerA", "BrokerB");
+        bridgeBrokers("BrokerB", "BrokerA");
+    }
+}

Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/TwoBrokerTempQueueAdvisoryTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/TwoBrokerTempQueueAdvisoryTest.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date



Mime
View raw message