activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tab...@apache.org
Subject svn commit: r1238827 [2/2] - in /activemq/trunk/activemq-core/src: main/java/org/apache/activemq/network/jms/ test/java/org/apache/activemq/network/jms/
Date Tue, 31 Jan 2012 21:56:04 GMT
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/jms/TopicBridge.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/jms/TopicBridge.java?rev=1238827&r1=1238826&r2=1238827&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/jms/TopicBridge.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/jms/TopicBridge.java Tue Jan 31 21:56:03 2012
@@ -17,7 +17,6 @@
 package org.apache.activemq.network.jms;
 
 import javax.jms.Connection;
-import javax.jms.Destination;
 import javax.jms.JMSException;
 import javax.jms.Message;
 import javax.jms.MessageConsumer;
@@ -30,8 +29,8 @@ import javax.jms.TopicSession;
 
 /**
  * A Destination bridge is used to bridge between two different JMS systems
- * 
- * 
+ *
+ *
  */
 class TopicBridge extends DestinationBridge {
     protected Topic consumerTopic;
@@ -56,6 +55,7 @@ class TopicBridge extends DestinationBri
 
     protected MessageConsumer createConsumer() throws JMSException {
         // set up the consumer
+        if (consumerConnection == null) return null;
         consumerSession = consumerConnection.createTopicSession(false, Session.CLIENT_ACKNOWLEDGE);
         MessageConsumer consumer = null;
         if (consumerName != null && consumerName.length() > 0) {
@@ -72,20 +72,29 @@ class TopicBridge extends DestinationBri
                 consumer = consumerSession.createSubscriber(consumerTopic);
             }
         }
+
+        consumer.setMessageListener(this);
+
         return consumer;
     }
 
