activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From gtu...@apache.org
Subject svn commit: r1087912 - in /activemq/trunk/activemq-core/src: main/java/org/apache/activemq/broker/ main/java/org/apache/activemq/broker/region/ test/java/org/apache/activemq/usecases/
Date Fri, 01 Apr 2011 21:56:35 GMT
Author: gtully
Date: Fri Apr  1 21:56:34 2011
New Revision: 1087912

URL: http://svn.apache.org/viewvc?rev=1087912&view=rev
Log:
https://issues.apache.org/jira/browse/AMQ-3253 - Support Temporary Destinations in a network
without advisories. Support hub and spoke, such that two spokes can communicate. Revisit https://issues.apache.org/jira/browse/AMQ-2571
to make creating temp destinations on command configurable (broker setAllowTempAutoCreationOnSend=true)
and allow temp destinations to be GCed via the gcInactiveDestinations policy

Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TempQueue.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TempTopic.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/NetworkAsyncStartTest.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/RequestReplyNoAdvisoryNetworkTest.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java?rev=1087912&r1=1087911&r2=1087912&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java
Fri Apr  1 21:56:34 2011
@@ -198,6 +198,7 @@ public class BrokerService implements Se
     private int schedulePeriodForDestinationPurge=5000;
     private BrokerContext brokerContext;
     private boolean networkConnectorStartAsync = false;
