activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From dej...@apache.org
Subject svn commit: r1146717 - in /activemq/trunk/activemq-core/src: main/java/org/apache/activemq/advisory/ test/java/org/apache/activemq/broker/advisory/
Date Thu, 14 Jul 2011 13:55:49 GMT
Author: dejanb
Date: Thu Jul 14 13:55:49 2011
New Revision: 1146717

URL: http://svn.apache.org/viewvc?rev=1146717&view=rev
Log:
https://issues.apache.org/jira/browse/AMQ-3135 - replay network bridge advisories for new consumers

Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisorySupport.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/advisory/AdvisoryNetworkBridgeTest.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java?rev=1146717&r1=1146716&r2=1146717&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java
Thu Jul 14 13:55:49 2011
@@ -52,6 +52,7 @@ public class AdvisoryBroker extends Brok
     protected final ConcurrentHashMap<ConsumerId, ConsumerInfo> consumers = new ConcurrentHashMap<ConsumerId, ConsumerInfo>();
     protected final ConcurrentHashMap<ProducerId, ProducerInfo> producers = new ConcurrentHashMap<ProducerId, ProducerInfo>();
     protected final ConcurrentHashMap<ActiveMQDestination, DestinationInfo> destinations = new ConcurrentHashMap<ActiveMQDestination, DestinationInfo>();
+    protected final ConcurrentHashMap<BrokerInfo, ActiveMQMessage> networkBridges = new ConcurrentHashMap<BrokerInfo, ActiveMQMessage>();
     protected final ProducerId advisoryProducerId = new ProducerId();
     
     private final LongSequenceGenerator messageIdGenerator = new LongSequenceGenerator();
@@ -124,6 +125,15 @@ public class AdvisoryBroker extends Brok
                     fireConsumerAdvisory(context,value.getDestination(), topic, value, info.getConsumerId());
                 }
             }
+
+            // Replay network bridges
+            if (AdvisorySupport.isNetworkBridgeAdvisoryTopic(info.getDestination())) {
+                for (Iterator<BrokerInfo> iter = networkBridges.keySet().iterator(); iter.hasNext();) {
+                    BrokerInfo key = iter.next();
+                    ActiveMQTopic topic = AdvisorySupport.getNetworkBridgeAdvisoryTopic();
+                    fireAdvisory(context, topic, key, null, networkBridges.get(key));
+                }
+            }
         }
         return answer;
     }
@@ -399,6 +409,7 @@ public class AdvisoryBroker extends Brok
              advisoryMessage.setBooleanProperty("started", true);
              advisoryMessage.setBooleanProperty("createdByDuplex", createdByDuplex);
              advisoryMessage.setStringProperty("remoteIp", remoteIp);
+             networkBridges.putIfAbsent(brokerInfo, advisoryMessage);
 
              ActiveMQTopic topic = AdvisorySupport.getNetworkBridgeAdvisoryTopic();
 
@@ -418,6 +429,7 @@ public class AdvisoryBroker extends Brok
          if (brokerInfo != null) {
              ActiveMQMessage advisoryMessage = new ActiveMQMessage();
              advisoryMessage.setBooleanProperty("started", false);
+             networkBridges.remove(brokerInfo);
 
              ActiveMQTopic topic = AdvisorySupport.getNetworkBridgeAdvisoryTopic();
 

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisorySupport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisorySupport.java?rev=1146717&r1=1146716&r2=1146717&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisorySupport.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisorySupport.java
Thu Jul 14 13:55:49 2011
@@ -452,6 +452,24 @@ public final class AdvisorySupport {
         }
     }
 
+    public static boolean isNetworkBridgeAdvisoryTopic(Destination destination) throws JMSException {
+        return isNetworkBridgeAdvisoryTopic(ActiveMQMessageTransformation.transformDestination(destination));
+    }
+
+    public static boolean isNetworkBridgeAdvisoryTopic(ActiveMQDestination destination) {
+        if (destination.isComposite()) {
+            ActiveMQDestination[] compositeDestinations = destination.getCompositeDestinations();
+            for (int i = 0; i < compositeDestinations.length; i++) {
+                if (isNetworkBridgeAdvisoryTopic(compositeDestinations[i])) {
+                    return true;
+                }
+            }
+            return false;
+        } else {
+            return destination.isTopic() && destination.getPhysicalName().startsWith(NETWORK_BRIDGE_TOPIC_PREFIX);
+        }
+    }
+
     /**
      * Returns the agent topic which is used to send commands to the broker
      */

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/advisory/AdvisoryNetworkBridgeTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/advisory/AdvisoryNetworkBridgeTest.java?rev=1146717&r1=1146716&r2=1146717&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/advisory/AdvisoryNetworkBridgeTest.java
(original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/advisory/AdvisoryNetworkBridgeTest.java
Thu Jul 14 13:55:49 2011
@@ -61,6 +61,43 @@ public class AdvisoryNetworkBridgeTest e
         assertNotNull(advisory);
         assertTrue(advisory.getDataStructure() instanceof BrokerInfo);
         assertFalse(advisory.getBooleanProperty("started"));
+
+        conn.close();
+    }
+
+    public void testAddConsumerLater() throws Exception {
+        createBroker1();
+
+        createBroker2();
+
+        Thread.sleep(1000);
+
+        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("vm://broker1");
+        Connection conn = factory.createConnection();
+        Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        conn.start();
+        MessageConsumer consumer = sess.createConsumer(AdvisorySupport.getNetworkBridgeAdvisoryTopic());
+
+        ActiveMQMessage advisory = (ActiveMQMessage)consumer.receive(2000);
+        assertNotNull(advisory);
+        assertTrue(advisory.getDataStructure() instanceof BrokerInfo);
+        assertTrue(advisory.getBooleanProperty("started"));
+        assertCreatedByDuplex(advisory.getBooleanProperty("createdByDuplex"));
+
+        broker2.stop();
+        broker2.waitUntilStopped();
+
+        advisory = (ActiveMQMessage)consumer.receive(2000);
+        assertNotNull(advisory);
+        assertTrue(advisory.getDataStructure() instanceof BrokerInfo);
+        assertFalse(advisory.getBooleanProperty("started"));
+
+        consumer = sess.createConsumer(AdvisorySupport.getNetworkBridgeAdvisoryTopic());
+        advisory = (ActiveMQMessage)consumer.receive(1000);
+        assertNull(advisory);
+
+        conn.close();
+
     }
 
     public void assertCreatedByDuplex(boolean createdByDuplex) {



Mime
View raw message