activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tab...@apache.org
Subject svn commit: r1238827 [1/2] - in /activemq/trunk/activemq-core/src: main/java/org/apache/activemq/network/jms/ test/java/org/apache/activemq/network/jms/
Date Tue, 31 Jan 2012 21:56:04 GMT
Author: tabish
Date: Tue Jan 31 21:56:03 2012
New Revision: 1238827

URL: http://svn.apache.org/viewvc?rev=1238827&view=rev
Log:
fix for: https://issues.apache.org/jira/browse/AMQ-3137
fix for: https://issues.apache.org/jira/browse/AMQ-2455
fix for: https://issues.apache.org/jira/browse/AMQ-3635

Adds reconnect logic and tests along with a policy class to allow for control over the reconnect process.
Reconnection of both local and foreign side of the JmsConnector is supported.

Added:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/jms/ReconnectionPolicy.java   (with props)
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/jms/QueueBridgeStandaloneReconnectTest.java   (with props)
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/jms/QueueOutboundBridgeReconnectTest.java   (with props)
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/jms/TopicBridgeStandaloneReconnectTest.java   (with props)
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/jms/TopicOutboundBridgeReconnectTest.java   (with props)
Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/jms/DestinationBridge.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/jms/InboundQueueBridge.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/jms/InboundTopicBridge.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/jms/JmsConnector.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/jms/JmsMesageConvertor.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/jms/JmsQueueConnector.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/jms/JmsTopicConnector.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/jms/OutboundQueueBridge.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/jms/OutboundTopicBridge.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/jms/QueueBridge.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/jms/SimpleJmsMessageConvertor.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/jms/TopicBridge.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/jms/DestinationBridge.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/jms/DestinationBridge.java?rev=1238827&r1=1238826&r2=1238827&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/jms/DestinationBridge.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/jms/DestinationBridge.java Tue Jan 31 21:56:03 2012
@@ -24,24 +24,22 @@ import javax.jms.Message;
 import javax.jms.MessageConsumer;
 import javax.jms.MessageListener;
 import javax.jms.MessageProducer;
-import javax.naming.NamingException;
 import org.apache.activemq.Service;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
  * A Destination bridge is used to bridge between to different JMS systems