+    private boolean allowTempAutoCreationOnSend;
 
 	static {
         String localHostName = "localhost";
@@ -1833,6 +1834,7 @@ public class BrokerService implements Se
         regionBroker.setKeepDurableSubsActive(keepDurableSubsActive);
         regionBroker.setBrokerName(getBrokerName());
         regionBroker.getDestinationStatistics().setEnabled(enableStatistics);
+        regionBroker.setAllowTempAutoCreationOnSend(isAllowTempAutoCreationOnSend());
         if (brokerId != null) {
             regionBroker.setBrokerId(brokerId);
         }
@@ -2412,4 +2414,20 @@ public class BrokerService implements Se
     public void setNetworkConnectorStartAsync(boolean networkConnectorStartAsync) {
         this.networkConnectorStartAsync = networkConnectorStartAsync;
     }
+
+    public boolean isAllowTempAutoCreationOnSend() {
+        return allowTempAutoCreationOnSend;
+    }
+
+    /**
+     * enable if temp destinations need to be propagated through a network when
+     * advisorySupport==false. This is used in conjunction with the policy
+     * gcInactiveDestinations for matching temps so they can get removed
+     * when inactive
+     *
+     * @param allowTempAutoCreationOnSend
+     */
+    public void setAllowTempAutoCreationOnSend(boolean allowTempAutoCreationOnSend) {
+        this.allowTempAutoCreationOnSend = allowTempAutoCreationOnSend;
+    }
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java?rev=1087912&r1=1087911&r2=1087912&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java
Fri Apr  1 21:56:34 2011
@@ -17,6 +17,7 @@
 package org.apache.activemq.broker.region;
 
 import java.io.IOException;
+import java.util.Collection;
 import javax.jms.ResourceAllocationException;
 import org.apache.activemq.advisory.AdvisorySupport;
 import org.apache.activemq.broker.Broker;
@@ -92,7 +93,7 @@ public abstract class BaseDestination im
     private boolean reduceMemoryFootprint = false;
 
     /**
-     * @param broker
+     * @param brokerService
      * @param store
      * @param destination
      * @param parentStats
@@ -241,7 +242,7 @@ public abstract class BaseDestination im
         return store;
     }
 
-    public final boolean isActive() {
+    public boolean isActive() {
         return destinationStatistics.getConsumers().getCount() != 0 || destinationStatistics.getProducers().getCount() != 0;
     }
 
@@ -674,4 +675,15 @@ public abstract class BaseDestination im
     protected boolean isReduceMemoryFootprint() {
         return this.reduceMemoryFootprint;
     }
+
+   protected boolean hasRegularConsumers(Collection<Subscription> consumers) {
+        boolean hasRegularConsumers = false;
+        for (Subscription subscription: consumers) {
+            if (!subscription.getConsumerInfo().isNetworkSubscription()) {
+                hasRegularConsumers = true;
+                break;
+            }
+        }
+        return hasRegularConsumers;
+    }
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java?rev=1087912&r1=1087911&r2=1087912&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java
Fri Apr  1 21:56:34 2011
@@ -104,7 +104,7 @@ public class RegionBroker extends EmptyB
     private ConnectionContext adminConnectionContext;
     private final Scheduler scheduler;
     private final ThreadPoolExecutor executor;
-    
+    private boolean allowTempAutoCreationOnSend;
     private final Runnable purgeInactiveDestinationsTask = new Runnable() {
         public void run() {
             purgeInactiveDestinations();
@@ -499,7 +499,7 @@ public class RegionBroker extends EmptyB
                 || (producerExchange.getRegion() != null && producerExchange.getRegion().getDestinationMap().get(message.getDestination()) == null)) {
             ActiveMQDestination destination = message.getDestination();
             // ensure the destination is registered with the RegionBroker
-            producerExchange.getConnectionContext().getBroker().addDestination(producerExchange.getConnectionContext(), destination,false);
+            producerExchange.getConnectionContext().getBroker().addDestination(producerExchange.getConnectionContext(), destination, isAllowTempAutoCreationOnSend());
             Region region;
             switch (destination.getDestinationType()) {
             case ActiveMQDestination.QUEUE_TYPE:
@@ -935,6 +935,10 @@ public class RegionBroker extends EmptyB
         synchronized (purgeInactiveDestinationsTask) {
             List<BaseDestination> list = new ArrayList<BaseDestination>();
             Map<ActiveMQDestination, Destination> map = getDestinationMap();
+            if (isAllowTempAutoCreationOnSend()) {
+                map.putAll(tempQueueRegion.getDestinationMap());
+                map.putAll(tempTopicRegion.getDestinationMap());
+            }
             long timeStamp = System.currentTimeMillis();
             for (Destination d : map.values()) {
                 if (d instanceof BaseDestination) {
@@ -956,7 +960,7 @@ public class RegionBroker extends EmptyB
                             dest.getName() + " Inactive for longer than " + dest.getInactiveTimoutBeforeGC()
                                     + " ms - removing ...");
                     try {
-                        getRoot().removeDestination(context, dest.getActiveMQDestination(), 0);
+                        getRoot().removeDestination(context, dest.getActiveMQDestination(), isAllowTempAutoCreationOnSend() ? 1 : 0);
                     } catch (Exception e) {
                         LOG.error("Failed to remove inactive destination " + dest, e);
                     }
@@ -964,4 +968,12 @@ public class RegionBroker extends EmptyB
             }
         }
     }
+
+    public boolean isAllowTempAutoCreationOnSend() {
+        return allowTempAutoCreationOnSend;
+    }
+
+    public void setAllowTempAutoCreationOnSend(boolean allowTempAutoCreationOnSend) {
+        this.allowTempAutoCreationOnSend = allowTempAutoCreationOnSend;
+    }
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TempQueue.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TempQueue.java?rev=1087912&r1=1087911&r2=1087912&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TempQueue.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TempQueue.java
Fri Apr  1 21:56:34 2011
@@ -90,4 +90,15 @@ public class TempQueue extends Queue{
         }
         super.dispose(context);
     }
+
+    @Override
+    public boolean isActive() {
+        boolean isActive = super.isActive();
+        if (isActive && brokerService.isAllowTempAutoCreationOnSend()) {
+            synchronized (consumers) {
+                isActive = hasRegularConsumers(consumers);
+            }
+        }
+        return isActive;
+    }
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TempTopic.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TempTopic.java?rev=1087912&r1=1087911&r2=1087912&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TempTopic.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TempTopic.java
Fri Apr  1 21:56:34 2011
@@ -68,5 +68,15 @@ public class TempTopic  extends Topic  i
     
     public void initialize() {
     }
-   
+
+    @Override
+    public boolean isActive() {
+        boolean isActive = super.isActive();
+        if (isActive && brokerService.isAllowTempAutoCreationOnSend()) {
+            synchronized (consumers) {
+                isActive = hasRegularConsumers(consumers);
+            }
+        }
+        return isActive;
+    }
 }

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/NetworkAsyncStartTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/NetworkAsyncStartTest.java?rev=1087912&r1=1087911&r2=1087912&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/NetworkAsyncStartTest.java
(original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/NetworkAsyncStartTest.java
Fri Apr  1 21:56:34 2011
@@ -54,7 +54,7 @@ public class NetworkAsyncStartTest exten
         LOG.info("starting B transport connector");
         BrokerService brokerB = brokers.get("BrokerB").broker;
         brokerB.addConnector(brokerBUri);
-        brokerC.start();
+        brokerB.start();
 
         assertTrue("got bridge to B", waitForBridgeFormation(brokerA, 1, 0));
         assertTrue("got bridge to B&C", waitForBridgeFormation(brokerA, 1, 1));

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/RequestReplyNoAdvisoryNetworkTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/RequestReplyNoAdvisoryNetworkTest.java?rev=1087912&r1=1087911&r2=1087912&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/RequestReplyNoAdvisoryNetworkTest.java
(original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/RequestReplyNoAdvisoryNetworkTest.java
Fri Apr  1 21:56:34 2011
@@ -26,6 +26,7 @@ import java.net.URLConnection;
 import java.net.URLStreamHandler;
 import java.net.URLStreamHandlerFactory;
 import java.util.Map;
+import java.util.Vector;
 import javax.jms.MessageConsumer;
 import javax.jms.MessageProducer;
 import javax.jms.Session;
@@ -51,6 +52,7 @@ import org.slf4j.LoggerFactory;
 public class RequestReplyNoAdvisoryNetworkTest extends JmsMultipleBrokersTestSupport {
     private static final transient Logger LOG = LoggerFactory.getLogger(RequestReplyNoAdvisoryNetworkTest.class);
 
+    Vector<BrokerService> brokers = new Vector<BrokerService>();
     BrokerService a, b;
     ActiveMQQueue sendQ = new ActiveMQQueue("sendQ");
     static final String connectionIdMarker = "ID:marker.";
@@ -118,6 +120,8 @@ public class RequestReplyNoAdvisoryNetwo
         });
         a = new XBeanBrokerFactory().createBroker(new URI("xbean:" + localProtocolScheme + ":A"));
         b = new XBeanBrokerFactory().createBroker(new URI("xbean:" + localProtocolScheme + ":B"));
+        brokers.add(a);
+        brokers.add(b);
 
         doTestNonAdvisoryNetworkRequestReply();
     }
@@ -127,6 +131,25 @@ public class RequestReplyNoAdvisoryNetwo
         doTestNonAdvisoryNetworkRequestReply();
     }
 
+    public void testNonAdvisoryNetworkRequestReplyWithPIM() throws Exception {
+        a = configureBroker("A");
+        b = configureBroker("B");
+        BrokerService hub = configureBroker("M");
+        hub.setAllowTempAutoCreationOnSend(true);
+        configureForPiggyInTheMiddle(bridge(a, hub));
+        configureForPiggyInTheMiddle(bridge(b, hub));
+
+        startBrokers();
+
+        waitForBridgeFormation(hub, 2, 0);
+        doTestNonAdvisoryNetworkRequestReply();
+    }
+
+    private void configureForPiggyInTheMiddle(NetworkConnector bridge) {
+        bridge.setDuplex(true);
+        bridge.setNetworkTTL(2);
+    }
+
     public void doTestNonAdvisoryNetworkRequestReply() throws Exception {
 
         waitForBridgeFormation(a, 1, 0);
@@ -141,6 +164,7 @@ public class RequestReplyNoAdvisoryNetwo
         TextMessage message = sendSession.createTextMessage("1");
         message.setJMSReplyTo(realReplyQ);
         producer.send(message);
+        LOG.info("request sent");
 
         // responder
         ActiveMQConnectionFactory consumerFactory = createConnectionFactory(b);
@@ -149,7 +173,7 @@ public class RequestReplyNoAdvisoryNetwo
         ActiveMQSession consumerSession = (ActiveMQSession)consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
         MessageConsumer consumer = consumerSession.createConsumer(sendQ);
         TextMessage received = (TextMessage) consumer.receive(receiveTimeout);
-        assertNotNull(received);
+        assertNotNull("got request from sender ok", received);
 
         LOG.info("got request, sending reply");
 
@@ -166,16 +190,20 @@ public class RequestReplyNoAdvisoryNetwo
         assertEquals("text is as expected", "got 1", reply.getText());
         sendConnection.close();
 
-        verifyAllTempQueuesAreGone();
-    }
-
-    private void verifyAllTempQueuesAreGone() throws Exception {
-        for (BrokerService brokerService : new BrokerService[]{a, b}) {
-            RegionBroker regionBroker = (RegionBroker) brokerService.getRegionBroker();
-            Map temps = regionBroker.getTempTopicRegion().getDestinationMap();
-            assertTrue("no temp topics on " + brokerService + ", " + temps, temps.isEmpty());
-            temps = regionBroker.getTempQueueRegion().getDestinationMap();
-            assertTrue("no temp queues on " + brokerService + ", " + temps, temps.isEmpty());
+        LOG.info("checking for dangling temp destinations");
+        // ensure all temp dests get cleaned up on all brokers
+        for (BrokerService brokerService : brokers) {
+            final RegionBroker regionBroker = (RegionBroker) brokerService.getRegionBroker();
+            assertTrue("all temps are gone on " + regionBroker.getBrokerName(), Wait.waitFor(new Wait.Condition(){
+                @Override
+                public boolean isSatisified() throws Exception {
+                    Map tempTopics = regionBroker.getTempTopicRegion().getDestinationMap();
+                    LOG.info("temp topics on " + regionBroker.getBrokerName() + ", " + tempTopics);
+                    Map tempQ = regionBroker.getTempQueueRegion().getDestinationMap();
+                    LOG.info("temp queues on " + regionBroker.getBrokerName() + ", " + tempQ);
+                    return tempQ.isEmpty() && tempTopics.isEmpty();
+                }
+            }));
         }
     }
 
@@ -199,27 +227,30 @@ public class RequestReplyNoAdvisoryNetwo
         b = configureBroker("B");
         bridge(a, b);
         bridge(b, a);
-        a.start();
-        b.start();
+        startBrokers();
     }
 
-    public void tearDown() throws Exception {
-        stop(a);
-        stop(b);
+    private void startBrokers() throws Exception {
+        for (BrokerService broker: brokers) {
+            broker.start();
+        }
     }
 
-    private void stop(BrokerService broker) throws Exception {
-        if (broker != null) {
+    public void tearDown() throws Exception {
+        for (BrokerService broker: brokers) {
             broker.stop();
         }
+        brokers.clear();
     }
 
-    private void bridge(BrokerService from, BrokerService to) throws Exception {
-        TransportConnector toConnector = to.addConnector("tcp://localhost:0");
+
+    private NetworkConnector bridge(BrokerService from, BrokerService to) throws Exception {
+        TransportConnector toConnector = to.getTransportConnectors().get(0);
         NetworkConnector bridge =
                 from.addNetworkConnector("static://" + toConnector.getPublishableConnectString());
         bridge.addStaticallyIncludedDestination(sendQ);
         bridge.addStaticallyIncludedDestination(replyQWildcard);
+        return bridge;
     }
 
     private BrokerService configureBroker(String brokerName) throws Exception {
@@ -232,8 +263,13 @@ public class RequestReplyNoAdvisoryNetwo
         PolicyMap map = new PolicyMap();
         PolicyEntry tempReplyQPolicy = new PolicyEntry();
         tempReplyQPolicy.setOptimizedDispatch(true);
+        tempReplyQPolicy.setGcInactiveDestinations(true);
+        tempReplyQPolicy.setInactiveTimoutBeforeGC(10*1000);
         map.put(replyQWildcard, tempReplyQPolicy);
         broker.setDestinationPolicy(map);
+
+        broker.addConnector("tcp://localhost:0");
+        brokers.add(broker);
         return broker;
     }
 }



Mime
View raw message