activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chir...@apache.org
Subject svn commit: r587188 - in /activemq/trunk/activemq-core/src: main/java/org/apache/activemq/transport/fanout/ test/java/org/apache/activemq/network/ test/java/org/apache/activemq/transport/fanout/
Date Mon, 22 Oct 2007 18:39:58 GMT
Author: chirino
Date: Mon Oct 22 11:39:57 2007
New Revision: 587188

URL: http://svn.apache.org/viewvc?rev=587188&view=rev
Log:
Fixed the test cases in FanoutTransportBrokerTest also added a fanOutQueues property to enabled
fanning out on 
Queues.  See issue: https://issues.apache.org/activemq/browse/AMQ-1464

Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/fanout/FanoutTransport.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/NetworkTestSupport.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/fanout/FanoutTransportBrokerTest.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/fanout/FanoutTransport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/fanout/FanoutTransport.java?rev=587188&r1=587187&r2=587188&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/fanout/FanoutTransport.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/fanout/FanoutTransport.java
Mon Oct 22 11:39:57 2007
@@ -76,6 +76,7 @@
     private int maxReconnectAttempts;
     private Exception connectionFailure;
     private FanoutTransportHandler primary;
+    private boolean fanOutQueues;
 
     static class RequestCounter {
 
@@ -210,13 +211,18 @@
                                 primary = fanoutHandler;
                             }
                             t.setTransportListener(fanoutHandler);
-                            connectedCount++;
                             if (started) {
                                 restoreTransport(fanoutHandler);
                             }
+                            connectedCount++;
                         } catch (Exception e) {
                             LOG.debug("Connect fail to: " + uri + ", reason: " + e);
 
+                            if( fanoutHandler.transport !=null ) {
+                                ServiceSupport.dispose(fanoutHandler.transport);
+                                fanoutHandler.transport=null;
+                            }
+                            
                             if (maxReconnectAttempts > 0 && ++fanoutHandler.connectFailures
>= maxReconnectAttempts) {
                                 LOG.error("Failed to connect to transport after: " + fanoutHandler.connectFailures
+ " attempt(s)");
                                 connectionFailure = e;
@@ -418,6 +424,9 @@
      */
     private boolean isFanoutCommand(Command command) {
         if (command.isMessage()) {
+            if( fanOutQueues ) {
+                return true;
+            }
             return ((Message)command).getDestination().isTopic();
         }
         if (command.getDataStructureType() == ConsumerInfo.DATA_STRUCTURE_TYPE) {
@@ -550,6 +559,14 @@
 
     public boolean isFaultTolerant() {
         return true;
+    }
+
+    public boolean isFanOutQueues() {
+        return fanOutQueues;
+    }
+
+    public void setFanOutQueues(boolean fanOutQueues) {
+        this.fanOutQueues = fanOutQueues;
     }
 
 }

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/NetworkTestSupport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/NetworkTestSupport.java?rev=587188&r1=587187&r2=587188&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/NetworkTestSupport.java
(original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/NetworkTestSupport.java
Mon Oct 22 11:39:57 2007
@@ -151,7 +151,7 @@
         remoteBroker = createRemoteBroker(remotePersistenceAdapter);
         remoteBroker.start();
         String brokerId = remoteBroker.getBrokerName();
-        remoteConnector = new TransportConnector(broker.getBroker(), TransportFactory.bind(brokerId,
new URI(getRemoteURI())));
+        remoteConnector = new TransportConnector(remoteBroker.getBroker(), TransportFactory.bind(brokerId,
new URI(getRemoteURI())));
         remoteConnector.start();
         BrokerRegistry.getInstance().bind("remotehost", remoteBroker);
     }

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/fanout/FanoutTransportBrokerTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/fanout/FanoutTransportBrokerTest.java?rev=587188&r1=587187&r2=587188&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/fanout/FanoutTransportBrokerTest.java
(original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/fanout/FanoutTransportBrokerTest.java
Mon Oct 22 11:39:57 2007
@@ -47,8 +47,6 @@
     public ActiveMQDestination destination;
     public int deliveryMode;
 
-    private String remoteURI = "tcp://localhost:0?wireFormat.tcpNoDelayEnabled=true";
-
     public static Test suite() {
         return suite(FanoutTransportBrokerTest.class);
     }
@@ -59,10 +57,10 @@
 
     public void initCombosForTestPublisherFansout() {
         addCombinationValues("deliveryMode", new Object[] {Integer.valueOf(DeliveryMode.NON_PERSISTENT),
Integer.valueOf(DeliveryMode.PERSISTENT)});
-        addCombinationValues("destination", new Object[] {new ActiveMQQueue("TEST"), new
ActiveMQTopic("TEST")});
+        addCombinationValues("destination", new Object[] {new ActiveMQTopic("TEST")});
     }
 
-    public void xtestPublisherFansout() throws Exception {
+    public void testPublisherFansout() throws Exception {
 
         // Start a normal consumer on the local broker
         StubConnection connection1 = createConnection();
@@ -105,7 +103,7 @@
 
     public void initCombosForTestPublisherWaitsForServerToBeUp() {
         addCombinationValues("deliveryMode", new Object[] {Integer.valueOf(DeliveryMode.NON_PERSISTENT),
Integer.valueOf(DeliveryMode.PERSISTENT)});
-        addCombinationValues("destination", new Object[] {new ActiveMQQueue("TEST"), new
ActiveMQTopic("TEST")});
+        addCombinationValues("destination", new Object[] {new ActiveMQTopic("TEST")});
     }
 
     public void testPublisherWaitsForServerToBeUp() throws Exception {
@@ -177,20 +175,21 @@
 
         // Restart the remote server. State should be re-played and the publish
         // should continue.
-        remoteURI = remoteConnector.getServer().getConnectURI().toString();
+        LOG.info("Restarting Broker");
         restartRemoteBroker();
+        LOG.info("Broker Restarted");
 
         // This should reconnect, and resend
-        assertTrue(publishDone.await(10, TimeUnit.SECONDS));
+        assertTrue(publishDone.await(20, TimeUnit.SECONDS));
 
     }
 
     protected String getLocalURI() {
-        return "tcp://localhost:0?wireFormat.tcpNoDelayEnabled=true";
+        return "tcp://localhost:61616";
     }
 
     protected String getRemoteURI() {
-        return remoteURI;
+        return "tcp://localhost:61617";
     }
 
     protected StubConnection createFanoutConnection() throws Exception {



Mime
View raw message