activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From gtu...@apache.org
Subject svn commit: r1213209 - in /activemq/trunk/activemq-core/src: main/java/org/apache/activemq/network/ test/java/org/apache/activemq/network/
Date Mon, 12 Dec 2011 12:22:12 GMT
Author: gtully
Date: Mon Dec 12 12:22:11 2011
New Revision: 1213209

URL: http://svn.apache.org/viewvc?rev=1213209&view=rev
Log:
resolve intermittent failure, wait for both of the conduit subs to register before test initiation.
additional accessors to make validation possible

Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandSubscription.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/SimpleNetworkTest.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java?rev=1213209&r1=1213208&r2=1213209&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java Mon Dec 12 12:22:11 2011
@@ -1227,6 +1227,10 @@ public abstract class DemandForwardingBr
         return configuration.isDuplex() || createdByDuplex;
     }
 
+    public ConcurrentHashMap<ConsumerId, DemandSubscription> getLocalSubscriptionMap() {
+        return subscriptionMapByRemoteId;
+    }
+
     public void setBrokerService(BrokerService brokerService) {
         this.brokerService = brokerService;
         this.localBrokerId = brokerService.getRegionBroker().getBrokerId();

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandSubscription.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandSubscription.java?rev=1213209&r1=1213208&r2=1213209&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandSubscription.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandSubscription.java Mon Dec 12 12:22:11 2011
@@ -77,6 +77,9 @@ public class DemandSubscription {
         return remoteSubsIds.isEmpty();
     }
 
+    public int size() {
+        return remoteSubsIds.size();
+    }
     /**
      * @return Returns the localInfo.
      */

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/SimpleNetworkTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/SimpleNetworkTest.java?rev=1213209&r1=1213208&r2=1213209&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/SimpleNetworkTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/SimpleNetworkTest.java Mon Dec 12 12:22:11 2011
@@ -17,6 +17,7 @@
 package org.apache.activemq.network;
 
 import java.net.URI;
+import java.util.concurrent.ConcurrentHashMap;
 import javax.jms.Connection;
 import javax.jms.DeliveryMode;
 import javax.jms.Destination;
@@ -32,6 +33,8 @@ import javax.jms.TopicSession;
 import org.apache.activemq.ActiveMQConnectionFactory;
 import org.apache.activemq.broker.BrokerService;
 import org.apache.activemq.command.ActiveMQTopic;
+import org.apache.activemq.command.ConsumerId;
+import org.apache.activemq.util.Wait;
 import org.apache.activemq.xbean.BrokerFactoryBean;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -105,7 +108,9 @@ public class SimpleNetworkTest extends o
         MessageConsumer consumer2 = remoteSession.createConsumer(included);
         MessageProducer producer = localSession.createProducer(included);
         producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
-        Thread.sleep(2000);
+
+        waitForConsumerRegistration(localBroker, 2);
+
         for (int i = 0; i < MESSAGE_COUNT; i++) {
             Message test = localSession.createTextMessage("test-" + i);
             producer.send(test);
@@ -117,6 +122,27 @@ public class SimpleNetworkTest extends o
         assertNull(consumer2.receive(1000));
     }
 
+    private void waitForConsumerRegistration(final BrokerService brokerService, final int min) throws Exception {
+        assertTrue("Internal bridge consumers registered in time", Wait.waitFor(new Wait.Condition() {
+            @Override
+            public boolean isSatisified() throws Exception {
+                Object[] bridges = brokerService.getNetworkConnectors().get(0).bridges.values().toArray();
+                if (bridges.length > 0) {
+                    LOG.info(brokerService + " bridges "  + bridges);
+                    DemandForwardingBridgeSupport demandForwardingBridgeSupport = (DemandForwardingBridgeSupport) bridges[0];
+                    ConcurrentHashMap<ConsumerId, DemandSubscription> forwardingBridges = demandForwardingBridgeSupport.getLocalSubscriptionMap();
+                    LOG.info(brokerService + " bridge "  + demandForwardingBridgeSupport + ", localSubs: " + forwardingBridges);
+                    if (!forwardingBridges.isEmpty()) {
+                        DemandSubscription demandSubscription = (DemandSubscription) forwardingBridges.values().toArray()[0];
+                        LOG.info(brokerService + " DemandSubscription "  + demandSubscription + ", size: " + demandSubscription.size());
+                        return demandSubscription.size() >= min;
+                    }
+                }
+                return false;
+            }
+        }));
+    }
+
     public void testDurableStoreAndForward() throws Exception {
         // create a remote durable consumer
         MessageConsumer remoteConsumer = remoteSession.createDurableSubscriber(included, consumerName);



Mime
View raw message