- * 
- * 
  */
 public abstract class DestinationBridge implements Service, MessageListener {
+
     private static final Logger LOG = LoggerFactory.getLogger(DestinationBridge.class);
+
     protected MessageConsumer consumer;
     protected AtomicBoolean started = new AtomicBoolean(false);
     protected JmsMesageConvertor jmsMessageConvertor;
     protected boolean doHandleReplyTo = true;
     protected JmsConnector jmsConnector;
-    private int maximumRetries = 10;
 
     /**
      * @return Returns the consumer.
@@ -78,26 +76,13 @@ public abstract class DestinationBridge 
         this.jmsMessageConvertor = jmsMessageConvertor;
     }
 
-    public int getMaximumRetries() {
-        return maximumRetries;
-    }
-
-    /**
-     * Sets the maximum number of retries if a send fails before closing the
-     * bridge
-     */
-    public void setMaximumRetries(int maximumRetries) {
-        this.maximumRetries = maximumRetries;
-    }
-
     protected Destination processReplyToDestination(Destination destination) {
         return jmsConnector.createReplyToBridge(destination, getConnnectionForConsumer(), getConnectionForProducer());
     }
 
     public void start() throws Exception {
         if (started.compareAndSet(false, true)) {
-            MessageConsumer consumer = createConsumer();
-            consumer.setMessageListener(this);
+            createConsumer();
             createProducer();
         }
     }
@@ -107,37 +92,60 @@ public abstract class DestinationBridge 
     }
 
     public void onMessage(Message message) {
+
         int attempt = 0;
-        while (started.get() && message != null) {
-           
+        final int maxRetries = jmsConnector.getReconnectionPolicy().getMaxSendRetries();
+
+        while (started.get() && message != null && ++attempt <= maxRetries) {
+
             try {
+
                 if (attempt > 0) {
-                    restartProducer();
+                    try {
+                        Thread.sleep(jmsConnector.getReconnectionPolicy().getNextDelay(attempt));
+                    } catch(InterruptedException e) {
+                        break;
+                    }
                 }
+
                 Message converted;
-                if (doHandleReplyTo) {
-                    Destination replyTo = message.getJMSReplyTo();
-                    if (replyTo != null) {
-                        converted = jmsMessageConvertor.convert(message, processReplyToDestination(replyTo));
+                if (jmsMessageConvertor != null) {
+                    if (doHandleReplyTo) {
+                        Destination replyTo = message.getJMSReplyTo();
+                        if (replyTo != null) {
+                            converted = jmsMessageConvertor.convert(message, processReplyToDestination(replyTo));
+                        } else {
+                            converted = jmsMessageConvertor.convert(message);
+                        }
                     } else {
+                        message.setJMSReplyTo(null);
                         converted = jmsMessageConvertor.convert(message);
                     }
                 } else {
-                    message.setJMSReplyTo(null);
-                    converted = jmsMessageConvertor.convert(message);
+                    // The Producer side is not up or not yet configured, retry.
+                    continue;
+                }
+
+                try {
+                    sendMessage(converted);
+                } catch(Exception e) {
+                    jmsConnector.handleConnectionFailure(getConnectionForProducer());
+                    continue;
                 }
-                sendMessage(converted);
-                message.acknowledge();
+
+                try {
+                    message.acknowledge();
+                } catch(Exception e) {
+                    jmsConnector.handleConnectionFailure(getConnnectionForConsumer());
+                    continue;
+                }
+
+                // if we got here then it made it out and was ack'd
                 return;
+
             } catch (Exception e) {
-                LOG.error("failed to forward message on attempt: " + (++attempt) + " reason: " + e + " message: " + message, e);
-                if (maximumRetries > 0 && attempt >= maximumRetries) {
-                    try {
-                        stop();
-                    } catch (Exception e1) {
-                        LOG.warn("Failed to stop cleanly", e1);
-                    }
-                }
+                LOG.info("failed to forward message on attempt: " + attempt +
+                         " reason: " + e + " message: " + message, e);
             }
         }
     }
@@ -166,15 +174,4 @@ public abstract class DestinationBridge 
 
     protected abstract Connection getConnectionForProducer();
 
-    protected void restartProducer() throws JMSException, NamingException {
-        try {
-            //don't reconnect immediately
-            Thread.sleep(1000);
-            getConnectionForProducer().close();
-        } catch (Exception e) {
-            LOG.debug("Ignoring failure to close producer connection: " + e, e);
-        }
-        jmsConnector.restartProducerConnection();
-        createProducer();
-    }
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/jms/InboundQueueBridge.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/jms/InboundQueueBridge.java?rev=1238827&r1=1238826&r2=1238827&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/jms/InboundQueueBridge.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/jms/InboundQueueBridge.java Tue Jan 31 21:56:03 2012
@@ -17,11 +17,12 @@
 package org.apache.activemq.network.jms;
 
 /**
- * Create an Inbound Queue Bridge
- * 
+ * Create an Inbound Queue Bridge.  By default this class uses the same name for
+ * both the inbound and outbound queue.  This behavior can be overridden however
+ * by using the setter methods to configure both the inbound and outbound queue names
+ * separately.
+ *
  * @org.apache.xbean.XBean
- * 
- * 
  */
 public class InboundQueueBridge extends QueueBridge {
 
@@ -29,8 +30,8 @@ public class InboundQueueBridge extends 
     String localQueueName;
 
     /**
-     * Constructor that takes a foriegn destination as an argument
-     * 
+     * Constructor that takes a foreign destination as an argument
+     *
      * @param inboundQueueName
      */
     public InboundQueueBridge(String inboundQueueName) {
@@ -39,7 +40,7 @@ public class InboundQueueBridge extends 
     }
 
     /**
-     * Default Contructor
+     * Default Constructor
      */
     public InboundQueueBridge() {
     }
@@ -52,6 +53,10 @@ public class InboundQueueBridge extends 
     }
 
     /**
+     * Sets the queue name used for the inbound queue, if the outbound queue
+     * name has not been set, then this method uses the same name to configure
+     * the outbound queue name.
+     *
      * @param inboundQueueName The inboundQueueName to set.
      */
     public void setInboundQueueName(String inboundQueueName) {
@@ -74,5 +79,4 @@ public class InboundQueueBridge extends 
     public void setLocalQueueName(String localQueueName) {
         this.localQueueName = localQueueName;
     }
-
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/jms/InboundTopicBridge.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/jms/InboundTopicBridge.java?rev=1238827&r1=1238826&r2=1238827&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/jms/InboundTopicBridge.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/jms/InboundTopicBridge.java Tue Jan 31 21:56:03 2012
@@ -17,11 +17,12 @@
 package org.apache.activemq.network.jms;
 
 /**
- * Create an Inbound Topic Bridge
- * 
+ * Create an Inbound Topic Bridge.  By default this class uses the same topic name for
+ * both the inbound and outbound topic.  This behavior can be overridden however
+ * by using the setter methods to configure both the inbound and outbound topic names
+ * separately.
+ *
  * @org.apache.xbean.XBean
- * 
- * 
  */
 public class InboundTopicBridge extends TopicBridge {
 
@@ -29,8 +30,8 @@ public class InboundTopicBridge extends 
     String localTopicName;
 
     /**
-     * Constructor that takes a foriegn destination as an argument
-     * 
+     * Constructor that takes a foreign destination as an argument
+     *
      * @param inboundTopicName
      */
     public InboundTopicBridge(String inboundTopicName) {
@@ -39,7 +40,7 @@ public class InboundTopicBridge extends 
     }
 
     /**
-     * Default Contructor
+     * Default Constructor
      */
     public InboundTopicBridge() {
     }
@@ -52,6 +53,10 @@ public class InboundTopicBridge extends 
     }
 
     /**
+     * Sets the topic name used for the inbound topic, if the outbound topic
+     * name has not been set, then this method uses the same name to configure
+     * the outbound topic name.
+     *
      * @param inboundTopicName
      */
     public void setInboundTopicName(String inboundTopicName) {
@@ -74,5 +79,4 @@ public class InboundTopicBridge extends 
     public void setLocalTopicName(String localTopicName) {
         this.localTopicName = localTopicName;
     }
-
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/jms/JmsConnector.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/jms/JmsConnector.java?rev=1238827&r1=1238826&r2=1238827&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/jms/JmsConnector.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/jms/JmsConnector.java Tue Jan 31 21:56:03 2012
@@ -20,12 +20,16 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
 
 import javax.jms.Connection;
 import javax.jms.Destination;
-import javax.jms.JMSException;
-import javax.naming.NamingException;
+import javax.jms.QueueConnection;
 
 import org.apache.activemq.ActiveMQConnectionFactory;
 import org.apache.activemq.Service;
@@ -37,22 +41,25 @@ import org.springframework.jndi.JndiTemp
 
 /**
  * This bridge joins the gap between foreign JMS providers and ActiveMQ As some
- * JMS providers are still only 1.0.1 compliant, this bridge itself aimed to be
- * JMS 1.0.2 compliant.
- * 
- * 
+ * JMS providers are still only in compliance with JMS v1.0.1, this bridge itself
+ * aims to be in compliance with the JMS 1.0.2 specification.
  */
 public abstract class JmsConnector implements Service {
 
     private static int nextId;
     private static final Logger LOG = LoggerFactory.getLogger(JmsConnector.class);
-    
+
     protected JndiTemplate jndiLocalTemplate;
     protected JndiTemplate jndiOutboundTemplate;
     protected JmsMesageConvertor inboundMessageConvertor;
     protected JmsMesageConvertor outboundMessageConvertor;
     protected AtomicBoolean initialized = new AtomicBoolean(false);
+    protected AtomicBoolean localSideInitialized = new AtomicBoolean(false);
+    protected AtomicBoolean foreignSideInitialized = new AtomicBoolean(false);
     protected AtomicBoolean started = new AtomicBoolean(false);
+    protected AtomicBoolean failed = new AtomicBoolean();
+    protected AtomicReference<Connection> foreignConnection = new AtomicReference<Connection>();
+    protected AtomicReference<Connection> localConnection = new AtomicReference<Connection>();
     protected ActiveMQConnectionFactory embeddedConnectionFactory;
     protected int replyToDestinationCacheSize = 10000;
     protected String outboundUsername;
@@ -61,21 +68,22 @@ public abstract class JmsConnector imple
     protected String localPassword;
     protected String outboundClientId;
     protected String localClientId;
-    protected LRUCache replyToBridges = createLRUCache();
+    protected LRUCache<Destination, DestinationBridge> replyToBridges = createLRUCache();
 
+    private ReconnectionPolicy policy = new ReconnectionPolicy();
+    protected ThreadPoolExecutor connectionSerivce;
     private List<DestinationBridge> inboundBridges = new CopyOnWriteArrayList<DestinationBridge>();
     private List<DestinationBridge> outboundBridges = new CopyOnWriteArrayList<DestinationBridge>();
     private String name;
 
-
-    private static LRUCache createLRUCache() {
-        return new LRUCache() {
+    private static LRUCache<Destination, DestinationBridge> createLRUCache() {
+        return new LRUCache<Destination, DestinationBridge>() {
             private static final long serialVersionUID = -7446792754185879286L;
 
-            protected boolean removeEldestEntry(Map.Entry enty) {
+            protected boolean removeEldestEntry(Map.Entry<Destination, DestinationBridge> enty) {
                 if (size() > maxCacheSize) {
-                    Iterator iter = entrySet().iterator();
-                    Map.Entry lru = (Map.Entry)iter.next();
+                    Iterator<Map.Entry<Destination, DestinationBridge>> iter = entrySet().iterator();
+                    Map.Entry<Destination, DestinationBridge> lru = iter.next();
                     remove(lru.getKey());
                     DestinationBridge bridge = (DestinationBridge)lru.getValue();
                     try {
@@ -90,8 +98,6 @@ public abstract class JmsConnector imple
         };
     }
 
-    /**
-     */
     public boolean init() {
         boolean result = initialized.compareAndSet(false, true);
         if (result) {
@@ -108,19 +114,49 @@ public abstract class JmsConnector imple
                 outboundMessageConvertor = new SimpleJmsMessageConvertor();
             }
             replyToBridges.setMaxCacheSize(getReplyToDestinationCacheSize());
+
+            connectionSerivce = createExecutor();
+
+            // Subclasses can override this to customize their own init.
+            result = doConnectorInit();
         }
         return result;
     }
 
+    protected boolean doConnectorInit() {
+
+        // We try to make a connection via a sync call first so that the
+        // JmsConnector is fully initialized before the start call returns
+        // in order to avoid missing any messages that are dispatched
+        // immediately after startup.  If either side fails we queue an
+        // asynchronous task to manage the reconnect attempts.
+
+        try {
+            initializeLocalConnection();
+            localSideInitialized.set(true);
+        } catch(Exception e) {
+            // Queue up the task to attempt the local connection.
+            scheduleAsyncLocalConnectionReconnect();
+        }
+
+        try {
+            initializeForeignConnection();
+            foreignSideInitialized.set(true);
+        } catch(Exception e) {
+            // Queue up the task for the foreign connection now.
+            scheduleAsyncForeignConnectionReconnect();
+        }
+
+        return true;
+    }
+
     public void start() throws Exception {
-        init();
         if (started.compareAndSet(false, true)) {
-            for (int i = 0; i < inboundBridges.size(); i++) {
-                DestinationBridge bridge = inboundBridges.get(i);
+            init();
+            for (DestinationBridge bridge : inboundBridges) {
                 bridge.start();
             }
-            for (int i = 0; i < outboundBridges.size(); i++) {
-                DestinationBridge bridge = outboundBridges.get(i);
+            for (DestinationBridge bridge : outboundBridges) {
                 bridge.start();
             }
             LOG.info("JMS Connector " + getName() + " Started");
@@ -129,21 +165,23 @@ public abstract class JmsConnector imple
 
     public void stop() throws Exception {
         if (started.compareAndSet(true, false)) {
-            for (int i = 0; i < inboundBridges.size(); i++) {
-                DestinationBridge bridge = inboundBridges.get(i);
+
+            this.connectionSerivce.shutdown();
+
+            for (DestinationBridge bridge : inboundBridges) {
                 bridge.stop();
             }
-            for (int i = 0; i < outboundBridges.size(); i++) {
-                DestinationBridge bridge = outboundBridges.get(i);
+            for (DestinationBridge bridge : outboundBridges) {
                 bridge.stop();
             }
             LOG.info("JMS Connector " + getName() + " Stopped");
         }
     }
-    
+
     public void clearBridges() {
         inboundBridges.clear();
         outboundBridges.clear();
+        replyToBridges.clear();
     }
 
     protected abstract Destination createReplyToBridge(Destination destination, Connection consumerConnection, Connection producerConnection);
@@ -151,13 +189,21 @@ public abstract class JmsConnector imple
     /**
      * One way to configure the local connection - this is called by The
      * BrokerService when the Connector is embedded
-     * 
+     *
      * @param service
      */
     public void setBrokerService(BrokerService service) {
         embeddedConnectionFactory = new ActiveMQConnectionFactory(service.getVmConnectorURI());
     }
 
+    public Connection getLocalConnection() {
+        return this.localConnection.get();
+    }
+
+    public Connection getForeignConnection() {
+        return this.foreignConnection.get();
+    }
+
     /**
      * @return Returns the jndiTemplate.
      */
@@ -222,8 +268,7 @@ public abstract class JmsConnector imple
     }
 
     /**
-     * @param replyToDestinationCacheSize The replyToDestinationCacheSize to
-     *                set.
+     * @param replyToDestinationCacheSize The replyToDestinationCacheSize to set.
      */
     public void setReplyToDestinationCacheSize(int replyToDestinationCacheSize) {
         this.replyToDestinationCacheSize = replyToDestinationCacheSize;
@@ -284,7 +329,7 @@ public abstract class JmsConnector imple
     public void setOutboundUsername(String outboundUsername) {
         this.outboundUsername = outboundUsername;
     }
-    
+
     /**
      * @return the outboundClientId
      */
@@ -312,14 +357,38 @@ public abstract class JmsConnector imple
     public void setLocalClientId(String localClientId) {
         this.localClientId = localClientId;
     }
-    
-    
+
+    /**
+     * @return the currently configured reconnection policy.
+     */
+    public ReconnectionPolicy getReconnectionPolicy() {
+        return this.policy;
+    }
+
+    /**
+     * @param policy The new reconnection policy this {@link JmsConnector} should use.
+     */
+    public void setReconnectionPolicy(ReconnectionPolicy policy) {
+        this.policy = policy;
+    }
+
+    /**
+     * @return returns true if the {@link JmsConnector} is connected to both brokers.
+     */
+    public boolean isConnected() {
+        return localConnection.get() != null && foreignConnection.get() != null;
+    }
+
     protected void addInboundBridge(DestinationBridge bridge) {
-        inboundBridges.add(bridge);
+        if (!inboundBridges.contains(bridge)) {
+            inboundBridges.add(bridge);
+        }
     }
 
     protected void addOutboundBridge(DestinationBridge bridge) {
-        outboundBridges.add(bridge);
+        if (!outboundBridges.contains(bridge)) {
+            outboundBridges.add(bridge);
+        }
     }
 
     protected void removeInboundBridge(DestinationBridge bridge) {
@@ -337,13 +406,205 @@ public abstract class JmsConnector imple
         return name;
     }
 
+    public void setName(String name) {
+        this.name = name;
+    }
+
     private static synchronized int getNextId() {
         return nextId++;
     }
 
-    public void setName(String name) {
-        this.name = name;
+    public boolean isFailed() {
+        return this.failed.get();
     }
 
-    public abstract void restartProducerConnection() throws NamingException, JMSException;
+    /**
+     * Performs the work of connection to the local side of the Connection.
+     * <p>
+     * This creates the initial connection to the local end of the {@link JmsConnector}
+     * and then sets up all the destination bridges with the information needed to bridge
+     * on the local side of the connection.
+     *
+     * @throws Exception if the connection cannot be established for any reason.
+     */
+    protected abstract void initializeLocalConnection() throws Exception;
+
+    /**
+     * Performs the work of connection to the foreign side of the Connection.
+     * <p>
+     * This creates the initial connection to the foreign end of the {@link JmsConnector}
+     * and then sets up all the destination bridges with the information needed to bridge
+     * on the foreign side of the connection.
+     *
+     * @throws Exception if the connection cannot be established for any reason.
+     */
+    protected abstract void initializeForeignConnection() throws Exception;
+
+    /**
+     * Callback method that the Destination bridges can use to report an exception that occurs
+     * during normal bridging operations.
+     *
+     * @param connection
+     * 		The connection that was in use when the failure occurred.
+     */
+    void handleConnectionFailure(Connection connection) {
+
+        // Can happen if async exception listener kicks in at the same time.
+        if (connection == null || !this.started.get()) {
+            return;
+        }
+
+        LOG.info("JmsConnector handling loss of connection [" + connection.toString() + "]");
+
+        // TODO - How do we handle the re-wiring of replyToBridges in this case.
+        replyToBridges.clear();
+
+        if (this.foreignConnection.compareAndSet((QueueConnection)connection, null)) {
+
+            // Stop the inbound bridges when the foreign connection is dropped since
+            // the bridge has no consumer and needs to be restarted once a new connection
+            // to the foreign side is made.
+            for (DestinationBridge bridge : inboundBridges) {
+                try {
+                    bridge.stop();
+                } catch(Exception e) {
+                }
+            }
+
+            // We got here first and cleared the connection, now we queue a reconnect.
+            this.connectionSerivce.execute(new Runnable() {
+
+                @Override
+                public void run() {
+                    try {
+                        doInitializeConnection(false);
+                    } catch (Exception e) {
+                        LOG.error("Failed to initialize forgein connection for the JMSConnector", e);
+                    }
+                }
+            });
+
+        } else if (this.localConnection.compareAndSet((QueueConnection)connection, null)) {
+
+            // Stop the outbound bridges when the local connection is dropped since
+            // the bridge has no consumer and needs to be restarted once a new connection
+            // to the local side is made.
+            for (DestinationBridge bridge : outboundBridges) {
+                try {
+                    bridge.stop();
+                } catch(Exception e) {
+                }
+            }
+
+            // We got here first and cleared the connection, now we queue a reconnect.
+            this.connectionSerivce.execute(new Runnable() {
+
+                @Override
+                public void run() {
+                    try {
+                        doInitializeConnection(true);
+                    } catch (Exception e) {
+                        LOG.error("Failed to initialize local connection for the JMSConnector", e);
+                    }
+                }
+            });
+        }
+    }
+
+    private void scheduleAsyncLocalConnectionReconnect() {
+        this.connectionSerivce.execute(new Runnable() {
+            @Override
+            public void run() {
+                try {
+                    doInitializeConnection(true);
+                } catch (Exception e) {
+                    LOG.error("Failed to initialize local connection for the JMSConnector", e);
+                }
+            }
+        });
+    }
+
+    private void scheduleAsyncForeignConnectionReconnect() {
+        this.connectionSerivce.execute(new Runnable() {
+            @Override
+            public void run() {
+                try {
+                    doInitializeConnection(false);
+                } catch (Exception e) {
+                    LOG.error("Failed to initialize forgein connection for the JMSConnector", e);
+                }
+            }
+        });
+    }
+
+    private void doInitializeConnection(boolean local) throws Exception {
+
+        int attempt = 0;
+
+        final int maxRetries;
+        if (local) {
+            maxRetries = !localSideInitialized.get() ? policy.getMaxInitialConnectAttempts() :
+                                                       policy.getMaxReconnectAttempts();
+        } else {
+            maxRetries = !foreignSideInitialized.get() ? policy.getMaxInitialConnectAttempts() :
+                                                         policy.getMaxReconnectAttempts();
+        }
+
+        do
+        {
+            if (attempt > 0) {
+                try {
+                    Thread.sleep(policy.getNextDelay(attempt));
+                } catch(InterruptedException e) {
+                }
+            }
+
+            if (connectionSerivce.isTerminating()) {
+                return;
+            }
+
+            try {
+
+                if (local) {
+                    initializeLocalConnection();
+                    localSideInitialized.set(true);
+                } else {
+                    initializeForeignConnection();
+                    foreignSideInitialized.set(true);
+                }
+
+                // Once we are connected we ensure all the bridges are started.
+                if (localConnection.get() != null && foreignConnection.get() != null) {
+                    for (DestinationBridge bridge : inboundBridges) {
+                        bridge.start();
+                    }
+                    for (DestinationBridge bridge : outboundBridges) {
+                        bridge.start();
+                    }
+                }
+
+                return;
+            } catch(Exception e) {
+                LOG.debug("Failed to establish initial " + (local ? "local" : "foriegn") +
+                          " connection for JmsConnector [" + attempt + "]: " + e.getMessage());
+            }
+        }
+        while (maxRetries < ++attempt && !connectionSerivce.isTerminating());
+
+        this.failed.set(true);
+    }
+
+    private ThreadFactory factory = new ThreadFactory() {
+        public Thread newThread(Runnable runnable) {
+            Thread thread = new Thread(runnable, "JmsConnector Async Connection Task: ");
+            thread.setDaemon(true);
+            return thread;
+        }
+    };
+
+    private ThreadPoolExecutor createExecutor() {
+        ThreadPoolExecutor exec = new ThreadPoolExecutor(0, 2, 30, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), factory);
+        exec.allowCoreThreadTimeOut(true);
+        return exec;
+    }
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/jms/JmsMesageConvertor.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/jms/JmsMesageConvertor.java?rev=1238827&r1=1238826&r2=1238827&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/jms/JmsMesageConvertor.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/jms/JmsMesageConvertor.java Tue Jan 31 21:56:03 2012
@@ -23,21 +23,35 @@ import javax.jms.Message;
 
 /**
  * Converts Message from one JMS to another
- * 
- * 
  */
 public interface JmsMesageConvertor {
-    
+
     /**
      * Convert a foreign JMS Message to a native ActiveMQ Message
+     *
      * @param message
+     *      The target message to convert to a native ActiveMQ message
+     *
      * @return the converted message
      * @throws JMSException
      */
     Message convert(Message message) throws JMSException;
-    
+
+    /**
+     * Convert a foreign JMS Message to a native ActiveMQ Message (inbound) or
+     * vice versa (outbound).  If the replyTo Destination instance is not null
+     * then the Message is configured with the given replyTo value.
+     *
+     * @param message
+     *      The target message to convert to a native ActiveMQ message
+     * @param replyTo
+     *      The replyTo Destination to set on the converted Message.
+     *
+     * @return the converted message
+     * @throws JMSException
+     */
     Message convert(Message message, Destination replyTo) throws JMSException;
-    
+
     void setConnection(Connection connection);
-   
+
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/jms/JmsQueueConnector.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/jms/JmsQueueConnector.java?rev=1238827&r1=1238826&r2=1238827&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/jms/JmsQueueConnector.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/jms/JmsQueueConnector.java Tue Jan 31 21:56:03 2012
@@ -18,6 +18,7 @@ package org.apache.activemq.network.jms;
 
 import javax.jms.Connection;
 import javax.jms.Destination;
+import javax.jms.ExceptionListener;
 import javax.jms.JMSException;
 import javax.jms.Queue;
 import javax.jms.QueueConnection;
@@ -31,10 +32,8 @@ import org.slf4j.LoggerFactory;
 
 /**
  * A Bridge to other JMS Queue providers
- * 
+ *
  * @org.apache.xbean.XBean
- * 
- * 
  */
 public class JmsQueueConnector extends JmsConnector {
     private static final Logger LOG = LoggerFactory.getLogger(JmsQueueConnector.class);
@@ -42,28 +41,9 @@ public class JmsQueueConnector extends J
     private String localConnectionFactoryName;
     private QueueConnectionFactory outboundQueueConnectionFactory;
     private QueueConnectionFactory localQueueConnectionFactory;
-    private QueueConnection outboundQueueConnection;
-    private QueueConnection localQueueConnection;
     private InboundQueueBridge[] inboundQueueBridges;
     private OutboundQueueBridge[] outboundQueueBridges;
 
-    public boolean init() {
-        boolean result = super.init();
-        if (result) {
-            try {
-                initializeForeignQueueConnection();
-                initializeLocalQueueConnection();
-                initializeInboundJmsMessageConvertor();
-                initializeOutboundJmsMessageConvertor();
-                initializeInboundQueueBridges();
-                initializeOutboundQueueBridges();
-            } catch (Exception e) {
-                LOG.error("Failed to initialize the JMSConnector", e);
-            }
-        }
-        return result;
-    }
-
     /**
      * @return Returns the inboundQueueBridges.
      */
@@ -147,28 +127,28 @@ public class JmsQueueConnector extends J
      * @return Returns the localQueueConnection.
      */
     public QueueConnection getLocalQueueConnection() {
-        return localQueueConnection;
+        return (QueueConnection) localConnection.get();
     }
 
     /**
      * @param localQueueConnection The localQueueConnection to set.
      */
     public void setLocalQueueConnection(QueueConnection localQueueConnection) {
-        this.localQueueConnection = localQueueConnection;
+        this.localConnection.set(localQueueConnection);
     }
 
     /**
      * @return Returns the outboundQueueConnection.
      */
     public QueueConnection getOutboundQueueConnection() {
-        return outboundQueueConnection;
+        return (QueueConnection) foreignConnection.get();
     }
 
     /**
      * @param outboundQueueConnection The outboundQueueConnection to set.
      */
     public void setOutboundQueueConnection(QueueConnection foreignQueueConnection) {
-        this.outboundQueueConnection = foreignQueueConnection;
+        this.foreignConnection.set(foreignQueueConnection);
     }
 
     /**
@@ -179,27 +159,12 @@ public class JmsQueueConnector extends J
         this.outboundQueueConnectionFactory = foreignQueueConnectionFactory;
     }
 
-    public void restartProducerConnection() throws NamingException, JMSException {
-        outboundQueueConnection = null;
-        initializeForeignQueueConnection();
+    @Override
+    protected void initializeForeignConnection() throws NamingException, JMSException {
 
-        // the outboundQueueConnection was reestablished - publish the new connection to the bridges
-        if (inboundQueueBridges != null) {
-        	for (int i = 0; i < inboundQueueBridges.length; i++) {
-        		InboundQueueBridge bridge = inboundQueueBridges[i];
-        		bridge.setConsumerConnection(outboundQueueConnection);
-        	}
-        }
-        if (outboundQueueBridges != null) {
-        	for (int i = 0; i < outboundQueueBridges.length; i++) {
-        		OutboundQueueBridge bridge = outboundQueueBridges[i];
-        		bridge.setProducerConnection(outboundQueueConnection);
-        	}
-        }
-    }
+        final QueueConnection newConnection;
 
-    protected void initializeForeignQueueConnection() throws NamingException, JMSException {
-        if (outboundQueueConnection == null) {
+        if (foreignConnection.get() == null) {
             // get the connection factories
             if (outboundQueueConnectionFactory == null) {
                 // look it up from JNDI
@@ -207,31 +172,57 @@ public class JmsQueueConnector extends J
                     outboundQueueConnectionFactory = (QueueConnectionFactory)jndiOutboundTemplate
                         .lookup(outboundQueueConnectionFactoryName, QueueConnectionFactory.class);
                     if (outboundUsername != null) {
-                        outboundQueueConnection = outboundQueueConnectionFactory
+                        newConnection = outboundQueueConnectionFactory
                             .createQueueConnection(outboundUsername, outboundPassword);
                     } else {
-                        outboundQueueConnection = outboundQueueConnectionFactory.createQueueConnection();
+                        newConnection = outboundQueueConnectionFactory.createQueueConnection();
                     }
                 } else {
                     throw new JMSException("Cannot create foreignConnection - no information");
                 }
             } else {
                 if (outboundUsername != null) {
-                    outboundQueueConnection = outboundQueueConnectionFactory
+                    newConnection = outboundQueueConnectionFactory
                         .createQueueConnection(outboundUsername, outboundPassword);
                 } else {
-                    outboundQueueConnection = outboundQueueConnectionFactory.createQueueConnection();
+                    newConnection = outboundQueueConnectionFactory.createQueueConnection();
                 }
             }
+        } else {
+            // Clear it for now in case something goes wrong during the init.
+            newConnection = (QueueConnection) foreignConnection.getAndSet(null);
         }
-        if (localClientId != null && localClientId.length() > 0) {
-            outboundQueueConnection.setClientID(getOutboundClientId());
+
+        if (outboundClientId != null && outboundClientId.length() > 0) {
+            newConnection.setClientID(getOutboundClientId());
         }
-        outboundQueueConnection.start();
+        newConnection.start();
+
+        outboundMessageConvertor.setConnection(newConnection);
+
+        // Configure the bridges with the new Outbound connection.
+        initializeInboundDestinationBridgesOutboundSide(newConnection);
+        initializeOutboundDestinationBridgesOutboundSide(newConnection);
+
+        // Register for any async error notifications now so we can reset in the
+        // case where there's not a lot of activity and a connection drops.
+        newConnection.setExceptionListener(new ExceptionListener() {
+            @Override
+            public void onException(JMSException exception) {
+                handleConnectionFailure(newConnection);
+            }
+        });
+
+        // At this point all looks good, so this is our current connection now.
+        foreignConnection.set(newConnection);
     }
 
-    protected void initializeLocalQueueConnection() throws NamingException, JMSException {
-        if (localQueueConnection == null) {
+    @Override
+    protected void initializeLocalConnection() throws NamingException, JMSException {
+
+        final QueueConnection newConnection;
+
+        if (localConnection.get() == null) {
             // get the connection factories
             if (localQueueConnectionFactory == null) {
                 if (embeddedConnectionFactory == null) {
@@ -240,83 +231,100 @@ public class JmsQueueConnector extends J
                         localQueueConnectionFactory = (QueueConnectionFactory)jndiLocalTemplate
                             .lookup(localConnectionFactoryName, QueueConnectionFactory.class);
                         if (localUsername != null) {
-                            localQueueConnection = localQueueConnectionFactory
+                            newConnection = localQueueConnectionFactory
                                 .createQueueConnection(localUsername, localPassword);
                         } else {
-                            localQueueConnection = localQueueConnectionFactory.createQueueConnection();
+                            newConnection = localQueueConnectionFactory.createQueueConnection();
                         }
                     } else {
                         throw new JMSException("Cannot create localConnection - no information");
                     }
                 } else {
-                    localQueueConnection = embeddedConnectionFactory.createQueueConnection();
+                    newConnection = embeddedConnectionFactory.createQueueConnection();
                 }
             } else {
                 if (localUsername != null) {
-                    localQueueConnection = localQueueConnectionFactory.createQueueConnection(localUsername,
-                                                                                             localPassword);
+                    newConnection = localQueueConnectionFactory.
+                            createQueueConnection(localUsername, localPassword);
                 } else {
-                    localQueueConnection = localQueueConnectionFactory.createQueueConnection();
+                    newConnection = localQueueConnectionFactory.createQueueConnection();
                 }
             }
+
+        } else {
+            // Clear it for now in case something goes wrong during the init.
+            newConnection = (QueueConnection) localConnection.getAndSet(null);
         }
+
         if (localClientId != null && localClientId.length() > 0) {
-            localQueueConnection.setClientID(getLocalClientId());
+            newConnection.setClientID(getLocalClientId());
         }
-        localQueueConnection.start();
-    }
+        newConnection.start();
 
-    protected void initializeInboundJmsMessageConvertor() {
-        inboundMessageConvertor.setConnection(localQueueConnection);
-    }
+        inboundMessageConvertor.setConnection(newConnection);
 
-    protected void initializeOutboundJmsMessageConvertor() {
-        outboundMessageConvertor.setConnection(outboundQueueConnection);
+        // Configure the bridges with the new Local connection.
+        initializeInboundDestinationBridgesLocalSide(newConnection);
+        initializeOutboundDestinationBridgesLocalSide(newConnection);
+
+        // Register for any async error notifications now so we can reset in the
+        // case where there's not a lot of activity and a connection drops.
+        newConnection.setExceptionListener(new ExceptionListener() {
+            @Override
+            public void onException(JMSException exception) {
+                handleConnectionFailure(newConnection);
+            }
+        });
+
+        // At this point all looks good, so this is our current connection now.
+        localConnection.set(newConnection);
     }
 
-    protected void initializeInboundQueueBridges() throws JMSException {
+    protected void initializeInboundDestinationBridgesOutboundSide(QueueConnection connection) throws JMSException {
         if (inboundQueueBridges != null) {
-            QueueSession outboundSession = outboundQueueConnection
-                .createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
-            QueueSession localSession = localQueueConnection.createQueueSession(false,
-                                                                                Session.AUTO_ACKNOWLEDGE);
-            for (int i = 0; i < inboundQueueBridges.length; i++) {
-                InboundQueueBridge bridge = inboundQueueBridges[i];
-                String localQueueName = bridge.getLocalQueueName();
-                Queue activemqQueue = createActiveMQQueue(localSession, localQueueName);
+            QueueSession outboundSession = connection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
+
+            for (InboundQueueBridge bridge : inboundQueueBridges) {
                 String queueName = bridge.getInboundQueueName();
                 Queue foreignQueue = createForeignQueue(outboundSession, queueName);
+                bridge.setConsumer(null);
                 bridge.setConsumerQueue(foreignQueue);
+                bridge.setConsumerConnection(connection);
+                bridge.setJmsConnector(this);
+                addInboundBridge(bridge);
+            }
+            outboundSession.close();
+        }
+    }
+
+    protected void initializeInboundDestinationBridgesLocalSide(QueueConnection connection) throws JMSException {
+        if (inboundQueueBridges != null) {
+            QueueSession localSession = connection.createQueueSession(false,Session.AUTO_ACKNOWLEDGE);
+
+            for (InboundQueueBridge bridge : inboundQueueBridges) {
+                String localQueueName = bridge.getLocalQueueName();
+                Queue activemqQueue = createActiveMQQueue(localSession, localQueueName);
                 bridge.setProducerQueue(activemqQueue);
-                bridge.setProducerConnection(localQueueConnection);
-                bridge.setConsumerConnection(outboundQueueConnection);
+                bridge.setProducerConnection(connection);
                 if (bridge.getJmsMessageConvertor() == null) {
                     bridge.setJmsMessageConvertor(getInboundMessageConvertor());
                 }
                 bridge.setJmsConnector(this);
                 addInboundBridge(bridge);
             }
-            outboundSession.close();
             localSession.close();
         }
     }
 
-    protected void initializeOutboundQueueBridges() throws JMSException {
+    protected void initializeOutboundDestinationBridgesOutboundSide(QueueConnection connection) throws JMSException {
         if (outboundQueueBridges != null) {
-            QueueSession outboundSession = outboundQueueConnection
-                .createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
-            QueueSession localSession = localQueueConnection.createQueueSession(false,
-                                                                                Session.AUTO_ACKNOWLEDGE);
-            for (int i = 0; i < outboundQueueBridges.length; i++) {
-                OutboundQueueBridge bridge = outboundQueueBridges[i];
-                String localQueueName = bridge.getLocalQueueName();
-                Queue activemqQueue = createActiveMQQueue(localSession, localQueueName);
+            QueueSession outboundSession = connection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
+
+            for (OutboundQueueBridge bridge : outboundQueueBridges) {
                 String queueName = bridge.getOutboundQueueName();
                 Queue foreignQueue = createForeignQueue(outboundSession, queueName);
-                bridge.setConsumerQueue(activemqQueue);
                 bridge.setProducerQueue(foreignQueue);
-                bridge.setProducerConnection(outboundQueueConnection);
-                bridge.setConsumerConnection(localQueueConnection);
+                bridge.setProducerConnection(connection);
                 if (bridge.getJmsMessageConvertor() == null) {
                     bridge.setJmsMessageConvertor(getOutboundMessageConvertor());
                 }
@@ -324,6 +332,23 @@ public class JmsQueueConnector extends J
                 addOutboundBridge(bridge);
             }
             outboundSession.close();
+        }
+    }
+
+    protected void initializeOutboundDestinationBridgesLocalSide(QueueConnection connection) throws JMSException {
+        if (outboundQueueBridges != null) {
+            QueueSession localSession =
+                    connection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
+
+            for (OutboundQueueBridge bridge : outboundQueueBridges) {
+                String localQueueName = bridge.getLocalQueueName();
+                Queue activemqQueue = createActiveMQQueue(localSession, localQueueName);
+                bridge.setConsumer(null);
+                bridge.setConsumerQueue(activemqQueue);
+                bridge.setConsumerConnection(connection);
+                bridge.setJmsConnector(this);
+                addOutboundBridge(bridge);
+            }
             localSession.close();
         }
     }
@@ -331,7 +356,7 @@ public class JmsQueueConnector extends J
     protected Destination createReplyToBridge(Destination destination, Connection replyToProducerConnection,
                                               Connection replyToConsumerConnection) {
         Queue replyToProducerQueue = (Queue)destination;
-        boolean isInbound = replyToProducerConnection.equals(localQueueConnection);
+        boolean isInbound = replyToProducerConnection.equals(localConnection.get());
 
         if (isInbound) {
             InboundQueueBridge bridge = (InboundQueueBridge)replyToBridges.get(replyToProducerQueue);

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/jms/JmsTopicConnector.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/jms/JmsTopicConnector.java?rev=1238827&r1=1238826&r2=1238827&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/jms/JmsTopicConnector.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/jms/JmsTopicConnector.java Tue Jan 31 21:56:03 2012
@@ -18,12 +18,13 @@ package org.apache.activemq.network.jms;
 
 import javax.jms.Connection;
 import javax.jms.Destination;
+import javax.jms.ExceptionListener;
 import javax.jms.JMSException;
-import javax.jms.Session;
 import javax.jms.Topic;
 import javax.jms.TopicConnection;
 import javax.jms.TopicConnectionFactory;
 import javax.jms.TopicSession;
+import javax.jms.Session;
 import javax.naming.NamingException;
 
 import org.slf4j.Logger;
@@ -31,10 +32,8 @@ import org.slf4j.LoggerFactory;
 
 /**
  * A Bridge to other JMS Topic providers
- * 
+ *
  * @org.apache.xbean.XBean
- * 
- * 
  */
 public class JmsTopicConnector extends JmsConnector {
     private static final Logger LOG = LoggerFactory.getLogger(JmsTopicConnector.class);
@@ -42,28 +41,9 @@ public class JmsTopicConnector extends J
     private String localConnectionFactoryName;
     private TopicConnectionFactory outboundTopicConnectionFactory;
     private TopicConnectionFactory localTopicConnectionFactory;
-    private TopicConnection outboundTopicConnection;
-    private TopicConnection localTopicConnection;
     private InboundTopicBridge[] inboundTopicBridges;
     private OutboundTopicBridge[] outboundTopicBridges;
 
-    public boolean init() {
-        boolean result = super.init();
-        if (result) {
-            try {
-                initializeForeignTopicConnection();
-                initializeLocalTopicConnection();
-                initializeInboundJmsMessageConvertor();
-                initializeOutboundJmsMessageConvertor();
-                initializeInboundTopicBridges();
-                initializeOutboundTopicBridges();
-            } catch (Exception e) {
-                LOG.error("Failed to initialize the JMSConnector", e);
-            }
-        }
-        return result;
-    }
-
     /**
      * @return Returns the inboundTopicBridges.
      */
@@ -100,8 +80,7 @@ public class JmsTopicConnector extends J
     }
 
     /**
-     * @param localTopicConnectionFactory The localTopicConnectionFactory to
-     *                set.
+     * @param localTopicConnectionFactory The localTopicConnectionFactory to set.
      */
     public void setLocalTopicConnectionFactory(TopicConnectionFactory localConnectionFactory) {
         this.localTopicConnectionFactory = localConnectionFactory;
@@ -122,8 +101,7 @@ public class JmsTopicConnector extends J
     }
 
     /**
-     * @param outboundTopicConnectionFactoryName The
-     *                outboundTopicConnectionFactoryName to set.
+     * @param outboundTopicConnectionFactoryName The outboundTopicConnectionFactoryName to set.
      */
     public void setOutboundTopicConnectionFactoryName(String foreignTopicConnectionFactoryName) {
         this.outboundTopicConnectionFactoryName = foreignTopicConnectionFactoryName;
@@ -147,45 +125,43 @@ public class JmsTopicConnector extends J
      * @return Returns the localTopicConnection.
      */
     public TopicConnection getLocalTopicConnection() {
-        return localTopicConnection;
+        return (TopicConnection) localConnection.get();
     }
 
     /**
      * @param localTopicConnection The localTopicConnection to set.
      */
     public void setLocalTopicConnection(TopicConnection localTopicConnection) {
-        this.localTopicConnection = localTopicConnection;
+        this.localConnection.set(localTopicConnection);
     }
 
     /**
      * @return Returns the outboundTopicConnection.
      */
     public TopicConnection getOutboundTopicConnection() {
-        return outboundTopicConnection;
+        return (TopicConnection) foreignConnection.get();
     }
 
     /**
      * @param outboundTopicConnection The outboundTopicConnection to set.
      */
     public void setOutboundTopicConnection(TopicConnection foreignTopicConnection) {
-        this.outboundTopicConnection = foreignTopicConnection;
+        this.foreignConnection.set(foreignTopicConnection);
     }
 
     /**
-     * @param outboundTopicConnectionFactory The outboundTopicConnectionFactory
-     *                to set.
+     * @param outboundTopicConnectionFactory The outboundTopicConnectionFactory to set.
      */
     public void setOutboundTopicConnectionFactory(TopicConnectionFactory foreignTopicConnectionFactory) {
         this.outboundTopicConnectionFactory = foreignTopicConnectionFactory;
     }
 
-    public void restartProducerConnection() throws NamingException, JMSException {
-        outboundTopicConnection = null;
-        initializeForeignTopicConnection();
-    }
+    @Override
+    protected void initializeForeignConnection() throws NamingException, JMSException {
 
-    protected void initializeForeignTopicConnection() throws NamingException, JMSException {
-        if (outboundTopicConnection == null) {
+        final TopicConnection newConnection;
+
+        if (foreignConnection.get() == null) {
             // get the connection factories
             if (outboundTopicConnectionFactory == null) {
                 // look it up from JNDI
@@ -193,31 +169,57 @@ public class JmsTopicConnector extends J
                     outboundTopicConnectionFactory = (TopicConnectionFactory)jndiOutboundTemplate
                         .lookup(outboundTopicConnectionFactoryName, TopicConnectionFactory.class);
                     if (outboundUsername != null) {
-                        outboundTopicConnection = outboundTopicConnectionFactory
+                        newConnection = outboundTopicConnectionFactory
                             .createTopicConnection(outboundUsername, outboundPassword);
                     } else {
-                        outboundTopicConnection = outboundTopicConnectionFactory.createTopicConnection();
+                        newConnection = outboundTopicConnectionFactory.createTopicConnection();
                     }
                 } else {
-                    throw new JMSException("Cannot create localConnection - no information");
+                    throw new JMSException("Cannot create foreignConnection - no information");
                 }
             } else {
                 if (outboundUsername != null) {
-                    outboundTopicConnection = outboundTopicConnectionFactory
+                    newConnection = outboundTopicConnectionFactory
                         .createTopicConnection(outboundUsername, outboundPassword);
                 } else {
-                    outboundTopicConnection = outboundTopicConnectionFactory.createTopicConnection();
+                    newConnection = outboundTopicConnectionFactory.createTopicConnection();
                 }
             }
+        } else {
+            // Clear it for now in case something goes wrong during the init.
+            newConnection = (TopicConnection) foreignConnection.getAndSet(null);
         }
-        if (localClientId != null && localClientId.length() > 0) {
-            outboundTopicConnection.setClientID(getOutboundClientId());
+
+        if (outboundClientId != null && outboundClientId.length() > 0) {
+            newConnection.setClientID(getOutboundClientId());
         }
-        outboundTopicConnection.start();
+        newConnection.start();
+
+        outboundMessageConvertor.setConnection(newConnection);
+
+        // Configure the bridges with the new Outbound connection.
+        initializeInboundDestinationBridgesOutboundSide(newConnection);
+        initializeOutboundDestinationBridgesOutboundSide(newConnection);
+
+        // Register for any async error notifications now so we can reset in the
+        // case where there's not a lot of activity and a connection drops.
+        newConnection.setExceptionListener(new ExceptionListener() {
+            @Override
+            public void onException(JMSException exception) {
+                handleConnectionFailure(newConnection);
+            }
+        });
+
+        // At this point all looks good, so this is our current connection now.
+        foreignConnection.set(newConnection);
     }
 
-    protected void initializeLocalTopicConnection() throws NamingException, JMSException {
-        if (localTopicConnection == null) {
+    @Override
+    protected void initializeLocalConnection() throws NamingException, JMSException {
+
+        final TopicConnection newConnection;
+
+        if (localConnection.get() == null) {
             // get the connection factories
             if (localTopicConnectionFactory == null) {
                 if (embeddedConnectionFactory == null) {
@@ -226,83 +228,100 @@ public class JmsTopicConnector extends J
                         localTopicConnectionFactory = (TopicConnectionFactory)jndiLocalTemplate
                             .lookup(localConnectionFactoryName, TopicConnectionFactory.class);
                         if (localUsername != null) {
-                            localTopicConnection = localTopicConnectionFactory
+                            newConnection = localTopicConnectionFactory
                                 .createTopicConnection(localUsername, localPassword);
                         } else {
-                            localTopicConnection = localTopicConnectionFactory.createTopicConnection();
+                            newConnection = localTopicConnectionFactory.createTopicConnection();
                         }
                     } else {
                         throw new JMSException("Cannot create localConnection - no information");
                     }
                 } else {
-                    localTopicConnection = embeddedConnectionFactory.createTopicConnection();
+                    newConnection = embeddedConnectionFactory.createTopicConnection();
                 }
             } else {
                 if (localUsername != null) {
-                    localTopicConnection = localTopicConnectionFactory.createTopicConnection(localUsername,
-                                                                                             localPassword);
+                    newConnection = localTopicConnectionFactory.
+                            createTopicConnection(localUsername, localPassword);
                 } else {
-                    localTopicConnection = localTopicConnectionFactory.createTopicConnection();
+                    newConnection = localTopicConnectionFactory.createTopicConnection();
                 }
             }
+
+        } else {
+            // Clear it for now in case something goes wrong during the init.
+            newConnection = (TopicConnection) localConnection.getAndSet(null);
         }
+
         if (localClientId != null && localClientId.length() > 0) {
-            localTopicConnection.setClientID(getLocalClientId());
+            newConnection.setClientID(getLocalClientId());
         }
-        localTopicConnection.start();
-    }
+        newConnection.start();
+
+        inboundMessageConvertor.setConnection(newConnection);
 
-    protected void initializeInboundJmsMessageConvertor() {
-        inboundMessageConvertor.setConnection(localTopicConnection);
+        // Configure the bridges with the new Local connection.
+        initializeInboundDestinationBridgesLocalSide(newConnection);
+        initializeOutboundDestinationBridgesLocalSide(newConnection);
+
+        // Register for any async error notifications now so we can reset in the
+        // case where there's not a lot of activity and a connection drops.
+        newConnection.setExceptionListener(new ExceptionListener() {
+            @Override
+            public void onException(JMSException exception) {
+                handleConnectionFailure(newConnection);
+            }
+        });
+
+        // At this point all looks good, so this is our current connection now.
+        localConnection.set(newConnection);
     }
 
-    protected void initializeOutboundJmsMessageConvertor() {
-        outboundMessageConvertor.setConnection(outboundTopicConnection);
+    protected void initializeInboundDestinationBridgesOutboundSide(TopicConnection connection) throws JMSException {
+        if (inboundTopicBridges != null) {
+            TopicSession outboundSession = connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
+
+            for (InboundTopicBridge bridge : inboundTopicBridges) {
+                String TopicName = bridge.getInboundTopicName();
+                Topic foreignTopic = createForeignTopic(outboundSession, TopicName);
+                bridge.setConsumer(null);
+                bridge.setConsumerTopic(foreignTopic);
+                bridge.setConsumerConnection(connection);
+                bridge.setJmsConnector(this);
+                addInboundBridge(bridge);
+            }
+            outboundSession.close();
+        }
     }
 
-    protected void initializeInboundTopicBridges() throws JMSException {
+    protected void initializeInboundDestinationBridgesLocalSide(TopicConnection connection) throws JMSException {
         if (inboundTopicBridges != null) {
-            TopicSession outboundSession = outboundTopicConnection
-                .createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
-            TopicSession localSession = localTopicConnection.createTopicSession(false,
-                                                                                Session.AUTO_ACKNOWLEDGE);
-            for (int i = 0; i < inboundTopicBridges.length; i++) {
-                InboundTopicBridge bridge = inboundTopicBridges[i];
+            TopicSession localSession = connection.createTopicSession(false,Session.AUTO_ACKNOWLEDGE);
+
+            for (InboundTopicBridge bridge : inboundTopicBridges) {
                 String localTopicName = bridge.getLocalTopicName();
                 Topic activemqTopic = createActiveMQTopic(localSession, localTopicName);
-                String topicName = bridge.getInboundTopicName();
-                Topic foreignTopic = createForeignTopic(outboundSession, topicName);
-                bridge.setConsumerTopic(foreignTopic);
                 bridge.setProducerTopic(activemqTopic);
-                bridge.setProducerConnection(localTopicConnection);
-                bridge.setConsumerConnection(outboundTopicConnection);
+                bridge.setProducerConnection(connection);
                 if (bridge.getJmsMessageConvertor() == null) {
                     bridge.setJmsMessageConvertor(getInboundMessageConvertor());
                 }
                 bridge.setJmsConnector(this);
                 addInboundBridge(bridge);
             }
-            outboundSession.close();
             localSession.close();
         }
     }
 
-    protected void initializeOutboundTopicBridges() throws JMSException {
+    protected void initializeOutboundDestinationBridgesOutboundSide(TopicConnection connection) throws JMSException {
         if (outboundTopicBridges != null) {
-            TopicSession outboundSession = outboundTopicConnection
-                .createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
-            TopicSession localSession = localTopicConnection.createTopicSession(false,
-                                                                                Session.AUTO_ACKNOWLEDGE);
-            for (int i = 0; i < outboundTopicBridges.length; i++) {
-                OutboundTopicBridge bridge = outboundTopicBridges[i];
-                String localTopicName = bridge.getLocalTopicName();
-                Topic activemqTopic = createActiveMQTopic(localSession, localTopicName);
+            TopicSession outboundSession = connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
+
+            for (OutboundTopicBridge bridge : outboundTopicBridges) {
                 String topicName = bridge.getOutboundTopicName();
                 Topic foreignTopic = createForeignTopic(outboundSession, topicName);
-                bridge.setConsumerTopic(activemqTopic);
                 bridge.setProducerTopic(foreignTopic);
-                bridge.setProducerConnection(outboundTopicConnection);
-                bridge.setConsumerConnection(localTopicConnection);
+                bridge.setProducerConnection(connection);
                 if (bridge.getJmsMessageConvertor() == null) {
                     bridge.setJmsMessageConvertor(getOutboundMessageConvertor());
                 }
@@ -310,6 +329,23 @@ public class JmsTopicConnector extends J
                 addOutboundBridge(bridge);
             }
             outboundSession.close();
+        }
+    }
+
+    protected void initializeOutboundDestinationBridgesLocalSide(TopicConnection connection) throws JMSException {
+        if (outboundTopicBridges != null) {
+            TopicSession localSession =
+                    connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
+
+            for (OutboundTopicBridge bridge : outboundTopicBridges) {
+                String localTopicName = bridge.getLocalTopicName();
+                Topic activemqTopic = createActiveMQTopic(localSession, localTopicName);
+                bridge.setConsumer(null);
+                bridge.setConsumerTopic(activemqTopic);
+                bridge.setConsumerConnection(connection);
+                bridge.setJmsConnector(this);
+                addOutboundBridge(bridge);
+            }
             localSession.close();
         }
     }
@@ -317,7 +353,7 @@ public class JmsTopicConnector extends J
     protected Destination createReplyToBridge(Destination destination, Connection replyToProducerConnection,
                                               Connection replyToConsumerConnection) {
         Topic replyToProducerTopic = (Topic)destination;
-        boolean isInbound = replyToProducerConnection.equals(localTopicConnection);
+        boolean isInbound = replyToProducerConnection.equals(localConnection.get());
 
         if (isInbound) {
             InboundTopicBridge bridge = (InboundTopicBridge)replyToBridges.get(replyToProducerTopic);

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/jms/OutboundQueueBridge.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/jms/OutboundQueueBridge.java?rev=1238827&r1=1238826&r2=1238827&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/jms/OutboundQueueBridge.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/jms/OutboundQueueBridge.java Tue Jan 31 21:56:03 2012
@@ -17,11 +17,12 @@
 package org.apache.activemq.network.jms;
 
 /**
- * Create an Outbound Queue Bridge
- * 
+ * Create an Outbound Queue Bridge.  By default the bridge uses the same
+ * name for both the inbound and outbound queues, however this can be altered
+ * by using the public setter methods to configure both inbound and outbound
+ * queue names.
+ *
  * @org.apache.xbean.XBean
- * 
- * 
  */
 public class OutboundQueueBridge extends QueueBridge {
 
@@ -30,7 +31,7 @@ public class OutboundQueueBridge extends
 
     /**
      * Constructor that takes a foreign destination as an argument
-     * 
+     *
      * @param outboundQueueName
      */
     public OutboundQueueBridge(String outboundQueueName) {
@@ -39,7 +40,7 @@ public class OutboundQueueBridge extends
     }
 
     /**
-     * Default Contructor
+     * Default Constructor
      */
     public OutboundQueueBridge() {
     }
@@ -52,6 +53,10 @@ public class OutboundQueueBridge extends
     }
 
     /**
+     * Sets the name of the outbound queue.  If the inbound queue name
+     * has not been set already then this method uses the provided queue name
+     * to set the inbound queue name as well.
+     *
      * @param outboundQueueName The outboundQueueName to set.
      */
     public void setOutboundQueueName(String outboundQueueName) {
@@ -74,5 +79,4 @@ public class OutboundQueueBridge extends
     public void setLocalQueueName(String localQueueName) {
         this.localQueueName = localQueueName;
     }
-
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/jms/OutboundTopicBridge.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/jms/OutboundTopicBridge.java?rev=1238827&r1=1238826&r2=1238827&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/jms/OutboundTopicBridge.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/jms/OutboundTopicBridge.java Tue Jan 31 21:56:03 2012
@@ -17,11 +17,12 @@
 package org.apache.activemq.network.jms;
 
 /**
- * Create an Outbound Topic Bridge
- * 
+ * Create an Outbound Topic Bridge.  By default the bridge uses the same
+ * name for both the inbound and outbound topics, however this can be altered
+ * by using the public setter methods to configure both inbound and outbound
+ * topic names.
+ *
  * @org.apache.xbean.XBean
- * 
- * 
  */
 public class OutboundTopicBridge extends TopicBridge {
 
@@ -30,7 +31,7 @@ public class OutboundTopicBridge extends
 
     /**
      * Constructor that takes a foreign destination as an argument
-     * 
+     *
      * @param outboundTopicName
      */
     public OutboundTopicBridge(String outboundTopicName) {
@@ -52,6 +53,10 @@ public class OutboundTopicBridge extends
     }
 
     /**
+     * Sets the name of the outbound topic.  If the inbound topic name
+     * has not been set already then this method uses the provided topic name
+     * to set the inbound topic name as well.
+     *
      * @param outboundTopicName The outboundTopicName to set.
      */
     public void setOutboundTopicName(String outboundTopicName) {

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/jms/QueueBridge.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/jms/QueueBridge.java?rev=1238827&r1=1238826&r2=1238827&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/jms/QueueBridge.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/jms/QueueBridge.java Tue Jan 31 21:56:03 2012
@@ -28,9 +28,7 @@ import javax.jms.QueueSession;
 import javax.jms.Session;
 
 /**
- * A Destination bridge is used to bridge between to different JMS systems
- * 
- * 
+ * A Destination bridge is used to bridge Queues between two different JMS systems
  */
 class QueueBridge extends DestinationBridge {
     protected Queue consumerQueue;
@@ -55,6 +53,7 @@ class QueueBridge extends DestinationBri
 
     protected MessageConsumer createConsumer() throws JMSException {
         // set up the consumer
+        if (consumerConnection == null) return null;
         consumerSession = consumerConnection.createQueueSession(false, Session.CLIENT_ACKNOWLEDGE);
         MessageConsumer consumer = null;
 
@@ -64,20 +63,28 @@ class QueueBridge extends DestinationBri
             consumer = consumerSession.createReceiver(consumerQueue);
         }
 
+        consumer.setMessageListener(this);
+
         return consumer;
     }
 
     protected synchronized MessageProducer createProducer() throws JMSException {
+        if (producerConnection == null) return null;
         producerSession = producerConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
         producer = producerSession.createSender(null);
         return producer;
     }
 
     protected synchronized void sendMessage(Message message) throws JMSException {
-        if (producer == null) {
-            createProducer();
+        if (producer == null && createProducer() == null) {
+            throw new JMSException("Producer for remote queue not available.");
+        }
+        try {
+            producer.send(producerQueue, message);
+        } catch (JMSException e) {
+            producer = null;
+            throw e;
         }
-        producer.send(producerQueue, message);
     }
 
     /**
@@ -92,6 +99,13 @@ class QueueBridge extends DestinationBri
      */
     public void setConsumerConnection(QueueConnection consumerConnection) {
         this.consumerConnection = consumerConnection;
+        if (started.get()) {
+            try {
+                createConsumer();
+            } catch(Exception e) {
+                jmsConnector.handleConnectionFailure(getConnnectionForConsumer());
+            }
+        }
     }
 
     /**
@@ -157,5 +171,4 @@ class QueueBridge extends DestinationBri
     protected Connection getConnectionForProducer() {
         return getProducerConnection();
     }
-
 }

Added: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/jms/ReconnectionPolicy.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/jms/ReconnectionPolicy.java?rev=1238827&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/jms/ReconnectionPolicy.java (added)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/jms/ReconnectionPolicy.java Tue Jan 31 21:56:03 2012
@@ -0,0 +1,244 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.network.jms;
+
+/**
+ * A policy object that defines how a {@link JmsConnector} deals with
+ * reconnection of the local and foreign connections.
+ *
+ * @org.apache.xbean.XBean element="reconnectionPolicy"
+ */
+public class ReconnectionPolicy {
+
+    private int maxSendRetries = 10;
+    private long sendRetryDelay = 1000L;
+
+    private int maxReconnectAttempts = -1;
+    private int maxInitialConnectAttempts = -1;
+    private long maximumReconnectDelay = 30000;
+    private long initialReconnectDelay = 1000L;
+    private boolean useExponentialBackOff = false;
+    private double backOffMultiplier = 2.0;
+
+    /**
+     * Gets the maximum number of times a Message send should be retried before
+     * a JMSException is thrown indicating that the operation failed.
+     *
+     * @return number of send retries that will be performed.
+     */
+    public int getMaxSendRetries() {
+        return maxSendRetries;
+    }
+
+    /**
+     * Sets the maximum number of times a Message send should be retried before
+     * a JMSException is thrown indicating that the operation failed.
+     *
+     * @param maxSendRetries
+     * 			number of send retries that will be performed.
+     */
+    public void setMaxSendRetries(int maxSendRetries) {
+        this.maxSendRetries = maxSendRetries;
+    }
+
+    /**
+     * Get the amount of time the DestinationBridge will wait between attempts
+     * to forward a message.
+     *
+     * @return time in milliseconds to wait between send attempts.
+     */
+    public long getSendRetryDelay() {
+        return this.sendRetryDelay;
+    }
+
+    /**
+     * Set the amount of time the DestinationBridge will wait between attempts
+     * to forward a message.  The default policy limits the minimum time between
+     * send attempts to one second; smaller values are raised to 1000 ms.
+     *
+     * @param sendRetryDelay
+     * 		Time in milliseconds to wait before attempting another send.
+     */
+    public void setSendRetyDelay(long sendRetryDelay) {
+        if (sendRetryDelay < 1000L) {
+            this.sendRetryDelay = 1000L;
+        } else {
+            this.sendRetryDelay = sendRetryDelay;
+        }
+    }
+
+    /**
+     * Gets the number of times that {@link JmsConnector} will attempt to connect
+     * or reconnect before giving up.  By default the policy sets this value to
+     * a negative value meaning try forever.
+     *
+     * @return the number of attempts to connect before giving up.
+     */
+    public int getMaxReconnectAttempts() {
+        return maxReconnectAttempts;
+    }
+
+    /**
+     * Sets the number of times that {@link JmsConnector} will attempt to connect
+     * or reconnect before giving up.  By default the policy sets this value to
+     * a negative value meaning try forever, set to a positive value to retry a
+     * fixed number of times, or zero to never try and reconnect.
+     *
+     * @param maxReconnectAttempts
+     */
+    public void setMaxReconnectAttempts(int maxReconnectAttempts) {
+        this.maxReconnectAttempts = maxReconnectAttempts;
+    }
+
+    /**
+     * Gets the maximum number of times that the {@link JmsConnector} will try
+     * to connect on startup before it marks itself as failed and does not
+     * try any further connections.
+     *
+     * @return the max number of times a connection attempt is made before failing.
+     */
+    public int getMaxInitialConnectAttempts() {
+        return this.maxInitialConnectAttempts;
+    }
+
+    /**
+     * Sets the maximum number of times that the {@link JmsConnector} will try
+     * to connect on startup before it marks itself as failed and does not
+     * try any further connections.
+     *
+     * @param maxAttempts
+     * 		The max number of times a connection attempt is made before failing.
+     */
+    public void setMaxInitialConnectAttempts(int maxAttempts) {
+        this.maxInitialConnectAttempts = maxAttempts;
+    }
+
+    /**
+     * Gets the maximum delay that is inserted between each attempt to connect
+     * before another attempt is made.  The default setting for this value is
+     * 30 seconds.
+     *
+     * @return the max delay between connection attempts in milliseconds.
+     */
+    public long getMaximumReconnectDelay() {
+        return maximumReconnectDelay;
+    }
+
+    /**
+     * Sets the maximum delay that is inserted between each attempt to connect
+     * before another attempt is made.
+     *
+     * @param maximumReconnectDelay
+     * 		The maximum delay between connection attempts in milliseconds.
+     */
+    public void setMaximumReconnectDelay(long maximumReconnectDelay) {
+        this.maximumReconnectDelay = maximumReconnectDelay;
+    }
+
+    /**
+     * Gets the initial delay value used before a reconnection attempt is made.  If the
+     * use exponential back-off value is set to false then this will be the fixed time
+     * between connection attempts.  By default this value is set to one second.
+     *
+     * @return time in milliseconds that will be used between connection retries.
+     */
+    public long getInitialReconnectDelay() {
+        return initialReconnectDelay;
+    }
+
+    /**
+     * Sets the initial delay value used before a reconnection attempt is made.  If the
+     * use exponential back-off value is set to false then this will be the fixed time
+     * between connection attempts.  By default this value is set to one second.
+     *
+     * @param initialReconnectDelay
+     * 		Time in milliseconds to wait before the first reconnection attempt.
+     */
+    public void setInitialReconnectDelay(long initialReconnectDelay) {
+        this.initialReconnectDelay = initialReconnectDelay;
+    }
+
+    /**
+     * Gets whether the policy uses the set back-off multiplier to grow the time between
+     * connection attempts.
+     *
+     * @return true if the policy will grow the time between connection attempts.
+     */
+    public boolean isUseExponentialBackOff() {
+        return useExponentialBackOff;
+    }
+
+    /**
+     * Sets whether the policy uses the set back-off multiplier to grow the time between
+     * connection attempts.
+     *
+     * @param useExponentialBackOff
+     */
+    public void setUseExponentialBackOff(boolean useExponentialBackOff) {
+        this.useExponentialBackOff = useExponentialBackOff;
+    }
+
+    /**
+     * Gets the multiplier used to grow the delay between connection attempts from the initial
+     * time to the max set time.  By default this value is set to 2.0.
+     *
+     * @return the currently configured connection delay multiplier.
+     */
+    public double getBackOffMultiplier() {
+        return backOffMultiplier;
+    }
+
+    /**
+     * Sets the multiplier used to grow the delay between connection attempts from the initial
+     * time to the max set time.  By default this value is set to 2.0.
+     *
+     * @param backOffMultiplier
+     * 		The multiplier value used to grow the reconnection delay.
+     */
+    public void setBackOffMultiplier(double backOffMultiplier) {
+        this.backOffMultiplier = backOffMultiplier;
+    }
+
+    /**
+     * Returns the next computed delay value that the connection controller should use to
+     * wait before attempting another connection for the {@link JmsConnector}.
+     *
+     * @param attempt
+     * 		The current connection attempt.
+     *
+     * @return the next delay amount in milliseconds.
+     */
+    public long getNextDelay(int attempt) {
+
+        if (attempt == 0) {
+            return 0;
+        }
+
+        long nextDelay = initialReconnectDelay;
+
+        if (useExponentialBackOff) {
+            nextDelay = nextDelay * (long)(attempt * backOffMultiplier);
+        }
+
+        if (maximumReconnectDelay > 0 && nextDelay > maximumReconnectDelay) {
+            nextDelay = maximumReconnectDelay;
+        }
+
+        return nextDelay;
+    }
+}

Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/jms/ReconnectionPolicy.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/jms/SimpleJmsMessageConvertor.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/jms/SimpleJmsMessageConvertor.java?rev=1238827&r1=1238826&r2=1238827&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/jms/SimpleJmsMessageConvertor.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/jms/SimpleJmsMessageConvertor.java Tue Jan 31 21:56:03 2012
@@ -23,18 +23,17 @@ import javax.jms.Message;
 
 /**
  * Converts Message from one JMS to another
- * 
+ *
  * @org.apache.xbean.XBean
- * 
- * 
  */
 public class SimpleJmsMessageConvertor implements JmsMesageConvertor {
 
     /**
      * Convert a foreign JMS Message to a native ActiveMQ Message - Inbound or
-     * visa-versa outbound
-     * 
+     * vice versa outbound.
+     *
      * @param message
+     *      The target message to convert to a native ActiveMQ message
      * @return the converted message
      * @throws JMSException
      */
@@ -42,6 +41,19 @@ public class SimpleJmsMessageConvertor i
         return message;
     }
 
+    /**
+     * Convert a foreign JMS Message to a native ActiveMQ Message - Inbound or
+     * vice versa outbound.  If the replyTo Destination instance is not null
+     * then the Message is configured with the given replyTo value.
+     *
+     * @param message
+     *      The target message to convert to a native ActiveMQ message
+     * @param replyTo
+     *      The replyTo Destination to set on the converted Message.
+     *
+     * @return the converted message
+     * @throws JMSException
+     */
     public Message convert(Message message, Destination replyTo) throws JMSException {
         Message msg = convert(message);
         if (replyTo != null) {
@@ -55,5 +67,4 @@ public class SimpleJmsMessageConvertor i
     public void setConnection(Connection connection) {
         // do nothing
     }
-
 }



Mime
View raw message