+    /**
+     * Creates the publisher used to forward messages, together with the
+     * session that owns it, and stores both in {@code producerSession} and
+     * {@code producer}.
+     *
+     * @return the newly created publisher, or {@code null} when no producer
+     *         connection has been set
+     * @throws JMSException if the session or the publisher cannot be created
+     */
     protected synchronized MessageProducer createProducer() throws JMSException {
+        // No connection yet: report "not available" to the caller via null.
+        if (producerConnection == null) return null;
         // Non-transacted session with automatic acknowledgement.
         producerSession = producerConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
         // Publisher is created without a fixed topic; the topic is passed on each publish call.
         producer = producerSession.createPublisher(null);
         return producer;
     }
 
     protected synchronized void sendMessage(Message message) throws JMSException {
-        if (producer == null) {
-            createProducer();
+        if (producer == null && createProducer() == null) {
+            throw new JMSException("Producer for remote queue not available.");
+        }
+        try {
+            producer.publish(producerTopic, message);
+        } catch (JMSException e) {
+            producer = null;
+            throw e;
         }
-        producer.publish(producerTopic, message);
     }
 
     /**
@@ -100,6 +109,13 @@ class TopicBridge extends DestinationBri
      */
     public void setConsumerConnection(TopicConnection consumerConnection) {
         this.consumerConnection = consumerConnection;
+        // If the bridge is already started, build the consumer right away on the
+        // connection that was just supplied.
+        if (started.get()) {
+            try {
+                // NOTE(review): the returned consumer is not stored here; confirm
+                // whether the base class needs a reference to it.
+                createConsumer();
+            } catch(Exception e) {
+                // The failure is handed to the connector; the exception itself is
+                // not logged or rethrown at this point.
+                jmsConnector.handleConnectionFailure(getConnnectionForConsumer());
+            }
+        }
     }
 
     /**

Added: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/jms/QueueBridgeStandaloneReconnectTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/jms/QueueBridgeStandaloneReconnectTest.java?rev=1238827&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/jms/QueueBridgeStandaloneReconnectTest.java (added)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/jms/QueueBridgeStandaloneReconnectTest.java Tue Jan 31 21:56:03 2012
@@ -0,0 +1,366 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.network.jms;
+
+import static org.junit.Assert.*;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.util.Wait;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.jms.*;
+import java.util.ArrayList;
+import java.util.Iterator;
+
+/**
+ * Exercises a standalone {@link JmsQueueConnector} that bridges one outbound
+ * and one inbound queue between two in-process brokers. The scenarios cover a
+ * connector started after the brokers, a connector started before the
+ * brokers, and a connector that has to survive a restart of both brokers.
+ */
+public class QueueBridgeStandaloneReconnectTest {
+
+    private static final Logger LOG = LoggerFactory.getLogger(QueueBridgeStandaloneReconnectTest.class);
+
+    private static final String LOCAL_BROKER_URI = "tcp://localhost:61616";
+    private static final String FOREIGN_BROKER_URI = "tcp://localhost:61617";
+
+    private static final String OUTBOUND_QUEUE_NAME = "RECONNECT.TEST.OUT.QUEUE";
+    private static final String INBOUND_QUEUE_NAME = "RECONNECT.TEST.IN.QUEUE";
+
+    private static final String TO_FOREIGN_TEXT = "to.foreign.broker";
+    private static final String TO_LOCAL_TEXT = "to.local.broker";
+
+    private JmsQueueConnector jmsQueueConnector;
+
+    private BrokerService localBroker;
+    private BrokerService foreignBroker;
+
+    private ActiveMQConnectionFactory localConnectionFactory;
+    private ActiveMQConnectionFactory foreignConnectionFactory;
+
+    private Destination outbound;
+    private Destination inbound;
+
+    /** Consumer connections opened by a test; closed again in {@link #tearDown()}. */
+    private final ArrayList<Connection> connections = new ArrayList<Connection>();
+
+    /** Brokers are up before the connector starts. */
+    @Test
+    public void testSendAndReceiveOverConnectedBridges() throws Exception {
+
+        startLocalBroker();
+        startForeignBroker();
+
+        jmsQueueConnector.start();
+
+        assertMessagesCrossBridgeInBothDirections();
+    }
+
+    /** The connector starts first and has to connect once the brokers appear. */
+    @Test
+    public void testSendAndReceiveOverBridgeWhenStartedBeforeBrokers() throws Exception {
+
+        jmsQueueConnector.start();
+
+        startLocalBroker();
+        startForeignBroker();
+
+        assertConnectorState("Should have Connected.", true);
+
+        assertMessagesCrossBridgeInBothDirections();
+    }
+
+    /** Both brokers are stopped and started again while the connector keeps running. */
+    @Test
+    public void testSendAndReceiveOverBridgeWithRestart() throws Exception {
+
+        startLocalBroker();
+        startForeignBroker();
+
+        jmsQueueConnector.start();
+
+        assertConnectorState("Should have Connected.", true);
+
+        stopLocalBroker();
+        stopForeignBroker();
+
+        assertConnectorState("Should have detected connection drop.", false);
+
+        startLocalBroker();
+        startForeignBroker();
+
+        assertConnectorState("Should have Re-Connected.", true);
+
+        assertMessagesCrossBridgeInBothDirections();
+    }
+
+    @Before
+    public void setUp() throws Exception {
+
+        localConnectionFactory = createLocalConnectionFactory();
+        foreignConnectionFactory = createForeignConnectionFactory();
+
+        outbound = new ActiveMQQueue(OUTBOUND_QUEUE_NAME);
+        inbound = new ActiveMQQueue(INBOUND_QUEUE_NAME);
+
+        jmsQueueConnector = new JmsQueueConnector();
+
+        // Wire the bridges.
+        jmsQueueConnector.setOutboundQueueBridges(
+            new OutboundQueueBridge[] {new OutboundQueueBridge(OUTBOUND_QUEUE_NAME)});
+        jmsQueueConnector.setInboundQueueBridges(
+            new InboundQueueBridge[] {new InboundQueueBridge(INBOUND_QUEUE_NAME)});
+
+        // Tell it how to reach the two brokers. The connector gets factory
+        // instances of its own, separate from the ones the test clients use.
+        jmsQueueConnector.setOutboundQueueConnectionFactory(createForeignConnectionFactory());
+        jmsQueueConnector.setLocalQueueConnectionFactory(createLocalConnectionFactory());
+    }
+
+    @After
+    public void tearDown() throws Exception {
+        disposeConsumerConnections();
+
+        if (jmsQueueConnector != null) {
+            try {
+                jmsQueueConnector.stop();
+            } catch (Exception e) {
+                // Best-effort cleanup; a failing stop must not hide the test result.
+                LOG.debug("Ignoring failure while stopping the JMS queue connector", e);
+            } finally {
+                jmsQueueConnector = null;
+            }
+        }
+
+        try {
+            stopLocalBroker();
+        } catch (Exception e) {
+            LOG.debug("Ignoring failure while stopping the local broker", e);
+        }
+        try {
+            stopForeignBroker();
+        } catch (Exception e) {
+            LOG.debug("Ignoring failure while stopping the foreign broker", e);
+        }
+    }
+
+    /**
+     * Closes every consumer connection opened through the createConsumerFor*
+     * helpers and forgets them, so a repeated call does not touch connections
+     * that are already closed.
+     */
+    protected void disposeConsumerConnections() {
+        for (Connection connection : connections) {
+            try {
+                connection.close();
+            } catch (Exception e) {
+                // Best-effort cleanup; the broker may already be gone.
+                LOG.debug("Ignoring failure while closing a consumer connection", e);
+            }
+        }
+        connections.clear();
+    }
+
+    protected void startLocalBroker() throws Exception {
+        if (localBroker == null) {
+            localBroker = createFirstBroker();
+            localBroker.start();
+            localBroker.waitUntilStarted();
+        }
+    }
+
+    protected void stopLocalBroker() throws Exception {
+        if (localBroker != null) {
+            try {
+                localBroker.stop();
+                localBroker.waitUntilStopped();
+            } finally {
+                // Drop the reference even when stop() fails so a later start creates a fresh broker.
+                localBroker = null;
+            }
+        }
+    }
+
+    protected void startForeignBroker() throws Exception {
+        if (foreignBroker == null) {
+            foreignBroker = createSecondBroker();
+            foreignBroker.start();
+            foreignBroker.waitUntilStarted();
+        }
+    }
+
+    protected void stopForeignBroker() throws Exception {
+        if (foreignBroker != null) {
+            try {
+                foreignBroker.stop();
+                foreignBroker.waitUntilStopped();
+            } finally {
+                // Drop the reference even when stop() fails so a later start creates a fresh broker.
+                foreignBroker = null;
+            }
+        }
+    }
+
+    /** Builds the non-persistent, JMX-less "local" broker listening on the local URI. */
+    protected BrokerService createFirstBroker() throws Exception {
+        BrokerService broker = new BrokerService();
+        broker.setBrokerName("broker1");
+        broker.setPersistent(false);
+        broker.setUseJmx(false);
+        broker.addConnector(LOCAL_BROKER_URI);
+
+        return broker;
+    }
+
+    /** Builds the non-persistent, JMX-less "foreign" broker listening on the foreign URI. */
+    protected BrokerService createSecondBroker() throws Exception {
+        BrokerService broker = new BrokerService();
+        broker.setBrokerName("broker2");
+        broker.setPersistent(false);
+        broker.setUseJmx(false);
+        broker.addConnector(FOREIGN_BROKER_URI);
+
+        return broker;
+    }
+
+    protected ActiveMQConnectionFactory createLocalConnectionFactory() {
+        return new ActiveMQConnectionFactory(LOCAL_BROKER_URI);
+    }
+
+    protected ActiveMQConnectionFactory createForeignConnectionFactory() {
+        return new ActiveMQConnectionFactory(FOREIGN_BROKER_URI);
+    }
+
+    /** Sends a text message to the outbound queue on the local broker. */
+    protected void sendMessageToForeignBroker(String text) throws JMSException {
+        sendTextMessage(localConnectionFactory, outbound, text);
+    }
+
+    /** Sends a text message to the inbound queue on the foreign broker. */
+    protected void sendMessageToLocalBroker(String text) throws JMSException {
+        sendTextMessage(foreignConnectionFactory, inbound, text);
+    }
+
+    /** Opens a started connection to the local broker and consumes from the inbound queue. */
+    protected MessageConsumer createConsumerForLocalBroker() throws JMSException {
+        return createTrackedConsumer(localConnectionFactory, inbound);
+    }
+
+    /** Opens a started connection to the foreign broker and consumes from the outbound queue. */
+    protected MessageConsumer createConsumerForForeignBroker() throws JMSException {
+        return createTrackedConsumer(foreignConnectionFactory, outbound);
+    }
+
+    /** Sends one message each way and waits until both arrive on the far side of the bridge. */
+    private void assertMessagesCrossBridgeInBothDirections() throws Exception {
+        sendMessageToForeignBroker(TO_FOREIGN_TEXT);
+        sendMessageToLocalBroker(TO_LOCAL_TEXT);
+
+        assertTextMessageReceived(createConsumerForLocalBroker(), TO_LOCAL_TEXT);
+        assertTextMessageReceived(createConsumerForForeignBroker(), TO_FOREIGN_TEXT);
+    }
+
+    /** Polls the consumer until a text message with the expected body shows up. */
+    private void assertTextMessageReceived(final MessageConsumer consumer, final String expectedText)
+        throws Exception {
+
+        assertTrue("Should have received a Message.", Wait.waitFor(new Wait.Condition() {
+            @Override
+            public boolean isSatisified() throws Exception {
+                Message message = consumer.receive(100);
+                return message instanceof TextMessage
+                    && expectedText.equals(((TextMessage) message).getText());
+            }
+        }));
+    }
+
+    /** Waits until the connector reports the expected connected state. */
+    private void assertConnectorState(String failureMessage, final boolean expectedConnected)
+        throws Exception {
+
+        assertTrue(failureMessage, Wait.waitFor(new Wait.Condition() {
+            @Override
+            public boolean isSatisified() throws Exception {
+                return jmsQueueConnector.isConnected() == expectedConnected;
+            }
+        }));
+    }
+
+    /** Sends a single text message over a short-lived connection, which is always closed. */
+    private void sendTextMessage(ConnectionFactory factory, Destination destination, String text)
+        throws JMSException {
+
+        // Created outside the try block so the cleanup never sees a null connection.
+        Connection connection = factory.createConnection();
+        try {
+            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            MessageProducer producer = session.createProducer(destination);
+            producer.send(session.createTextMessage(text));
+        } finally {
+            try {
+                connection.close();
+            } catch (Exception e) {
+                // Best-effort cleanup; do not mask an exception from the send itself.
+                LOG.debug("Ignoring failure while closing a producer connection", e);
+            }
+        }
+    }
+
+    /** Creates a consumer on a new started connection and registers the connection for cleanup. */
+    private MessageConsumer createTrackedConsumer(ConnectionFactory factory, Destination destination)
+        throws JMSException {
+
+        Connection connection = factory.createConnection();
+        connections.add(connection);
+        connection.start();
+
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        return session.createConsumer(destination);
+    }
+}

Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/jms/QueueBridgeStandaloneReconnectTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/jms/QueueOutboundBridgeReconnectTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/jms/QueueOutboundBridgeReconnectTest.java?rev=1238827&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/jms/QueueOutboundBridgeReconnectTest.java (added)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/jms/QueueOutboundBridgeReconnectTest.java Tue Jan 31 21:56:03 2012
@@ -0,0 +1,338 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.network.jms;
+
+import static org.junit.Assert.*;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.util.Wait;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.jms.*;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * These test cases verify that queue outbound bridge connections get
+ * re-established across the different broker start-order and restart
+ * scenarios. The bridge built here reaches the consumer broker through a
+ * plain tcp:// URI (no failover transport), so the first send after that
+ * broker comes up may fail and the tests allow the bridge time to recover.
+ */
+public class QueueOutboundBridgeReconnectTest {
+
+    private static final Logger LOG = LoggerFactory.getLogger(QueueOutboundBridgeReconnectTest.class);
+
+    private static final String PRODUCER_BROKER_TCP_URI = "tcp://localhost:61616";
+    private static final String PRODUCER_BROKER_VM_URI = "vm://broker1";
+    private static final String CONSUMER_BROKER_TCP_URI = "tcp://localhost:61617";
+    private static final String CONSUMER_BROKER_VM_URI = "vm://broker2";
+    private static final String QUEUE_NAME = "RECONNECT.TEST.QUEUE";
+
+    private BrokerService producerBroker;
+    private BrokerService consumerBroker;
+    private ActiveMQConnectionFactory producerConnectionFactory;
+    private ActiveMQConnectionFactory consumerConnectionFactory;
+    private Destination destination;
+
+    /** Consumer connections opened by a test; closed again in {@link #tearDown()}. */
+    private final ArrayList<Connection> connections = new ArrayList<Connection>();
+
+    /** Runs the producer-restart scenario repeatedly against the same pair of brokers. */
+    @Test
+    public void testMultipleProducerBrokerRestarts() throws Exception {
+        for (int i = 0; i < 10; i++) {
+            testWithProducerBrokerRestart();
+            disposeConsumerConnections();
+        }
+    }
+
+    /** Sending must not fail while the consumer broker is down. */
+    @Test
+    public void testRestartProducerWithNoConsumer() throws Exception {
+        stopConsumerBroker();
+
+        startProducerBroker();
+        sendMessage("test123");
+        sendMessage("test456");
+    }
+
+    @Test
+    public void testWithoutRestartsConsumerFirst() throws Exception {
+        startConsumerBroker();
+        startProducerBroker();
+        sendMessage("test123");
+        sendMessage("test456");
+
+        MessageConsumer consumer = createConsumer();
+        assertNextTextMessage(consumer, 3000, "test123");
+        assertNextTextMessage(consumer, 3000, "test456");
+
+        assertNull(consumer.receiveNoWait());
+    }
+
+    @Test
+    public void testWithoutRestartsProducerFirst() throws Exception {
+        startProducerBroker();
+        sendMessage("test123");
+
+        startConsumerBroker();
+
+        // unless using a failover URI, the first attempt of this send will likely fail,
+        // so the receive timeouts below are longer to give the bridge time to recover
+        sendMessage("test456");
+
+        MessageConsumer consumer = createConsumer();
+        assertNextTextMessage(consumer, 5000, "test123");
+        assertNextTextMessage(consumer, 5000, "test456");
+
+        assertNull(consumer.receiveNoWait());
+    }
+
+    @Test
+    public void testWithProducerBrokerRestart() throws Exception {
+        startProducerBroker();
+        startConsumerBroker();
+
+        sendMessage("test123");
+
+        MessageConsumer consumer = createConsumer();
+        assertNextTextMessage(consumer, 5000, "test123");
+        assertNull(consumer.receiveNoWait());
+
+        // Restart the producer broker (the one hosting the bridge)...
+        stopProducerBroker();
+        startProducerBroker();
+
+        sendMessage("test123");
+        assertNextTextMessage(consumer, 5000, "test123");
+        assertNull(consumer.receiveNoWait());
+    }
+
+    @Test
+    public void testWithConsumerBrokerRestart() throws Exception {
+
+        startProducerBroker();
+        startConsumerBroker();
+
+        sendMessage("test123");
+
+        final MessageConsumer consumer1 = createConsumer();
+        assertNextTextMessage(consumer1, 5000, "test123");
+        assertNull(consumer1.receiveNoWait());
+        consumer1.close();
+
+        // Restart the consumer broker (the bridge's remote side)...
+        stopConsumerBroker();
+        startConsumerBroker();
+
+        // unless using a failover URI, the first attempt of this send will likely fail,
+        // so poll for the message below to give the bridge time to recover
+        sendMessage("test123");
+
+        final MessageConsumer consumer2 = createConsumer();
+        assertEventuallyReceives(consumer2, "test123");
+        assertNull(consumer2.receiveNoWait());
+    }
+
+    @Test
+    public void testWithConsumerBrokerStartDelay() throws Exception {
+
+        startConsumerBroker();
+        final MessageConsumer consumer = createConsumer();
+
+        TimeUnit.SECONDS.sleep(5);
+
+        startProducerBroker();
+
+        sendMessage("test123");
+        assertEventuallyReceives(consumer, "test123");
+        assertNull(consumer.receiveNoWait());
+    }
+
+    @Test
+    public void testWithProducerBrokerStartDelay() throws Exception {
+
+        startProducerBroker();
+
+        TimeUnit.SECONDS.sleep(5);
+
+        startConsumerBroker();
+        MessageConsumer consumer = createConsumer();
+
+        sendMessage("test123");
+        assertNextTextMessage(consumer, 5000, "test123");
+        assertNull(consumer.receiveNoWait());
+    }
+
+    @Before
+    public void setUp() throws Exception {
+
+        producerConnectionFactory = createProducerConnectionFactory();
+        consumerConnectionFactory = createConsumerConnectionFactory();
+        destination = new ActiveMQQueue(QUEUE_NAME);
+    }
+
+    @After
+    public void tearDown() throws Exception {
+        disposeConsumerConnections();
+        try {
+            stopProducerBroker();
+        } catch (Exception e) {
+            // Best-effort cleanup; a failing stop must not hide the test result.
+            LOG.debug("Ignoring failure while stopping the producer broker", e);
+        }
+        try {
+            stopConsumerBroker();
+        } catch (Exception e) {
+            LOG.debug("Ignoring failure while stopping the consumer broker", e);
+        }
+    }
+
+    /**
+     * Closes every consumer connection opened through {@link #createConsumer()}
+     * and forgets them, so repeated calls do not revisit closed connections.
+     */
+    protected void disposeConsumerConnections() {
+        for (Connection connection : connections) {
+            try {
+                connection.close();
+            } catch (Exception e) {
+                // Best-effort cleanup; the broker may already be gone.
+                LOG.debug("Ignoring failure while closing a consumer connection", e);
+            }
+        }
+        connections.clear();
+    }
+
+    protected void startProducerBroker() throws Exception {
+        if (producerBroker == null) {
+            producerBroker = createFirstBroker();
+            producerBroker.start();
+            producerBroker.waitUntilStarted();
+        }
+    }
+
+    protected void stopProducerBroker() throws Exception {
+        if (producerBroker != null) {
+            try {
+                producerBroker.stop();
+                // Wait so an immediate restart does not race the shutdown for the same port.
+                producerBroker.waitUntilStopped();
+            } finally {
+                producerBroker = null;
+            }
+        }
+    }
+
+    protected void startConsumerBroker() throws Exception {
+        if (consumerBroker == null) {
+            consumerBroker = createSecondBroker();
+            consumerBroker.start();
+            consumerBroker.waitUntilStarted();
+        }
+    }
+
+    protected void stopConsumerBroker() throws Exception {
+        if (consumerBroker != null) {
+            try {
+                consumerBroker.stop();
+                // Wait so an immediate restart does not race the shutdown for the same port.
+                consumerBroker.waitUntilStopped();
+            } finally {
+                consumerBroker = null;
+            }
+        }
+    }
+
+    /**
+     * Builds the producer-side broker, including the outbound queue bridge
+     * that forwards the test queue to the consumer broker's TCP connector.
+     */
+    protected BrokerService createFirstBroker() throws Exception {
+        BrokerService broker = new BrokerService();
+        broker.setBrokerName("broker1");
+        broker.setPersistent(false);
+        broker.setUseJmx(false);
+        broker.addConnector(PRODUCER_BROKER_TCP_URI);
+        broker.addConnector(PRODUCER_BROKER_VM_URI);
+
+        JmsQueueConnector jmsQueueConnector = new JmsQueueConnector();
+        jmsQueueConnector.setOutboundQueueBridges(
+            new OutboundQueueBridge[] {new OutboundQueueBridge(QUEUE_NAME)});
+        jmsQueueConnector.setOutboundQueueConnectionFactory(
+            new ActiveMQConnectionFactory(CONSUMER_BROKER_TCP_URI));
+
+        broker.setJmsBridgeConnectors(new JmsConnector[]{jmsQueueConnector});
+
+        return broker;
+    }
+
+    /** Builds the consumer-side broker; it carries no bridge of its own. */
+    protected BrokerService createSecondBroker() throws Exception {
+        BrokerService broker = new BrokerService();
+        broker.setBrokerName("broker2");
+        broker.setPersistent(false);
+        broker.setUseJmx(false);
+        broker.addConnector(CONSUMER_BROKER_TCP_URI);
+        broker.addConnector(CONSUMER_BROKER_VM_URI);
+
+        return broker;
+    }
+
+    protected ActiveMQConnectionFactory createProducerConnectionFactory() {
+        return new ActiveMQConnectionFactory(PRODUCER_BROKER_VM_URI);
+    }
+
+    protected ActiveMQConnectionFactory createConsumerConnectionFactory() {
+        return new ActiveMQConnectionFactory(CONSUMER_BROKER_VM_URI);
+    }
+
+    /** Sends one text message to the test queue on the producer broker. */
+    protected void sendMessage(String text) throws JMSException {
+        // Created outside the try block so the cleanup never sees a null connection.
+        Connection connection = producerConnectionFactory.createConnection();
+        try {
+            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            MessageProducer producer = session.createProducer(destination);
+            producer.send(session.createTextMessage(text));
+        } finally {
+            try {
+                connection.close();
+            } catch (Exception e) {
+                // Best-effort cleanup; do not mask an exception from the send itself.
+                LOG.debug("Ignoring failure while closing the producer connection", e);
+            }
+        }
+    }
+
+    /** Opens a started connection to the consumer broker and consumes from the test queue. */
+    protected MessageConsumer createConsumer() throws JMSException {
+        Connection connection = consumerConnectionFactory.createConnection();
+        connections.add(connection);
+        connection.start();
+
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        return session.createConsumer(destination);
+    }
+
+    /** Receives the next message within the timeout and checks its text body. */
+    private void assertNextTextMessage(MessageConsumer consumer, long timeoutMillis, String expectedText)
+        throws JMSException {
+
+        Message message = consumer.receive(timeoutMillis);
+        assertNotNull(message);
+        assertEquals(expectedText, ((TextMessage) message).getText());
+    }
+
+    /** Polls the consumer without blocking until a message with the expected text arrives. */
+    private void assertEventuallyReceives(final MessageConsumer consumer, final String expectedText)
+        throws Exception {
+
+        assertTrue("Expected recover and delivery failed", Wait.waitFor(new Wait.Condition() {
+            @Override
+            public boolean isSatisified() throws Exception {
+                Message message = consumer.receiveNoWait();
+                return message != null && expectedText.equals(((TextMessage) message).getText());
+            }
+        }));
+    }
+}

Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/jms/QueueOutboundBridgeReconnectTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/jms/TopicBridgeStandaloneReconnectTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/jms/TopicBridgeStandaloneReconnectTest.java?rev=1238827&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/jms/TopicBridgeStandaloneReconnectTest.java (added)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/jms/TopicBridgeStandaloneReconnectTest.java Tue Jan 31 21:56:03 2012
@@ -0,0 +1,363 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.network.jms;
+
+import static org.junit.Assert.*;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.command.ActiveMQTopic;
+import org.apache.activemq.util.Wait;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.jms.*;
+import java.util.ArrayList;
+import java.util.Iterator;
+
+/**
+ * Exercises a {@link JmsTopicConnector} that is created and started directly by the
+ * test (rather than being registered on a broker) and bridges two topics between a
+ * "local" broker on port 61616 and a "foreign" broker on port 61617.
+ *
+ * Messages produced on the local broker's outbound topic are expected to arrive at a
+ * consumer on the foreign broker, and messages produced on the foreign broker's inbound
+ * topic are expected to arrive at a consumer on the local broker. The tests cover the
+ * connector being started after the brokers, before the brokers, and across a restart
+ * of both brokers.
+ */
+public class TopicBridgeStandaloneReconnectTest {
+
+    private static final Logger LOG = LoggerFactory.getLogger(TopicBridgeStandaloneReconnectTest.class);
+
+    private static final String LOCAL_BROKER_URI = "tcp://localhost:61616";
+    private static final String FOREIGN_BROKER_URI = "tcp://localhost:61617";
+    private static final String OUTBOUND_TOPIC_NAME = "RECONNECT.TEST.OUT.TOPIC";
+    private static final String INBOUND_TOPIC_NAME = "RECONNECT.TEST.IN.TOPIC";
+
+    private JmsTopicConnector jmsTopicConnector;
+
+    private BrokerService localBroker;
+    private BrokerService foreignBroker;
+
+    private ActiveMQConnectionFactory localConnectionFactory;
+    private ActiveMQConnectionFactory foreignConnectionFactory;
+
+    private Destination outbound;
+    private Destination inbound;
+
+    // Consumer connections opened by a test; closed and forgotten in disposeConsumerConnections().
+    private final ArrayList<Connection> connections = new ArrayList<Connection>();
+
+    /** Brokers first, then the connector: messages must flow in both directions. */
+    @Test
+    public void testSendAndReceiveOverConnectedBridges() throws Exception {
+
+        startLocalBroker();
+        startForeignBroker();
+
+        jmsTopicConnector.start();
+
+        assertMessagesFlowInBothDirections();
+    }
+
+    /** Connector first, then the brokers: the connector must connect once they are up. */
+    @Test
+    public void testSendAndReceiveOverBridgeWhenStartedBeforeBrokers() throws Exception {
+
+        jmsTopicConnector.start();
+
+        startLocalBroker();
+        startForeignBroker();
+
+        assertConnectorState("Should have Connected.", true);
+
+        assertMessagesFlowInBothDirections();
+    }
+
+    /** Both brokers are stopped and restarted: the connector must notice and reconnect. */
+    @Test
+    public void testSendAndReceiveOverBridgeWithRestart() throws Exception {
+
+        startLocalBroker();
+        startForeignBroker();
+
+        jmsTopicConnector.start();
+
+        assertConnectorState("Should have Connected.", true);
+
+        stopLocalBroker();
+        stopForeignBroker();
+
+        assertConnectorState("Should have detected connection drop.", false);
+
+        startLocalBroker();
+        startForeignBroker();
+
+        assertConnectorState("Should have Re-Connected.", true);
+
+        assertMessagesFlowInBothDirections();
+    }
+
+    @Before
+    public void setUp() throws Exception {
+
+        localConnectionFactory = createLocalConnectionFactory();
+        foreignConnectionFactory = createForeignConnectionFactory();
+
+        outbound = new ActiveMQTopic(OUTBOUND_TOPIC_NAME);
+        inbound = new ActiveMQTopic(INBOUND_TOPIC_NAME);
+
+        jmsTopicConnector = new JmsTopicConnector();
+
+        // Wire the bridges.
+        jmsTopicConnector.setOutboundTopicBridges(
+            new OutboundTopicBridge[] {new OutboundTopicBridge(OUTBOUND_TOPIC_NAME)});
+        jmsTopicConnector.setInboundTopicBridges(
+            new InboundTopicBridge[] {new InboundTopicBridge(INBOUND_TOPIC_NAME)});
+
+        // Tell it how to reach the two brokers.
+        jmsTopicConnector.setOutboundTopicConnectionFactory(
+            new ActiveMQConnectionFactory(FOREIGN_BROKER_URI));
+        jmsTopicConnector.setLocalTopicConnectionFactory(
+            new ActiveMQConnectionFactory(LOCAL_BROKER_URI));
+    }
+
+    @After
+    public void tearDown() throws Exception {
+        disposeConsumerConnections();
+
+        // Cleanup is best-effort: a failure in one step must not prevent the next one.
+        if (jmsTopicConnector != null) {
+            try {
+                jmsTopicConnector.stop();
+            } catch (Exception e) {
+                LOG.debug("Ignoring failure while stopping the JMS topic connector", e);
+            } finally {
+                jmsTopicConnector = null;
+            }
+        }
+
+        try {
+            stopLocalBroker();
+        } catch (Throwable e) {
+            LOG.debug("Ignoring failure while stopping the local broker", e);
+        }
+        try {
+            stopForeignBroker();
+        } catch (Throwable e) {
+            LOG.debug("Ignoring failure while stopping the foreign broker", e);
+        }
+    }
+
+    /**
+     * Closes every consumer connection opened so far and empties the tracking list so
+     * that a later call does not try to close the same connections again.
+     */
+    protected void disposeConsumerConnections() {
+        for (Connection connection : connections) {
+            closeQuietly(connection);
+        }
+        connections.clear();
+    }
+
+    protected void startLocalBroker() throws Exception {
+        if (localBroker == null) {
+            localBroker = createFirstBroker();
+            localBroker.start();
+            localBroker.waitUntilStarted();
+        }
+    }
+
+    protected void stopLocalBroker() throws Exception {
+        if (localBroker != null) {
+            localBroker.stop();
+            localBroker.waitUntilStopped();
+            localBroker = null;
+        }
+    }
+
+    protected void startForeignBroker() throws Exception {
+        if (foreignBroker == null) {
+            foreignBroker = createSecondBroker();
+            foreignBroker.start();
+            foreignBroker.waitUntilStarted();
+        }
+    }
+
+    protected void stopForeignBroker() throws Exception {
+        if (foreignBroker != null) {
+            foreignBroker.stop();
+            foreignBroker.waitUntilStopped();
+            foreignBroker = null;
+        }
+    }
+
+    /** Creates (but does not start) the non-persistent local broker, "broker1". */
+    protected BrokerService createFirstBroker() throws Exception {
+        BrokerService broker = new BrokerService();
+        broker.setBrokerName("broker1");
+        broker.setPersistent(false);
+        broker.setUseJmx(false);
+        broker.addConnector(LOCAL_BROKER_URI);
+
+        return broker;
+    }
+
+    /** Creates (but does not start) the non-persistent foreign broker, "broker2". */
+    protected BrokerService createSecondBroker() throws Exception {
+
+        BrokerService broker = new BrokerService();
+        broker.setBrokerName("broker2");
+        broker.setPersistent(false);
+        broker.setUseJmx(false);
+        broker.addConnector(FOREIGN_BROKER_URI);
+
+        return broker;
+    }
+
+    protected ActiveMQConnectionFactory createLocalConnectionFactory() {
+        return new ActiveMQConnectionFactory(LOCAL_BROKER_URI);
+    }
+
+    protected ActiveMQConnectionFactory createForeignConnectionFactory() {
+        return new ActiveMQConnectionFactory(FOREIGN_BROKER_URI);
+    }
+
+    /**
+     * Publishes {@code text} on the outbound topic through the local broker; the bridge
+     * is expected to carry it to the foreign broker.
+     */
+    protected void sendMessageToForeignBroker(String text) throws JMSException {
+        sendTextMessage(localConnectionFactory, outbound, text);
+    }
+
+    /**
+     * Publishes {@code text} on the inbound topic through the foreign broker; the bridge
+     * is expected to carry it to the local broker.
+     */
+    protected void sendMessageToLocalBroker(String text) throws JMSException {
+        sendTextMessage(foreignConnectionFactory, inbound, text);
+    }
+
+    /** Returns a consumer on the inbound topic, connected to the local broker. */
+    protected MessageConsumer createConsumerForLocalBroker() throws JMSException {
+        return openConsumer(localConnectionFactory, inbound);
+    }
+
+    /** Returns a consumer on the outbound topic, connected to the foreign broker. */
+    protected MessageConsumer createConsumerForForeignBroker() throws JMSException {
+        return openConsumer(foreignConnectionFactory, outbound);
+    }
+
+    /** Subscribes on both brokers, sends one message each way and waits for both to arrive. */
+    private void assertMessagesFlowInBothDirections() throws Exception {
+        final MessageConsumer local = createConsumerForLocalBroker();
+        final MessageConsumer foreign = createConsumerForForeignBroker();
+
+        sendMessageToForeignBroker("to.foreign.broker");
+        sendMessageToLocalBroker("to.local.broker");
+
+        assertReceived(local, "to.local.broker");
+        assertReceived(foreign, "to.foreign.broker");
+    }
+
+    /** Polls until {@code jmsTopicConnector.isConnected()} equals {@code expected}. */
+    private void assertConnectorState(String failureMessage, final boolean expected) throws Exception {
+        assertTrue(failureMessage, Wait.waitFor(new Wait.Condition() {
+            @Override
+            public boolean isSatisified() throws Exception {
+                return jmsTopicConnector.isConnected() == expected;
+            }
+        }));
+    }
+
+    /** Polls {@code consumer} until a text message with body {@code expectedText} is received. */
+    private void assertReceived(final MessageConsumer consumer, final String expectedText) throws Exception {
+        assertTrue("Should have received a Message.", Wait.waitFor(new Wait.Condition() {
+            @Override
+            public boolean isSatisified() throws Exception {
+                Message message = consumer.receive(100);
+                return message instanceof TextMessage
+                    && expectedText.equals(((TextMessage) message).getText());
+            }
+        }));
+    }
+
+    /** Sends one text message over a short-lived connection obtained from {@code factory}. */
+    private void sendTextMessage(ConnectionFactory factory, Destination target, String text) throws JMSException {
+        // Acquired outside the try block: if creation fails there is nothing to close.
+        Connection connection = factory.createConnection();
+        try {
+            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            MessageProducer producer = session.createProducer(target);
+            producer.send(session.createTextMessage(text));
+        } finally {
+            closeQuietly(connection);
+        }
+    }
+
+    /** Opens a started, tracked connection from {@code factory} and returns a consumer on {@code source}. */
+    private MessageConsumer openConsumer(ConnectionFactory factory, Destination source) throws JMSException {
+        Connection connection = factory.createConnection();
+        connections.add(connection);
+        connection.start();
+
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        return session.createConsumer(source);
+    }
+
+    /** Closes {@code connection}, logging instead of propagating any failure. */
+    private void closeQuietly(Connection connection) {
+        try {
+            connection.close();
+        } catch (Exception e) {
+            LOG.debug("Ignoring failure while closing a test connection", e);
+        }
+    }
+}

Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/jms/TopicBridgeStandaloneReconnectTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/jms/TopicOutboundBridgeReconnectTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/jms/TopicOutboundBridgeReconnectTest.java?rev=1238827&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/jms/TopicOutboundBridgeReconnectTest.java (added)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/jms/TopicOutboundBridgeReconnectTest.java Tue Jan 31 21:56:03 2012
@@ -0,0 +1,326 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.network.jms;
+
+import static org.junit.Assert.*;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.command.ActiveMQTopic;
+import org.apache.activemq.util.Wait;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.jms.*;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * These test cases are used to verify that topic outbound bridge connections get
+ * re-established in all broker restart scenarios.
+ *
+ * The producer broker ("broker1") is configured with a {@link JmsTopicConnector} holding
+ * one outbound topic bridge whose connection factory points at the consumer broker
+ * ("broker2") through a plain {@code tcp://} URI rather than a failover URI, which is
+ * why some tests poll for delivery instead of expecting it on the first attempt.
+ */
+public class TopicOutboundBridgeReconnectTest {
+
+    private static final Logger LOG = LoggerFactory.getLogger(TopicOutboundBridgeReconnectTest.class);
+
+    private static final String TOPIC_NAME = "RECONNECT.TEST.TOPIC";
+
+    private BrokerService producerBroker;
+    private BrokerService consumerBroker;
+    private ActiveMQConnectionFactory producerConnectionFactory;
+    private ActiveMQConnectionFactory consumerConnectionFactory;
+    private Destination destination;
+
+    // Consumer connections opened by a test; closed and forgotten in disposeConsumerConnections().
+    private final ArrayList<Connection> connections = new ArrayList<Connection>();
+
+    @Test
+    public void testMultipleProducerBrokerRestarts() throws Exception {
+        for (int i = 0; i < 10; i++) {
+            testWithProducerBrokerRestart();
+            disposeConsumerConnections();
+        }
+    }
+
+    @Test
+    public void testWithoutRestartsConsumerFirst() throws Exception {
+        startConsumerBroker();
+        startProducerBroker();
+
+        MessageConsumer consumer = createConsumer();
+
+        sendMessage("test123");
+        sendMessage("test456");
+        assertNextText(consumer, 2000, "test123");
+        assertNextText(consumer, 5000, "test456");
+
+        assertNull(consumer.receiveNoWait());
+    }
+
+    @Test
+    public void testWithoutRestartsProducerFirst() throws Exception {
+        startProducerBroker();
+        sendMessage("test123");
+
+        startConsumerBroker();
+
+        // unless using a failover URI, the first attempt of this send will likely fail, so increase the timeout below
+        // to give the bridge time to recover
+        sendMessage("test456");
+
+        MessageConsumer consumer = createConsumer();
+        assertNextText(consumer, 5000, "test123");
+        assertNextText(consumer, 5000, "test456");
+
+        assertNull(consumer.receiveNoWait());
+    }
+
+    @Test
+    public void testWithProducerBrokerRestart() throws Exception {
+        startProducerBroker();
+        startConsumerBroker();
+
+        MessageConsumer consumer = createConsumer();
+
+        sendMessage("test123");
+        assertNextText(consumer, 5000, "test123");
+        assertNull(consumer.receiveNoWait());
+
+        // Restart the first (producer) broker...
+        stopProducerBroker();
+        startProducerBroker();
+
+        sendMessage("test123");
+        assertNextText(consumer, 5000, "test123");
+        assertNull(consumer.receiveNoWait());
+    }
+
+    @Test
+    public void testWithConsumerBrokerRestart() throws Exception {
+        startProducerBroker();
+        startConsumerBroker();
+
+        final MessageConsumer consumer1 = createConsumer();
+
+        sendMessage("test123");
+        assertNextText(consumer1, 5000, "test123");
+        assertNull(consumer1.receiveNoWait());
+        consumer1.close();
+
+        // Restart the second (consumer) broker...
+        stopConsumerBroker();
+        startConsumerBroker();
+
+        // unless using a failover URI, the first attempt of this send will likely fail, so increase the timeout below
+        // to give the bridge time to recover
+        sendMessage("test123");
+
+        final MessageConsumer consumer2 = createConsumer();
+        awaitDelivery(consumer2, "test123");
+        assertNull(consumer2.receiveNoWait());
+    }
+
+    @Test
+    public void testWithConsumerBrokerStartDelay() throws Exception {
+        startConsumerBroker();
+        final MessageConsumer consumer = createConsumer();
+
+        TimeUnit.SECONDS.sleep(5);
+
+        startProducerBroker();
+
+        sendMessage("test123");
+        awaitDelivery(consumer, "test123");
+        assertNull(consumer.receiveNoWait());
+    }
+
+    @Test
+    public void testWithProducerBrokerStartDelay() throws Exception {
+        startProducerBroker();
+
+        TimeUnit.SECONDS.sleep(5);
+
+        startConsumerBroker();
+        MessageConsumer consumer = createConsumer();
+
+        sendMessage("test123");
+        assertNextText(consumer, 2000, "test123");
+        assertNull(consumer.receiveNoWait());
+    }
+
+    @Before
+    public void setUp() throws Exception {
+        producerConnectionFactory = createProducerConnectionFactory();
+        consumerConnectionFactory = createConsumerConnectionFactory();
+        destination = new ActiveMQTopic(TOPIC_NAME);
+    }
+
+    @After
+    public void tearDown() throws Exception {
+        disposeConsumerConnections();
+
+        // Cleanup is best-effort: a failure in one step must not prevent the next one.
+        try {
+            stopProducerBroker();
+        } catch (Throwable e) {
+            LOG.debug("Ignoring failure while stopping the producer broker", e);
+        }
+        try {
+            stopConsumerBroker();
+        } catch (Throwable e) {
+            LOG.debug("Ignoring failure while stopping the consumer broker", e);
+        }
+    }
+
+    /**
+     * Closes every consumer connection opened so far and empties the tracking list.
+     * This method is invoked repeatedly by {@link #testMultipleProducerBrokerRestarts()},
+     * so clearing the list keeps already-closed connections from piling up and being
+     * closed again on every call.
+     */
+    protected void disposeConsumerConnections() {
+        for (Connection connection : connections) {
+            closeQuietly(connection);
+        }
+        connections.clear();
+    }
+
+    protected void startProducerBroker() throws Exception {
+        if (producerBroker == null) {
+            producerBroker = createFirstBroker();
+            producerBroker.start();
+        }
+    }
+
+    protected void stopProducerBroker() throws Exception {
+        if (producerBroker != null) {
+            producerBroker.stop();
+            producerBroker = null;
+        }
+    }
+
+    protected void startConsumerBroker() throws Exception {
+        if (consumerBroker == null) {
+            consumerBroker = createSecondBroker();
+            consumerBroker.start();
+        }
+    }
+
+    protected void stopConsumerBroker() throws Exception {
+        if (consumerBroker != null) {
+            consumerBroker.stop();
+            consumerBroker = null;
+        }
+    }
+
+    /**
+     * Creates (but does not start) the producer broker, "broker1", with tcp and vm
+     * connectors and a JMS topic connector bridging {@code RECONNECT.TEST.TOPIC}
+     * outbound to {@code tcp://localhost:61617}.
+     */
+    protected BrokerService createFirstBroker() throws Exception {
+        BrokerService broker = new BrokerService();
+        broker.setBrokerName("broker1");
+        broker.setPersistent(false);
+        broker.setUseJmx(false);
+        broker.addConnector("tcp://localhost:61616");
+        broker.addConnector("vm://broker1");
+
+        JmsTopicConnector jmsTopicConnector = new JmsTopicConnector();
+        jmsTopicConnector.setOutboundTopicBridges(
+            new OutboundTopicBridge[] {new OutboundTopicBridge(TOPIC_NAME)});
+        jmsTopicConnector.setOutboundTopicConnectionFactory(
+            new ActiveMQConnectionFactory("tcp://localhost:61617"));
+
+        broker.setJmsBridgeConnectors(new JmsConnector[]{jmsTopicConnector});
+
+        return broker;
+    }
+
+    /** Creates (but does not start) the consumer broker, "broker2", with tcp and vm connectors. */
+    protected BrokerService createSecondBroker() throws Exception {
+        BrokerService broker = new BrokerService();
+        broker.setBrokerName("broker2");
+        broker.setPersistent(false);
+        broker.setUseJmx(false);
+        broker.addConnector("tcp://localhost:61617");
+        broker.addConnector("vm://broker2");
+
+        return broker;
+    }
+
+    protected ActiveMQConnectionFactory createProducerConnectionFactory() {
+        return new ActiveMQConnectionFactory("vm://broker1");
+    }
+
+    protected ActiveMQConnectionFactory createConsumerConnectionFactory() {
+        return new ActiveMQConnectionFactory("vm://broker2");
+    }
+
+    /**
+     * Sends one text message to the test topic over a short-lived connection to the
+     * producer broker.
+     *
+     * @param text the body of the message to send
+     * @throws JMSException if the connection cannot be created or the send fails
+     */
+    protected void sendMessage(String text) throws JMSException {
+        // Acquired outside the try block: if creation fails there is nothing to close.
+        Connection connection = producerConnectionFactory.createConnection();
+        try {
+            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            MessageProducer producer = session.createProducer(destination);
+            producer.send(session.createTextMessage(text));
+        } finally {
+            closeQuietly(connection);
+        }
+    }
+
+    /**
+     * Opens a started, tracked connection to the consumer broker and returns a consumer
+     * on the test topic.
+     */
+    protected MessageConsumer createConsumer() throws JMSException {
+        Connection connection = consumerConnectionFactory.createConnection();
+        connections.add(connection);
+        connection.start();
+
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        return session.createConsumer(destination);
+    }
+
+    /** Asserts that the next message arrives within {@code timeout} ms and carries {@code expectedText}. */
+    private void assertNextText(MessageConsumer consumer, long timeout, String expectedText) throws JMSException {
+        Message message = consumer.receive(timeout);
+        assertNotNull(message);
+        assertEquals(expectedText, ((TextMessage) message).getText());
+    }
+
+    /** Polls {@code consumer} without blocking until a text message with {@code expectedText} shows up. */
+    private void awaitDelivery(final MessageConsumer consumer, final String expectedText) throws Exception {
+        assertTrue("Expected recover and delivery failed", Wait.waitFor(new Wait.Condition() {
+            @Override
+            public boolean isSatisified() throws Exception {
+                Message message = consumer.receiveNoWait();
+                return message instanceof TextMessage
+                    && expectedText.equals(((TextMessage) message).getText());
+            }
+        }));
+    }
+
+    /** Closes {@code connection}, logging instead of propagating any failure. */
+    private void closeQuietly(Connection connection) {
+        try {
+            connection.close();
+        } catch (Exception e) {
+            LOG.debug("Ignoring failure while closing a test connection", e);
+        }
+    }
+}

Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/jms/TopicOutboundBridgeReconnectTest.java
------------------------------------------------------------------------------
    svn:eol-style = native



Mime
View raw message