activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jstrac...@apache.org
Subject svn commit: r436752 - in /incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/jms: DestinationBridge.java JmsConnector.java JmsQueueConnector.java JmsTopicConnector.java QueueBridge.java TopicBridge.java
Date Fri, 25 Aug 2006 11:01:33 GMT
Author: jstrachan
Date: Fri Aug 25 04:01:30 2006
New Revision: 436752

URL: http://svn.apache.org/viewvc?rev=436752&view=rev
Log:
attempt to reconnect to the remote JMS broker if we get a fail sending a message to it - an
attempt at fixing AMQ-895

Modified:
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/jms/DestinationBridge.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/jms/JmsConnector.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/jms/JmsQueueConnector.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/jms/JmsTopicConnector.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/jms/QueueBridge.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/jms/TopicBridge.java

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/jms/DestinationBridge.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/jms/DestinationBridge.java?rev=436752&r1=436751&r2=436752&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/jms/DestinationBridge.java
(original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/jms/DestinationBridge.java
Fri Aug 25 04:01:30 2006
@@ -17,6 +17,12 @@
  */
 package org.apache.activemq.network.jms;
 
+import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.activemq.Service;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
 import javax.jms.Connection;
 import javax.jms.Destination;
 import javax.jms.JMSException;
@@ -24,29 +30,26 @@
 import javax.jms.MessageConsumer;
 import javax.jms.MessageListener;
 import javax.jms.MessageProducer;
+import javax.naming.NamingException;
 
-import org.apache.activemq.Service;
-import org.apache.activemq.command.ActiveMQMessage;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicBoolean;
 /**
  * A Destination bridge is used to bridge between to different JMS systems
  * 
  * @version $Revision: 1.1.1.1 $
  */
-public abstract class DestinationBridge implements Service,MessageListener{
-    private static final Log log=LogFactory.getLog(DestinationBridge.class);
+public abstract class DestinationBridge implements Service, MessageListener {
+    private static final Log log = LogFactory.getLog(DestinationBridge.class);
     protected MessageConsumer consumer;
-    protected AtomicBoolean started=new AtomicBoolean(false);
+    protected AtomicBoolean started = new AtomicBoolean(false);
     protected JmsMesageConvertor jmsMessageConvertor;
     protected boolean doHandleReplyTo = true;
     protected JmsConnector jmsConnector;
+    private int maximumRetries = 10;
 
     /**
      * @return Returns the consumer.
      */
-    public MessageConsumer getConsumer(){
+    public MessageConsumer getConsumer() {
         return consumer;
     }
 
@@ -54,88 +57,110 @@
      * @param consumer
      *            The consumer to set.
      */
-    public void setConsumer(MessageConsumer consumer){
-        this.consumer=consumer;
+    public void setConsumer(MessageConsumer consumer) {
+        this.consumer = consumer;
     }
 
     /**
      * @param connector
      */
-    public void setJmsConnector(JmsConnector connector){
+    public void setJmsConnector(JmsConnector connector) {
         this.jmsConnector = connector;
     }
+
     /**
      * @return Returns the inboundMessageConvertor.
      */
-    public JmsMesageConvertor getJmsMessageConvertor(){
+    public JmsMesageConvertor getJmsMessageConvertor() {
         return jmsMessageConvertor;
     }
 
     /**
-     * @param jmsMessageConvertor 
+     * @param jmsMessageConvertor
+     */
+    public void setJmsMessageConvertor(JmsMesageConvertor jmsMessageConvertor) {
+        this.jmsMessageConvertor = jmsMessageConvertor;
+    }
+
+    public int getMaximumRetries() {
+        return maximumRetries;
+    }
+
+    /**
+     * Sets the maximum number of retries if a send fails before closing the
+     * bridge
      */
-    public void setJmsMessageConvertor(JmsMesageConvertor jmsMessageConvertor){
-        this.jmsMessageConvertor=jmsMessageConvertor;
+    public void setMaximumRetries(int maximumRetries) {
+        this.maximumRetries = maximumRetries;
     }
 
-   
-    protected Destination processReplyToDestination (Destination destination){
+    protected Destination processReplyToDestination(Destination destination) {
         return jmsConnector.createReplyToBridge(destination, getConnnectionForConsumer(),
getConnectionForProducer());
     }
-    
-    public void start() throws Exception{
-        if(started.compareAndSet(false,true)){
-            MessageConsumer consumer=createConsumer();
+
+    public void start() throws Exception {
+        if (started.compareAndSet(false, true)) {
+            MessageConsumer consumer = createConsumer();
             consumer.setMessageListener(this);
             createProducer();
         }
     }
 
-    public void stop() throws Exception{
+    public void stop() throws Exception {
         started.set(false);
     }
-    
-    public void onMessage(Message message){
-    	if(started.get()&&message!=null){
-    		try{
-    			Message converted;
-    			if(doHandleReplyTo){
-    				Destination replyTo = message.getJMSReplyTo();
-    				if(replyTo != null){
-    					converted = jmsMessageConvertor.convert(message, processReplyToDestination(replyTo));
-    				} else {
-    					converted = jmsMessageConvertor.convert(message);
-    				}
-    			} else {
-    				message.setJMSReplyTo(null);
-    				converted = jmsMessageConvertor.convert(message);
-    			}				
-    			sendMessage(converted);
-    			message.acknowledge();
-    		}catch(JMSException e){
-    			log.error("failed to forward message: "+message,e);
-    			try{
-    				stop();
-    			}catch(Exception e1){
-    				log.warn("Failed to stop cleanly",e1);
-    			}
-    		}
-    	}
+
+    public void onMessage(Message message) {
+        if (started.get() && message != null) {
+            int attempt = 0;
+            try {
+                if (attempt > 0) {
+                    restartProducer();
+                }
+                Message converted;
+                if (doHandleReplyTo) {
+                    Destination replyTo = message.getJMSReplyTo();
+                    if (replyTo != null) {
+                        converted = jmsMessageConvertor.convert(message, processReplyToDestination(replyTo));
+                    }
+                    else {
+                        converted = jmsMessageConvertor.convert(message);
+                    }
+                }
+                else {
+                    message.setJMSReplyTo(null);
+                    converted = jmsMessageConvertor.convert(message);
+                }
+                sendMessage(converted);
+                message.acknowledge();
+            }
+            catch (Exception e) {
+                log.error("failed to forward message on attempt: " + (++attempt) + " reason:
" + e + " message: " + message, e);
+                if (maximumRetries > 0 && attempt >= maximumRetries) {
+                    try {
+                        stop();
+                    }
+                    catch (Exception e1) {
+                        log.warn("Failed to stop cleanly", e1);
+                    }
+                }
+            }
+        }
     }
 
-    
     /**
      * @return Returns the doHandleReplyTo.
      */
-    protected boolean isDoHandleReplyTo(){
+    protected boolean isDoHandleReplyTo() {
         return doHandleReplyTo;
     }
 
     /**
-     * @param doHandleReplyTo The doHandleReplyTo to set.
+     * @param doHandleReplyTo
+     *            The doHandleReplyTo to set.
      */
-    protected void setDoHandleReplyTo(boolean doHandleReplyTo){
-        this.doHandleReplyTo=doHandleReplyTo;
+    protected void setDoHandleReplyTo(boolean doHandleReplyTo) {
+        this.doHandleReplyTo = doHandleReplyTo;
     }
 
     protected abstract MessageConsumer createConsumer() throws JMSException;
@@ -145,8 +170,17 @@
     protected abstract void sendMessage(Message message) throws JMSException;
 
     protected abstract Connection getConnnectionForConsumer();
-    
+
     protected abstract Connection getConnectionForProducer();
 
-    
+    protected void restartProducer() throws JMSException, NamingException {
+        try {
+            getConnectionForProducer().close();
+        }
+        catch (Exception e) {
+            log.debug("Ignoring failure to close producer connection: " + e, e);
+        }
+        jmsConnector.restartProducerConnection();
+        createProducer();
+    }
 }

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/jms/JmsConnector.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/jms/JmsConnector.java?rev=436752&r1=436751&r2=436752&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/jms/JmsConnector.java
(original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/jms/JmsConnector.java
Fri Aug 25 04:01:30 2006
@@ -23,6 +23,8 @@
 
 import javax.jms.Connection;
 import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.naming.NamingException;
 
 import org.apache.activemq.ActiveMQConnectionFactory;
 import org.apache.activemq.Service;
@@ -36,14 +38,15 @@
 import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicBoolean;
 
 /**
- * This bridge joins the gap between foreign JMS providers and ActiveMQ As some JMS providers
are still only 1.0.1
- * compliant, this bridge itself aimed to be JMS 1.0.2 compliant.
+ * This bridge joins the gap between foreign JMS providers and ActiveMQ As some
+ * JMS providers are still only 1.0.1 compliant, this bridge itself aimed to be
+ * JMS 1.0.2 compliant.
  * 
  * @version $Revision: 1.1.1.1 $
  */
-public abstract class JmsConnector implements Service{
-    
-    private static final Log log=LogFactory.getLog(JmsConnector.class);
+public abstract class JmsConnector implements Service {
+
+    private static final Log log = LogFactory.getLog(JmsConnector.class);
     protected JndiTemplate jndiLocalTemplate;
     protected JndiTemplate jndiOutboundTemplate;
     protected JmsMesageConvertor inboundMessageConvertor;
@@ -52,101 +55,103 @@
     private List outboundBridges = new CopyOnWriteArrayList();
     protected AtomicBoolean initialized = new AtomicBoolean(false);
     protected AtomicBoolean started = new AtomicBoolean(false);
-    protected ActiveMQConnectionFactory  embeddedConnectionFactory;
-    protected int replyToDestinationCacheSize=10000;
+    protected ActiveMQConnectionFactory embeddedConnectionFactory;
+    protected int replyToDestinationCacheSize = 10000;
     protected String outboundUsername;
     protected String outboundPassword;
     protected String localUsername;
     protected String localPassword;
     private String name;
-    
-    protected LRUCache replyToBridges=new LRUCache(){
+
+    protected LRUCache replyToBridges = new LRUCache() {
         /**
          * 
          */
         private static final long serialVersionUID = -7446792754185879286L;
 
-        protected boolean removeEldestEntry(Map.Entry enty){
-            if(size()>maxCacheSize){
-                Iterator iter=entrySet().iterator();
-                Map.Entry lru=(Map.Entry) iter.next();
+        protected boolean removeEldestEntry(Map.Entry enty) {
+            if (size() > maxCacheSize) {
+                Iterator iter = entrySet().iterator();
+                Map.Entry lru = (Map.Entry) iter.next();
                 remove(lru.getKey());
-                DestinationBridge bridge=(DestinationBridge) lru.getValue();
-                try{
+                DestinationBridge bridge = (DestinationBridge) lru.getValue();
+                try {
                     bridge.stop();
-                    log.info("Expired bridge: "+bridge);
-                }catch(Exception e){
-                    log.warn("stopping expired bridge"+bridge+" caused an exception",e);
+                    log.info("Expired bridge: " + bridge);
+                }
+                catch (Exception e) {
+                    log.warn("stopping expired bridge" + bridge + " caused an exception",
e);
                 }
             }
             return false;
         }
     };
 
-    public boolean init(){
-        boolean result=initialized.compareAndSet(false,true);
-        if(result){
-            if(jndiLocalTemplate==null){
-                jndiLocalTemplate=new JndiTemplate();
+    public boolean init() {
+        boolean result = initialized.compareAndSet(false, true);
+        if (result) {
+            if (jndiLocalTemplate == null) {
+                jndiLocalTemplate = new JndiTemplate();
             }
-            if(jndiOutboundTemplate==null){
-                jndiOutboundTemplate=new JndiTemplate();
+            if (jndiOutboundTemplate == null) {
+                jndiOutboundTemplate = new JndiTemplate();
             }
-            if(inboundMessageConvertor==null){
-                inboundMessageConvertor=new SimpleJmsMessageConvertor();
+            if (inboundMessageConvertor == null) {
+                inboundMessageConvertor = new SimpleJmsMessageConvertor();
             }
-            if (outboundMessageConvertor==null){
-                outboundMessageConvertor=new SimpleJmsMessageConvertor();
+            if (outboundMessageConvertor == null) {
+                outboundMessageConvertor = new SimpleJmsMessageConvertor();
             }
             replyToBridges.setMaxCacheSize(getReplyToDestinationCacheSize());
         }
         return result;
     }
-    
-    public void start() throws Exception{
+
+    public void start() throws Exception {
         init();
-        if (started.compareAndSet(false, true)){
-            for(int i=0;i<inboundBridges.size();i++){
-                DestinationBridge bridge=(DestinationBridge) inboundBridges.get(i);
+        if (started.compareAndSet(false, true)) {
+            for (int i = 0; i < inboundBridges.size(); i++) {
+                DestinationBridge bridge = (DestinationBridge) inboundBridges.get(i);
                 bridge.start();
             }
-            for(int i=0;i<outboundBridges.size();i++){
-                DestinationBridge bridge=(DestinationBridge) outboundBridges.get(i);
+            for (int i = 0; i < outboundBridges.size(); i++) {
+                DestinationBridge bridge = (DestinationBridge) outboundBridges.get(i);
                 bridge.start();
             }
-            log.info("JMS Connector "+getName()+" Started");
+            log.info("JMS Connector " + getName() + " Started");
         }
     }
 
-    public void stop() throws Exception{
-        if(started.compareAndSet(true,false)){
-            for(int i=0;i<inboundBridges.size();i++){
-                DestinationBridge bridge=(DestinationBridge) inboundBridges.get(i);
+    public void stop() throws Exception {
+        if (started.compareAndSet(true, false)) {
+            for (int i = 0; i < inboundBridges.size(); i++) {
+                DestinationBridge bridge = (DestinationBridge) inboundBridges.get(i);
                 bridge.stop();
             }
-            for(int i=0;i<outboundBridges.size();i++){
-                DestinationBridge bridge=(DestinationBridge) outboundBridges.get(i);
+            for (int i = 0; i < outboundBridges.size(); i++) {
+                DestinationBridge bridge = (DestinationBridge) outboundBridges.get(i);
                 bridge.stop();
             }
-            log.info("JMS Connector "+getName()+" Stopped");
+            log.info("JMS Connector " + getName() + " Stopped");
         }
     }
-    
+
     protected abstract Destination createReplyToBridge(Destination destination, Connection
consumerConnection, Connection producerConnection);
-    
+
     /**
-     * One way to configure the local connection - this is called by
-     * The BrokerService when the Connector is embedded
+     * One way to configure the local connection - this is called by The
+     * BrokerService when the Connector is embedded
+     * 
      * @param service
      */
-    public void setBrokerService(BrokerService service){
+    public void setBrokerService(BrokerService service) {
         embeddedConnectionFactory = new ActiveMQConnectionFactory(service.getVmConnectorURI());
     }
 
     /**
      * @return Returns the jndiTemplate.
      */
-    public JndiTemplate getJndiLocalTemplate(){
+    public JndiTemplate getJndiLocalTemplate() {
         return jndiLocalTemplate;
     }
 
@@ -154,28 +159,29 @@
      * @param jndiTemplate
      *            The jndiTemplate to set.
      */
-    public void setJndiLocalTemplate(JndiTemplate jndiTemplate){
-        this.jndiLocalTemplate=jndiTemplate;
+    public void setJndiLocalTemplate(JndiTemplate jndiTemplate) {
+        this.jndiLocalTemplate = jndiTemplate;
     }
 
     /**
      * @return Returns the jndiOutboundTemplate.
      */
-    public JndiTemplate getJndiOutboundTemplate(){
+    public JndiTemplate getJndiOutboundTemplate() {
         return jndiOutboundTemplate;
     }
 
     /**
-     * @param jndiOutboundTemplate The jndiOutboundTemplate to set.
+     * @param jndiOutboundTemplate
+     *            The jndiOutboundTemplate to set.
      */
-    public void setJndiOutboundTemplate(JndiTemplate jndiOutboundTemplate){
-        this.jndiOutboundTemplate=jndiOutboundTemplate;
+    public void setJndiOutboundTemplate(JndiTemplate jndiOutboundTemplate) {
+        this.jndiOutboundTemplate = jndiOutboundTemplate;
     }
 
     /**
      * @return Returns the inboundMessageConvertor.
      */
-    public JmsMesageConvertor getInboundMessageConvertor(){
+    public JmsMesageConvertor getInboundMessageConvertor() {
         return inboundMessageConvertor;
     }
 
@@ -183,28 +189,29 @@
      * @param inboundMessageConvertor
      *            The inboundMessageConvertor to set.
      */
-    public void setInboundMessageConvertor(JmsMesageConvertor jmsMessageConvertor){
-        this.inboundMessageConvertor=jmsMessageConvertor;
+    public void setInboundMessageConvertor(JmsMesageConvertor jmsMessageConvertor) {
+        this.inboundMessageConvertor = jmsMessageConvertor;
     }
 
     /**
      * @return Returns the outboundMessageConvertor.
      */
-    public JmsMesageConvertor getOutboundMessageConvertor(){
+    public JmsMesageConvertor getOutboundMessageConvertor() {
         return outboundMessageConvertor;
     }
 
     /**
-     * @param outboundMessageConvertor The outboundMessageConvertor to set.
+     * @param outboundMessageConvertor
+     *            The outboundMessageConvertor to set.
      */
-    public void setOutboundMessageConvertor(JmsMesageConvertor outboundMessageConvertor){
-        this.outboundMessageConvertor=outboundMessageConvertor;
+    public void setOutboundMessageConvertor(JmsMesageConvertor outboundMessageConvertor)
{
+        this.outboundMessageConvertor = outboundMessageConvertor;
     }
 
     /**
      * @return Returns the replyToDestinationCacheSize.
      */
-    public int getReplyToDestinationCacheSize(){
+    public int getReplyToDestinationCacheSize() {
         return replyToDestinationCacheSize;
     }
 
@@ -212,90 +219,95 @@
      * @param replyToDestinationCacheSize
      *            The replyToDestinationCacheSize to set.
      */
-    public void setReplyToDestinationCacheSize(int replyToDestinationCacheSize){
-        this.replyToDestinationCacheSize=replyToDestinationCacheSize;
+    public void setReplyToDestinationCacheSize(int replyToDestinationCacheSize) {
+        this.replyToDestinationCacheSize = replyToDestinationCacheSize;
     }
-    
-    
+
     /**
      * @return Returns the localPassword.
      */
-    public String getLocalPassword(){
+    public String getLocalPassword() {
         return localPassword;
     }
 
     /**
-     * @param localPassword The localPassword to set.
+     * @param localPassword
+     *            The localPassword to set.
      */
-    public void setLocalPassword(String localPassword){
-        this.localPassword=localPassword;
+    public void setLocalPassword(String localPassword) {
+        this.localPassword = localPassword;
     }
 
     /**
      * @return Returns the localUsername.
      */
-    public String getLocalUsername(){
+    public String getLocalUsername() {
         return localUsername;
     }
 
     /**
-     * @param localUsername The localUsername to set.
+     * @param localUsername
+     *            The localUsername to set.
      */
-    public void setLocalUsername(String localUsername){
-        this.localUsername=localUsername;
+    public void setLocalUsername(String localUsername) {
+        this.localUsername = localUsername;
     }
 
     /**
      * @return Returns the outboundPassword.
      */
-    public String getOutboundPassword(){
+    public String getOutboundPassword() {
         return outboundPassword;
     }
 
     /**
-     * @param outboundPassword The outboundPassword to set.
+     * @param outboundPassword
+     *            The outboundPassword to set.
      */
-    public void setOutboundPassword(String outboundPassword){
-        this.outboundPassword=outboundPassword;
+    public void setOutboundPassword(String outboundPassword) {
+        this.outboundPassword = outboundPassword;
     }
 
     /**
      * @return Returns the outboundUsername.
      */
-    public String getOutboundUsername(){
+    public String getOutboundUsername() {
         return outboundUsername;
     }
 
     /**
-     * @param outboundUsername The outboundUsername to set.
+     * @param outboundUsername
+     *            The outboundUsername to set.
      */
-    public void setOutboundUsername(String outboundUsername){
-        this.outboundUsername=outboundUsername;
+    public void setOutboundUsername(String outboundUsername) {
+        this.outboundUsername = outboundUsername;
     }
 
-    protected void addInboundBridge(DestinationBridge bridge){
+    protected void addInboundBridge(DestinationBridge bridge) {
         inboundBridges.add(bridge);
     }
-    
-    protected void addOutboundBridge(DestinationBridge bridge){
+
+    protected void addOutboundBridge(DestinationBridge bridge) {
         outboundBridges.add(bridge);
     }
-    protected void removeInboundBridge(DestinationBridge bridge){
+
+    protected void removeInboundBridge(DestinationBridge bridge) {
         inboundBridges.add(bridge);
     }
-    
-    protected void removeOutboundBridge(DestinationBridge bridge){
+
+    protected void removeOutboundBridge(DestinationBridge bridge) {
         outboundBridges.add(bridge);
     }
 
     public String getName() {
-        if( name == null ) {
-            name = "Connector:"+getNextId();
+        if (name == null) {
+            name = "Connector:" + getNextId();
         }
         return name;
     }
-    
+
     static int nextId;
+
     static private synchronized int getNextId() {
         return nextId++;
     }
@@ -303,4 +315,6 @@
     public void setName(String name) {
         this.name = name;
     }
+
+    public abstract void restartProducerConnection() throws NamingException, JMSException;
 }

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/jms/JmsQueueConnector.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/jms/JmsQueueConnector.java?rev=436752&r1=436751&r2=436752&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/jms/JmsQueueConnector.java
(original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/jms/JmsQueueConnector.java
Fri Aug 25 04:01:30 2006
@@ -17,6 +17,9 @@
  */
 package org.apache.activemq.network.jms;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
 import javax.jms.Connection;
 import javax.jms.Destination;
 import javax.jms.JMSException;
@@ -26,9 +29,6 @@
 import javax.jms.QueueSession;
 import javax.jms.Session;
 import javax.naming.NamingException;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
 /**
  * A Bridge to other JMS Queue providers
  * 
@@ -186,7 +186,11 @@
         this.outboundQueueConnectionFactory=foreignQueueConnectionFactory;
     }
 
-    
+    public void restartProducerConnection() throws NamingException, JMSException {
+        outboundQueueConnection = null;
+        initializeForeignQueueConnection();
+    }
+
     protected void initializeForeignQueueConnection() throws NamingException,JMSException{
         if(outboundQueueConnection==null){
             // get the connection factories

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/jms/JmsTopicConnector.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/jms/JmsTopicConnector.java?rev=436752&r1=436751&r2=436752&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/jms/JmsTopicConnector.java
(original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/jms/JmsTopicConnector.java
Fri Aug 25 04:01:30 2006
@@ -187,7 +187,11 @@
         this.outboundTopicConnectionFactory=foreignTopicConnectionFactory;
     }
 
-    
+
+    public void restartProducerConnection() throws NamingException, JMSException {
+        outboundTopicConnection = null;
+        initializeForeignTopicConnection();
+    }
 
     protected void initializeForeignTopicConnection() throws NamingException,JMSException{
         if(outboundTopicConnection==null){

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/jms/QueueBridge.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/jms/QueueBridge.java?rev=436752&r1=436751&r2=436752&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/jms/QueueBridge.java
(original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/jms/QueueBridge.java
Fri Aug 25 04:01:30 2006
@@ -56,12 +56,10 @@
         }
     }
     
-    
 
     protected MessageConsumer createConsumer() throws JMSException{
         // set up the consumer
         consumerSession=consumerConnection.createQueueSession(false,Session.CLIENT_ACKNOWLEDGE);
-        producerSession=producerConnection.createQueueSession(false,Session.AUTO_ACKNOWLEDGE);
         MessageConsumer consumer=null;
         
             if(selector!=null&&selector.length()>0){
@@ -74,6 +72,7 @@
     }
     
     protected MessageProducer createProducer() throws JMSException{
+        producerSession=producerConnection.createQueueSession(false,Session.AUTO_ACKNOWLEDGE);
         producer = producerSession.createSender(null);
         return producer;
     }

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/jms/TopicBridge.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/jms/TopicBridge.java?rev=436752&r1=436751&r2=436752&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/jms/TopicBridge.java
(original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/jms/TopicBridge.java
Fri Aug 25 04:01:30 2006
@@ -60,7 +60,6 @@
     protected MessageConsumer createConsumer() throws JMSException{
         // set up the consumer
         consumerSession=consumerConnection.createTopicSession(false,Session.CLIENT_ACKNOWLEDGE);
-        producerSession=producerConnection.createTopicSession(false,Session.AUTO_ACKNOWLEDGE);
         MessageConsumer consumer=null;
         if(consumerName!=null&&consumerName.length()>0){
             if(selector!=null&&selector.length()>0){
@@ -81,6 +80,7 @@
     
     
     protected MessageProducer createProducer() throws JMSException{
+        producerSession=producerConnection.createTopicSession(false,Session.AUTO_ACKNOWLEDGE);
         producer = producerSession.createPublisher(null);
         return producer;
     }



Mime
View raw message