activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rajdav...@apache.org
Subject svn commit: r393383 - in /incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/jms: DestinationBridge.java JmsMesageConvertor.java JmsQueueConnector.java JmsTopicConnector.java SimpleJmsMessageConvertor.java
Date Wed, 12 Apr 2006 04:46:22 GMT
Author: rajdavies
Date: Tue Apr 11 21:46:22 2006
New Revision: 393383

URL: http://svn.apache.org/viewcvs?rev=393383&view=rev
Log:
added changes from http://jira.activemq.org/jira//browse/AMQ-660
to allow destination conversion on outbound messages with replyTo destinations


Modified:
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/jms/DestinationBridge.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/jms/JmsMesageConvertor.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/jms/JmsQueueConnector.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/jms/JmsTopicConnector.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/jms/SimpleJmsMessageConvertor.java

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/jms/DestinationBridge.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/jms/DestinationBridge.java?rev=393383&r1=393382&r2=393383&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/jms/DestinationBridge.java
(original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/jms/DestinationBridge.java
Tue Apr 11 21:46:22 2006
@@ -34,7 +34,7 @@
  * 
  * @version $Revision: 1.1.1.1 $
  */
-abstract class DestinationBridge implements Service,MessageListener{
+public abstract class DestinationBridge implements Service,MessageListener{
     private static final Log log=LogFactory.getLog(DestinationBridge.class);
     protected MessageConsumer consumer;
     protected AtomicBoolean started=new AtomicBoolean(false);
@@ -93,32 +93,35 @@
     public void stop() throws Exception{
         started.set(false);
     }
-
+    
     public void onMessage(Message message){
-        if(started.get()&&message!=null){
-            try{
-                if(doHandleReplyTo){
-                    Destination replyTo=message.getJMSReplyTo();
-                    if(replyTo!=null){
-                        replyTo=processReplyToDestination(replyTo);
-                        message.setJMSReplyTo(replyTo);
-                    }
-                }else {
-                    message.setJMSReplyTo(null);
-                }
-                Message converted=jmsMessageConvertor.convert(message);
-                sendMessage(converted);
-                message.acknowledge();
-            }catch(JMSException e){
-                log.error("failed to forward message: "+message,e);
-                try{
-                    stop();
-                }catch(Exception e1){
-                    log.warn("Failed to stop cleanly",e1);
-                }
-            }
-        }
+    	if(started.get()&&message!=null){
+    		try{
+    			Message converted;
+    			if(doHandleReplyTo){
+    				Destination replyTo = message.getJMSReplyTo();
+    				if(replyTo != null){
+    					converted = jmsMessageConvertor.convert(message, processReplyToDestination(replyTo));
+    				} else {
+    					converted = jmsMessageConvertor.convert(message);
+    				}
+    			} else {
+    				message.setJMSReplyTo(null);
+    				converted = jmsMessageConvertor.convert(message);
+    			}				
+    			sendMessage(converted);
+    			message.acknowledge();
+    		}catch(JMSException e){
+    			log.error("failed to forward message: "+message,e);
+    			try{
+    				stop();
+    			}catch(Exception e1){
+    				log.warn("Failed to stop cleanly",e1);
+    			}
+    		}
+    	}
     }
+
     
     /**
      * @return Returns the doHandleReplyTo.

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/jms/JmsMesageConvertor.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/jms/JmsMesageConvertor.java?rev=393383&r1=393382&r2=393383&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/jms/JmsMesageConvertor.java
(original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/jms/JmsMesageConvertor.java
Tue Apr 11 21:46:22 2006
@@ -16,6 +16,8 @@
  */
 package org.apache.activemq.network.jms;
 
+import javax.jms.Connection;
+import javax.jms.Destination;
 import javax.jms.JMSException;
 import javax.jms.Message;
 
@@ -34,5 +36,8 @@
      */
     public Message convert(Message message) throws JMSException;
     
+    public Message convert(Message message, Destination replyTo) throws JMSException;
+    
+    public void setConnection(Connection connection);
    
 }

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/jms/JmsQueueConnector.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/jms/JmsQueueConnector.java?rev=393383&r1=393382&r2=393383&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/jms/JmsQueueConnector.java
(original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/jms/JmsQueueConnector.java
Tue Apr 11 21:46:22 2006
@@ -16,9 +16,6 @@
  */
 package org.apache.activemq.network.jms;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-
 import javax.jms.Connection;
 import javax.jms.Destination;
 import javax.jms.JMSException;
@@ -28,6 +25,9 @@
 import javax.jms.QueueSession;
 import javax.jms.Session;
 import javax.naming.NamingException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 /**
  * A Bridge to other JMS Queue providers
  * 
@@ -44,11 +44,7 @@
     private QueueConnection outboundQueueConnection;
     private QueueConnection localQueueConnection;
     private InboundQueueBridge[] inboundQueueBridges;
-    private OutboundQueueBridge[] outboundQueueBridges;
-    
-   
-   
-   
+    private OutboundQueueBridge[] outboundQueueBridges;   
 
     public boolean init(){
         boolean result=super.init();
@@ -56,6 +52,8 @@
             try{
                 initializeForeignQueueConnection();
                 initializeLocalQueueConnection();
+                initializeInboundJmsMessageConvertor();
+                initializeOutboundJmsMessageConvertor();
                 initializeInboundQueueBridges();
                 initializeOutboundQueueBridges();
             }catch(Exception e){
@@ -249,6 +247,14 @@
         }
         localQueueConnection.start();
     }
+    
+    protected void initializeInboundJmsMessageConvertor(){
+    	inboundMessageConvertor.setConnection(localQueueConnection);
+    }
+    
+    protected void initializeOutboundJmsMessageConvertor(){
+    	outboundMessageConvertor.setConnection(outboundQueueConnection);
+    }
 
     protected void initializeInboundQueueBridges() throws JMSException{
         if(inboundQueueBridges!=null){
@@ -287,7 +293,6 @@
                 bridge.setProducerQueue(foreignQueue);
                 bridge.setProducerConnection(outboundQueueConnection);
                 bridge.setConsumerConnection(localQueueConnection);
-                bridge.setDoHandleReplyTo(false);
                 if(bridge.getJmsMessageConvertor()==null){
                     bridge.setJmsMessageConvertor(getOutboundMessageConvertor());
                 }
@@ -299,38 +304,71 @@
         }
     }
     
-    protected Destination createReplyToBridge(Destination destination, Connection consumerConnection, Connection producerConnection){
-        Queue queue = (Queue)destination;
-        OutboundQueueBridge bridge = (OutboundQueueBridge) replyToBridges.get(queue);
-        if (bridge == null){
-            bridge = new OutboundQueueBridge(){
-                //we only handle replyTo destinations - inbound
-                protected Destination processReplyToDestination (Destination destination){
-                    return null;
-                }
-            };
-            try{
-                QueueSession localSession = localQueueConnection.createQueueSession(false,Session.AUTO_ACKNOWLEDGE);
-                Queue localQueue = localSession.createTemporaryQueue();
-                localSession.close();
-                bridge.setConsumerQueue(localQueue);
-                bridge.setProducerQueue(queue);
-                bridge.setProducerConnection(outboundQueueConnection);
-                bridge.setConsumerConnection(localQueueConnection);
-                bridge.setDoHandleReplyTo(false);
-                if(bridge.getJmsMessageConvertor()==null){
-                    bridge.setJmsMessageConvertor(getOutboundMessageConvertor());
-                }
-                bridge.setJmsConnector(this);
-                bridge.start();
-                log.info("Created replyTo bridge for " + queue);
-            }catch(Exception e){
-               log.error("Failed to create replyTo bridge for queue: " + queue,e);
-               return null;
-            }
-            replyToBridges.put(queue, bridge);
-        }
-        return bridge.getConsumerQueue();
+    protected Destination createReplyToBridge(Destination destination, Connection replyToProducerConnection, Connection replyToConsumerConnection){
+    	Queue replyToProducerQueue =(Queue)destination;
+    	boolean isInbound = replyToProducerConnection.equals(localQueueConnection);
+    	
+    	if(isInbound){
+    		InboundQueueBridge bridge = (InboundQueueBridge) replyToBridges.get(replyToProducerQueue);
+    		if (bridge == null){
+    			bridge = new InboundQueueBridge(){
+    				protected Destination processReplyToDestination (Destination destination){
+    					return null;
+    				}
+    			};
+    			try{
+    				QueueSession replyToConsumerSession = ((QueueConnection)replyToConsumerConnection).createQueueSession(false,Session.AUTO_ACKNOWLEDGE);
+    				Queue replyToConsumerQueue = replyToConsumerSession.createTemporaryQueue();
+    				replyToConsumerSession.close();
+    				bridge.setConsumerQueue(replyToConsumerQueue);
+    				bridge.setProducerQueue(replyToProducerQueue);
+    				bridge.setProducerConnection((QueueConnection)replyToProducerConnection);
+    				bridge.setConsumerConnection((QueueConnection)replyToConsumerConnection);
+    				bridge.setDoHandleReplyTo(false);
+    				if(bridge.getJmsMessageConvertor()==null){
+    					bridge.setJmsMessageConvertor(getInboundMessageConvertor());
+    				}
+    				bridge.setJmsConnector(this);
+    				bridge.start();
+    				log.info("Created replyTo bridge for " + replyToProducerQueue);
+    			}catch(Exception e){
+    				log.error("Failed to create replyTo bridge for queue: " + replyToProducerQueue, e);
+    				return null;
+    			}
+    			replyToBridges.put(replyToProducerQueue, bridge);
+    		}
+    		return bridge.getConsumerQueue();
+    	}else{
+    		OutboundQueueBridge bridge = (OutboundQueueBridge) replyToBridges.get(replyToProducerQueue);
+    		if (bridge == null){
+    			bridge = new OutboundQueueBridge(){
+    				protected Destination processReplyToDestination (Destination destination){
+    					return null;
+    				}
+    			};
+    			try{
+    				QueueSession replyToConsumerSession = ((QueueConnection)replyToConsumerConnection).createQueueSession(false,Session.AUTO_ACKNOWLEDGE);
+    				Queue replyToConsumerQueue = replyToConsumerSession.createTemporaryQueue();
+    				replyToConsumerSession.close();
+    				bridge.setConsumerQueue(replyToConsumerQueue);
+    				bridge.setProducerQueue(replyToProducerQueue);
+    				bridge.setProducerConnection((QueueConnection)replyToProducerConnection);
+    				bridge.setConsumerConnection((QueueConnection)replyToConsumerConnection);
+    				bridge.setDoHandleReplyTo(false);
+    				if(bridge.getJmsMessageConvertor()==null){
+    					bridge.setJmsMessageConvertor(getOutboundMessageConvertor());
+    				}
+    				bridge.setJmsConnector(this);
+    				bridge.start();
+    				log.info("Created replyTo bridge for " + replyToProducerQueue);
+    			}catch(Exception e){
+    				log.error("Failed to create replyTo bridge for queue: " + replyToProducerQueue, e);
+    				return null;
+    			}
+    			replyToBridges.put(replyToProducerQueue, bridge);
+    		}
+    		return bridge.getConsumerQueue();
+    	}		
     }
     
     protected Queue createActiveMQQueue(QueueSession session,String queueName) throws JMSException{

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/jms/JmsTopicConnector.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/jms/JmsTopicConnector.java?rev=393383&r1=393382&r2=393383&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/jms/JmsTopicConnector.java
(original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/jms/JmsTopicConnector.java
Tue Apr 11 21:46:22 2006
@@ -25,6 +25,7 @@
 import javax.jms.TopicConnectionFactory;
 import javax.jms.TopicSession;
 import javax.naming.NamingException;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
@@ -46,16 +47,14 @@
     private InboundTopicBridge[] inboundTopicBridges;
     private OutboundTopicBridge[] outboundTopicBridges;
     
-   
-   
-   
-
     public boolean init(){
         boolean result=super.init();
         if(result){
             try{
                 initializeForeignTopicConnection();
                 initializeLocalTopicConnection();
+                initializeInboundJmsMessageConvertor();
+                initializeOutboundJmsMessageConvertor();
                 initializeInboundTopicBridges();
                 initializeOutboundTopicBridges();
             }catch(Exception e){
@@ -250,6 +249,14 @@
         }
         localTopicConnection.start();
     }
+    
+    protected void initializeInboundJmsMessageConvertor(){
+    	inboundMessageConvertor.setConnection(localTopicConnection);
+    }
+    
+    protected void initializeOutboundJmsMessageConvertor(){
+    	outboundMessageConvertor.setConnection(outboundTopicConnection);
+    }
 
     protected void initializeInboundTopicBridges() throws JMSException{
         if(inboundTopicBridges!=null){
@@ -288,7 +295,6 @@
                 bridge.setProducerTopic(foreignTopic);
                 bridge.setProducerConnection(outboundTopicConnection);
                 bridge.setConsumerConnection(localTopicConnection);
-                bridge.setDoHandleReplyTo(false);
                 if(bridge.getJmsMessageConvertor()==null){
                     bridge.setJmsMessageConvertor(getOutboundMessageConvertor());
                 }
@@ -300,39 +306,71 @@
         }
     }
     
-    protected  Destination createReplyToBridge(Destination destination, Connection consumerConnection, Connection producerConnection){
-        Topic topic =(Topic)destination;
-        
-        OutboundTopicBridge bridge = (OutboundTopicBridge) replyToBridges.get(topic);
-        if (bridge == null){
-            bridge = new OutboundTopicBridge(){
-                //we only handle replyTo destinations - inbound
-                protected Destination processReplyToDestination (Destination destination){
-                    return null;
-                }
-            };
-            try{
-                TopicSession localSession = localTopicConnection.createTopicSession(false,Session.AUTO_ACKNOWLEDGE);
-                Topic localTopic = localSession.createTemporaryTopic();
-                localSession.close();
-                bridge.setConsumerTopic(localTopic);
-                bridge.setProducerTopic(topic);
-                bridge.setProducerConnection(outboundTopicConnection);
-                bridge.setConsumerConnection(localTopicConnection);
-                bridge.setDoHandleReplyTo(false);
-                if(bridge.getJmsMessageConvertor()==null){
-                    bridge.setJmsMessageConvertor(getOutboundMessageConvertor());
-                }
-                bridge.setJmsConnector(this);
-                bridge.start();
-                log.info("Created replyTo bridge for " + topic);
-            }catch(Exception e){
-               log.error("Failed to create replyTo bridge for topic: " + topic,e);
-               return null;
-            }
-            replyToBridges.put(topic, bridge);
-        }
-        return bridge.getConsumerTopic();
+    protected  Destination createReplyToBridge(Destination destination, Connection replyToProducerConnection, Connection replyToConsumerConnection){
+    	Topic replyToProducerTopic =(Topic)destination;
+    	boolean isInbound = replyToProducerConnection.equals(localTopicConnection);
+    	
+    	if(isInbound){
+    		InboundTopicBridge bridge = (InboundTopicBridge) replyToBridges.get(replyToProducerTopic);
+    		if (bridge == null){
+    			bridge = new InboundTopicBridge(){
+    				protected Destination processReplyToDestination (Destination destination){
+    					return null;
+    				}
+    			};
+    			try{
+    				TopicSession replyToConsumerSession = ((TopicConnection)replyToConsumerConnection).createTopicSession(false,Session.AUTO_ACKNOWLEDGE);
+    				Topic replyToConsumerTopic = replyToConsumerSession.createTemporaryTopic();
+    				replyToConsumerSession.close();
+    				bridge.setConsumerTopic(replyToConsumerTopic);
+    				bridge.setProducerTopic(replyToProducerTopic);
+    				bridge.setProducerConnection((TopicConnection)replyToProducerConnection);
+    				bridge.setConsumerConnection((TopicConnection)replyToConsumerConnection);
+    				bridge.setDoHandleReplyTo(false);
+    				if(bridge.getJmsMessageConvertor()==null){
+    					bridge.setJmsMessageConvertor(getInboundMessageConvertor());
+    				}
+    				bridge.setJmsConnector(this);
+    				bridge.start();
+    				log.info("Created replyTo bridge for " + replyToProducerTopic);
+    			}catch(Exception e){
+    				log.error("Failed to create replyTo bridge for topic: " + replyToProducerTopic, e);
+    				return null;
+    			}
+    			replyToBridges.put(replyToProducerTopic, bridge);
+    		}
+    		return bridge.getConsumerTopic();
+    	}else{
+    		OutboundTopicBridge bridge = (OutboundTopicBridge) replyToBridges.get(replyToProducerTopic);
+    		if (bridge == null){
+    			bridge = new OutboundTopicBridge(){
+    				protected Destination processReplyToDestination (Destination destination){
+    					return null;
+    				}
+    			};
+    			try{
+    				TopicSession replyToConsumerSession = ((TopicConnection)replyToConsumerConnection).createTopicSession(false,Session.AUTO_ACKNOWLEDGE);
+    				Topic replyToConsumerTopic = replyToConsumerSession.createTemporaryTopic();
+    				replyToConsumerSession.close();
+    				bridge.setConsumerTopic(replyToConsumerTopic);
+    				bridge.setProducerTopic(replyToProducerTopic);
+    				bridge.setProducerConnection((TopicConnection)replyToProducerConnection);
+    				bridge.setConsumerConnection((TopicConnection)replyToConsumerConnection);
+    				bridge.setDoHandleReplyTo(false);
+    				if(bridge.getJmsMessageConvertor()==null){
+    					bridge.setJmsMessageConvertor(getOutboundMessageConvertor());
+    				}
+    				bridge.setJmsConnector(this);
+    				bridge.start();
+    				log.info("Created replyTo bridge for " + replyToProducerTopic);
+    			}catch(Exception e){
+    				log.error("Failed to create replyTo bridge for topic: " + replyToProducerTopic, e);
+    				return null;
+    			}
+    			replyToBridges.put(replyToProducerTopic, bridge);
+    		}
+    		return bridge.getConsumerTopic();
+    	}		
     }
     
     protected Topic createActiveMQTopic(TopicSession session,String topicName) throws JMSException{

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/jms/SimpleJmsMessageConvertor.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/jms/SimpleJmsMessageConvertor.java?rev=393383&r1=393382&r2=393383&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/jms/SimpleJmsMessageConvertor.java
(original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/jms/SimpleJmsMessageConvertor.java
Tue Apr 11 21:46:22 2006
@@ -16,6 +16,8 @@
  */
 package org.apache.activemq.network.jms;
 
+import javax.jms.Connection;
+import javax.jms.Destination;
 import javax.jms.JMSException;
 import javax.jms.Message;
 
@@ -37,6 +39,20 @@
      */
     public Message convert(Message message) throws JMSException{
         return message;
+    }
+    
+    public Message convert(Message message, Destination replyTo) throws JMSException{
+    	Message msg = convert(message);
+    	if(replyTo != null) {
+    		msg.setJMSReplyTo(replyTo);
+		}else{
+			msg.setJMSReplyTo(null);
+		}
+		return msg;
+    }
+    
+    public void setConnection(Connection connection){
+    	//do nothing
     }
     
    



Mime
View raw message