Return-Path: Delivered-To: apmail-geronimo-activemq-commits-archive@www.apache.org Received: (qmail 34702 invoked from network); 8 Feb 2006 18:27:46 -0000 Received: from hermes.apache.org (HELO mail.apache.org) (209.237.227.199) by minotaur.apache.org with SMTP; 8 Feb 2006 18:27:45 -0000 Received: (qmail 1579 invoked by uid 500); 8 Feb 2006 18:27:36 -0000 Delivered-To: apmail-geronimo-activemq-commits-archive@geronimo.apache.org Received: (qmail 1566 invoked by uid 500); 8 Feb 2006 18:27:36 -0000 Mailing-List: contact activemq-commits-help@geronimo.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: activemq-dev@geronimo.apache.org Delivered-To: mailing list activemq-commits@geronimo.apache.org Received: (qmail 1555 invoked by uid 99); 8 Feb 2006 18:27:36 -0000 Received: from asf.osuosl.org (HELO asf.osuosl.org) (140.211.166.49) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 08 Feb 2006 10:27:36 -0800 X-ASF-Spam-Status: No, hits=-8.6 required=10.0 tests=ALL_TRUSTED,INFO_TLD,NO_REAL_NAME X-Spam-Check-By: apache.org Received: from [209.237.227.194] (HELO minotaur.apache.org) (209.237.227.194) by apache.org (qpsmtpd/0.29) with SMTP; Wed, 08 Feb 2006 10:27:35 -0800 Received: (qmail 34403 invoked by uid 65534); 8 Feb 2006 18:26:59 -0000 Message-ID: <20060208182659.34351.qmail@minotaur.apache.org> Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r376019 - in /incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/jms: DestinationBridge.java JmsConnector.java JmsQueueConnector.java JmsTopicConnector.java QueueBridge.java TopicBridge.java Date: Wed, 08 Feb 2006 18:26:38 -0000 To: activemq-commits@geronimo.apache.org From: rajdavies@apache.org X-Mailer: svnmailer-1.0.6 X-Virus-Checked: Checked by ClamAV on apache.org X-Spam-Rating: minotaur.apache.org 1.6.2 0/1000/N Author: rajdavies Date: Wed Feb 8 10:26:35 2006 New Revision: 376019 URL: http://svn.apache.org/viewcvs?rev=376019&view=rev Log: rationalize the QueueBridge/TopicBridge Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/jms/DestinationBridge.java incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/jms/JmsConnector.java incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/jms/JmsQueueConnector.java incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/jms/JmsTopicConnector.java incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/jms/QueueBridge.java incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/jms/TopicBridge.java Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/jms/DestinationBridge.java URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/jms/DestinationBridge.java?rev=376019&r1=376018&r2=376019&view=diff ============================================================================== --- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/jms/DestinationBridge.java (original) +++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/jms/DestinationBridge.java Wed Feb 8 10:26:35 2006 @@ -16,6 +16,7 @@ */ package org.apache.activemq.network.jms; +import javax.jms.Connection; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.Message; @@ -39,6 +40,7 @@ protected AtomicBoolean started=new AtomicBoolean(false); protected JmsMesageConvertor jmsMessageConvertor; protected boolean doHandleReplyTo = true; + protected JmsConnector jmsConnector; /** * @return Returns the consumer. @@ -56,6 +58,12 @@ } /** + * @param connector + */ + public void setJmsConnector(JmsConnector connector){ + this.jmsConnector = connector; + } + /** * @return Returns the inboundMessageConvertor. */ public JmsMesageConvertor getJmsMessageConvertor(){ @@ -63,13 +71,17 @@ } /** - * @param inboundMessageConvertor - * The inboundMessageConvertor to set. + * @param jmsMessageConvertor */ public void setJmsMessageConvertor(JmsMesageConvertor jmsMessageConvertor){ this.jmsMessageConvertor=jmsMessageConvertor; } + + protected Destination processReplyToDestination (Destination destination){ + return jmsConnector.createReplyToBridge(destination, getConsumerConnection(), getProducerConnection()); + } + public void start() throws Exception{ if(started.compareAndSet(false,true)){ MessageConsumer consumer=createConsumer(); @@ -128,7 +140,9 @@ protected abstract void sendMessage(Message message) throws JMSException; - protected abstract Destination processReplyToDestination(Destination destination); + protected abstract Connection getConsumerConnection(); + + protected abstract Connection getProducerConnection(); } Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/jms/JmsConnector.java URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/jms/JmsConnector.java?rev=376019&r1=376018&r2=376019&view=diff ============================================================================== --- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/jms/JmsConnector.java (original) +++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/jms/JmsConnector.java Wed Feb 8 10:26:35 2006 @@ -19,6 +19,10 @@ import java.util.Iterator; import java.util.List; import java.util.Map; +import javax.jms.Connection; +import javax.jms.Destination; +import javax.jms.Queue; +import javax.jms.QueueConnection; import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.Service; @@ -44,10 +48,14 @@ protected JmsMesageConvertor outboundMessageConvertor; private List inboundBridges = new CopyOnWriteArrayList(); private List outboundBridges = new CopyOnWriteArrayList(); - protected int replyToDestinationCacheSize=10000; protected AtomicBoolean initialized = new AtomicBoolean(false); protected AtomicBoolean started = new AtomicBoolean(false); protected ActiveMQConnectionFactory embeddedConnectionFactory; + protected int replyToDestinationCacheSize=10000; + protected String outboundUsername; + protected String outboundPassword; + protected String localUsername; + protected String localPassword; protected LRUCache replyToBridges=new LRUCache(){ protected boolean removeEldestEntry(Map.Entry enty){ if(size()>maxCacheSize){ @@ -113,6 +121,8 @@ } } + protected abstract Destination createReplyToBridge(Destination destination, Connection consumerConnection, Connection producerConnection); + /** * One way to configure the local connection - this is called by * The BrokerService when the Connector is embedded @@ -196,6 +206,62 @@ } + /** + * @return Returns the localPassword. + */ + public String getLocalPassword(){ + return localPassword; + } + + /** + * @param localPassword The localPassword to set. + */ + public void setLocalPassword(String localPassword){ + this.localPassword=localPassword; + } + + /** + * @return Returns the localUsername. + */ + public String getLocalUsername(){ + return localUsername; + } + + /** + * @param localUsername The localUsername to set. + */ + public void setLocalUsername(String localUsername){ + this.localUsername=localUsername; + } + + /** + * @return Returns the outboundPassword. + */ + public String getOutboundPassword(){ + return outboundPassword; + } + + /** + * @param outboundPassword The outboundPassword to set. + */ + public void setOutboundPassword(String outboundPassword){ + this.outboundPassword=outboundPassword; + } + + /** + * @return Returns the outboundUsername. + */ + public String getOutboundUsername(){ + return outboundUsername; + } + + /** + * @param outboundUsername The outboundUsername to set. + */ + public void setOutboundUsername(String outboundUsername){ + this.outboundUsername=outboundUsername; + } + protected void addInboundBridge(DestinationBridge bridge){ inboundBridges.add(bridge); } Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/jms/JmsQueueConnector.java URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/jms/JmsQueueConnector.java?rev=376019&r1=376018&r2=376019&view=diff ============================================================================== --- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/jms/JmsQueueConnector.java (original) +++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/jms/JmsQueueConnector.java Wed Feb 8 10:26:35 2006 @@ -19,6 +19,7 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import javax.jms.Connection; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.Queue; @@ -44,10 +45,7 @@ private QueueConnection localQueueConnection; private InboundQueueBridge[] inboundQueueBridges; private OutboundQueueBridge[] outboundQueueBridges; - private String outboundUsername; - private String outboundPassword; - private String localUsername; - private String localPassword; + @@ -189,80 +187,7 @@ this.outboundQueueConnectionFactory=foreignQueueConnectionFactory; } - /** - * @return Returns the outboundPassword. - */ - public String getOutboundPassword(){ - return outboundPassword; - } - - /** - * @param outboundPassword - * The outboundPassword to set. - */ - public void setOutboundPassword(String foreignPassword){ - this.outboundPassword=foreignPassword; - } - - /** - * @return Returns the outboundUsername. - */ - public String getOutboundUsername(){ - return outboundUsername; - } - - /** - * @param outboundUsername - * The outboundUsername to set. - */ - public void setOutboundUsername(String foreignUsername){ - this.outboundUsername=foreignUsername; - } - - /** - * @return Returns the localPassword. - */ - public String getLocalPassword(){ - return localPassword; - } - - /** - * @param localPassword - * The localPassword to set. - */ - public void setLocalPassword(String localPassword){ - this.localPassword=localPassword; - } - - /** - * @return Returns the localUsername. - */ - public String getLocalUsername(){ - return localUsername; - } - - /** - * @param localUsername - * The localUsername to set. - */ - public void setLocalUsername(String localUsername){ - this.localUsername=localUsername; - } - /** - * @return Returns the replyToDestinationCacheSize. - */ - public int getReplyToDestinationCacheSize(){ - return replyToDestinationCacheSize; - } - - /** - * @param replyToDestinationCacheSize The replyToDestinationCacheSize to set. - */ - public void setReplyToDestinationCacheSize(int temporaryQueueCacheSize){ - this.replyToDestinationCacheSize=temporaryQueueCacheSize; - } - protected void initializeForeignQueueConnection() throws NamingException,JMSException{ if(outboundQueueConnection==null){ // get the connection factories @@ -341,7 +266,7 @@ if(bridge.getJmsMessageConvertor()==null){ bridge.setJmsMessageConvertor(getInboundMessageConvertor()); } - bridge.setJmsQueueConnector(this); + bridge.setJmsConnector(this); addInboundBridge(bridge); } outboundSession.close(); @@ -366,7 +291,7 @@ if(bridge.getJmsMessageConvertor()==null){ bridge.setJmsMessageConvertor(getOutboundMessageConvertor()); } - bridge.setJmsQueueConnector(this); + bridge.setJmsConnector(this); addOutboundBridge(bridge); } outboundSession.close(); @@ -374,7 +299,8 @@ } } - protected Destination createReplyToQueueBridge(Queue queue, QueueConnection consumerConnection, QueueConnection producerConnection){ + protected Destination createReplyToBridge(Destination destination, Connection consumerConnection, Connection producerConnection){ + Queue queue = (Queue)destination; OutboundQueueBridge bridge = (OutboundQueueBridge) replyToBridges.get(queue); if (bridge == null){ bridge = new OutboundQueueBridge(){ @@ -395,7 +321,7 @@ if(bridge.getJmsMessageConvertor()==null){ bridge.setJmsMessageConvertor(getOutboundMessageConvertor()); } - bridge.setJmsQueueConnector(this); + bridge.setJmsConnector(this); bridge.start(); log.info("Created replyTo bridge for " + queue); }catch(Exception e){ Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/jms/JmsTopicConnector.java URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/jms/JmsTopicConnector.java?rev=376019&r1=376018&r2=376019&view=diff ============================================================================== --- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/jms/JmsTopicConnector.java (original) +++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/jms/JmsTopicConnector.java Wed Feb 8 10:26:35 2006 @@ -16,6 +16,7 @@ */ package org.apache.activemq.network.jms; +import javax.jms.Connection; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.Session; @@ -44,10 +45,7 @@ private TopicConnection localTopicConnection; private InboundTopicBridge[] inboundTopicBridges; private OutboundTopicBridge[] outboundTopicBridges; - private String outboundUsername; - private String outboundPassword; - private String localUsername; - private String localPassword; + @@ -189,79 +187,7 @@ this.outboundTopicConnectionFactory=foreignTopicConnectionFactory; } - /** - * @return Returns the outboundPassword. - */ - public String getOutboundPassword(){ - return outboundPassword; - } - - /** - * @param outboundPassword - * The outboundPassword to set. - */ - public void setOutboundPassword(String foreignPassword){ - this.outboundPassword=foreignPassword; - } - - /** - * @return Returns the outboundUsername. - */ - public String getOutboundUsername(){ - return outboundUsername; - } - - /** - * @param outboundUsername - * The outboundUsername to set. - */ - public void setOutboundUsername(String foreignUsername){ - this.outboundUsername=foreignUsername; - } - - /** - * @return Returns the localPassword. - */ - public String getLocalPassword(){ - return localPassword; - } - - /** - * @param localPassword - * The localPassword to set. - */ - public void setLocalPassword(String localPassword){ - this.localPassword=localPassword; - } - - /** - * @return Returns the localUsername. - */ - public String getLocalUsername(){ - return localUsername; - } - - /** - * @param localUsername - * The localUsername to set. - */ - public void setLocalUsername(String localUsername){ - this.localUsername=localUsername; - } - /** - * @return Returns the replyToDestinationCacheSize. - */ - public int getReplyToDestinationCacheSize(){ - return replyToDestinationCacheSize; - } - - /** - * @param replyToDestinationCacheSize The replyToDestinationCacheSize to set. - */ - public void setReplyToDestinationCacheSize(int temporaryTopicCacheSize){ - this.replyToDestinationCacheSize=temporaryTopicCacheSize; - } protected void initializeForeignTopicConnection() throws NamingException,JMSException{ if(outboundTopicConnection==null){ @@ -341,7 +267,7 @@ if(bridge.getJmsMessageConvertor()==null){ bridge.setJmsMessageConvertor(getInboundMessageConvertor()); } - bridge.setJmsTopicConnector(this); + bridge.setJmsConnector(this); addInboundBridge(bridge); } outboundSession.close(); @@ -366,7 +292,7 @@ if(bridge.getJmsMessageConvertor()==null){ bridge.setJmsMessageConvertor(getOutboundMessageConvertor()); } - bridge.setJmsTopicConnector(this); + bridge.setJmsConnector(this); addOutboundBridge(bridge); } outboundSession.close(); @@ -374,7 +300,9 @@ } } - protected Destination createReplyToTopicBridge(Topic topic, TopicConnection consumerConnection, TopicConnection producerConnection){ + protected Destination createReplyToBridge(Destination destination, Connection consumerConnection, Connection producerConnection){ + Topic topic =(Topic)destination; + OutboundTopicBridge bridge = (OutboundTopicBridge) replyToBridges.get(topic); if (bridge == null){ bridge = new OutboundTopicBridge(){ @@ -395,7 +323,7 @@ if(bridge.getJmsMessageConvertor()==null){ bridge.setJmsMessageConvertor(getOutboundMessageConvertor()); } - bridge.setJmsTopicConnector(this); + bridge.setJmsConnector(this); bridge.start(); log.info("Created replyTo bridge for " + topic); }catch(Exception e){ Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/jms/QueueBridge.java URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/jms/QueueBridge.java?rev=376019&r1=376018&r2=376019&view=diff ============================================================================== --- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/jms/QueueBridge.java (original) +++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/jms/QueueBridge.java Wed Feb 8 10:26:35 2006 @@ -42,7 +42,7 @@ protected QueueSender producer; protected QueueConnection consumerConnection; protected QueueConnection producerConnection; - protected JmsQueueConnector jmsQueueConnector; + public void stop() throws Exception{ super.stop(); @@ -54,9 +54,7 @@ } } - protected void setJmsQueueConnector(JmsQueueConnector connector){ - this.jmsQueueConnector = connector; - } + protected MessageConsumer createConsumer() throws JMSException{ // set up the consumer @@ -79,12 +77,7 @@ } - protected Destination processReplyToDestination (Destination destination){ - Queue queue = (Queue)destination; - return jmsQueueConnector.createReplyToQueueBridge(queue, getConsumerConnection(), getProducerConnection()); - } - - + protected void sendMessage(Message message) throws JMSException{ producer.send(producerQueue,message); Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/jms/TopicBridge.java URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/jms/TopicBridge.java?rev=376019&r1=376018&r2=376019&view=diff ============================================================================== --- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/jms/TopicBridge.java (original) +++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/jms/TopicBridge.java Wed Feb 8 10:26:35 2006 @@ -41,7 +41,7 @@ protected TopicPublisher producer; protected TopicConnection consumerConnection; protected TopicConnection producerConnection; - protected JmsTopicConnector jmsTopicConnector; + public void stop() throws Exception{ super.stop(); @@ -53,9 +53,7 @@ } } - protected void setJmsTopicConnector(JmsTopicConnector connector){ - this.jmsTopicConnector = connector; - } + protected MessageConsumer createConsumer() throws JMSException{ // set up the consumer @@ -78,10 +76,7 @@ return consumer; } - protected Destination processReplyToDestination (Destination destination){ - Topic topic = (Topic)destination; - return jmsTopicConnector.createReplyToTopicBridge(topic, getConsumerConnection(), getProducerConnection()); - } + protected MessageProducer createProducer() throws JMSException{ producer = producerSession.createPublisher(null);