Date: Wed, 27 Sep 2006 17:08:42 +0200
From: Manuel Teira
To: activemq-users@geronimo.apache.org
Subject: Re: QueueBridge and remote broker reconnections

Rob Davies wrote:
> Hi Manuel,
>
> this looks like a good catch! Would you mind opening a jira on this - just
> so it's easier to track - I'll look at this as soon as I can
>
> cheers,

Thanks, Rob.

I've been experimenting further and have made some changes that allow a
DestinationBridge to have its Connections replaced. I've added two new
abstract methods:

    protected abstract void setConnectionForConsumer(Connection consumerConnection);
    protected abstract void setConnectionForProducer(Connection producerConnection);

and implemented them in the subclasses QueueBridge and TopicBridge, which
just check that the argument can be cast to the right javax.jms.Connection
subtype before storing it.

I've also changed the abstract method restartProducerConnection of
JmsConnector so that it returns the new connection, which can then be
injected back into the DestinationBridge through the new setter methods.

Finally, there are a couple of changes in the DestinationBridge onMessage
method to manage correctly (I hope) the variable 'attempt', which is now
a field.

I don't know if this is the right path to follow, but it seems to work
fine. However, I think that only *new messages* are being sent to the
remote bridged broker, and not the ones that were first tried during the
remote broker failure.

Regards.

>
> Rob
> On 27 Sep 2006, at 14:48, Manuel Teira wrote:
>
>> Hello.
>>
>> Looking at the code in DestinationBridge
>> (org.apache.activemq.network.jms), I see that when the delivery of a
>> message to the remote broker fails, there's a counter, implemented as
>> the variable 'attempt', that seems intended to record failures and
>> trigger a restart of the producer.
>>
>> But shouldn't that variable be a member of the DestinationBridge
>> class instead of a local variable of the onMessage method? As it
>> stands, the variable is initialized to zero on every onMessage
>> call, so restartProducer is never called:
>>
>>     public void onMessage(Message message) {
>>         if (started.get() && message != null) {
>>             int attempt = 0;
>>             try {
>>                 if (attempt > 0) {
>>                     restartProducer();
>>                 }
>>                 ...
>>
>> In my tests, I've tried changing the variable 'attempt' to be an object
>> member. Now restartProducer() is called, but it seems that the new
>> connection is not being used. Looking at the code, I don't understand
>> how calling
>>
>>     jmsConnector.restartProducerConnection()
>>
>> really changes the environment of
>>
>>     createProducer()
>>
>> in the QueueBridge subclass.
>>
>> For example, in the QueueBridge subclass, createProducer uses
>> the member producerConnection:
>>
>>     protected MessageProducer createProducer() throws JMSException{
>>         producerSession=producerConnection.createQueueSession(false,Session.AUTO_ACKNOWLEDGE);
>>         producer = producerSession.createSender(null);
>>         return producer;
>>     }
>>
>> but I think that this is no longer related to the
>> JmsQueueConnector outboundQueueConnection, which is the only member
>> affected by jmsConnector.restartProducerConnection(). I don't even know
>> how, or by whom, the QueueBridge producerConnection member is set up
>> during initialization; by calling setProducerConnection, I suppose.
>> I think a solution would be to make it possible to change the
>> producerConnection of QueueBridge when we are restarting the
>> producer. But for that, we would need to implement restartProducer
>> in the DestinationBridge subclasses.
>>
>> Any hint or idea? I really need to have remote bridge reconnections
>> working urgently, so please, if you need further info, let me know.
>>
>> Regards.
>>

Attachment: reconnect-bridge.patch

Index: src/main/java/org/apache/activemq/network/jms/QueueBridge.java
===================================================================
--- src/main/java/org/apache/activemq/network/jms/QueueBridge.java	(revision 450397)
+++ src/main/java/org/apache/activemq/network/jms/QueueBridge.java	(working copy)
@@ -161,5 +161,17 @@
     protected Connection getConnectionForProducer(){
         return getProducerConnection();
     }
-
+
+    protected void setConnectionForConsumer(Connection consumerConnection){
+        if (consumerConnection instanceof QueueConnection) {
+            this.consumerConnection = (QueueConnection)consumerConnection;
+        }
+    }
+
+    protected void setConnectionForProducer(Connection producerConnection){
+        if (producerConnection instanceof QueueConnection) {
+            this.producerConnection = (QueueConnection)producerConnection;
+        }
+    }
Index: src/main/java/org/apache/activemq/network/jms/JmsTopicConnector.java
===================================================================
--- src/main/java/org/apache/activemq/network/jms/JmsTopicConnector.java	(revision 450397)
+++ src/main/java/org/apache/activemq/network/jms/JmsTopicConnector.java	(working copy)
@@ -188,9 +188,11 @@
     }

-    public void restartProducerConnection() throws NamingException, JMSException {
+    public Connection restartProducerConnection() throws NamingException, JMSException {
         outboundTopicConnection = null;
         initializeForeignTopicConnection();
+        return outboundTopicConnection;
     }

     protected void initializeForeignTopicConnection() throws NamingException,JMSException{
Index: src/main/java/org/apache/activemq/network/jms/JmsConnector.java
===================================================================
--- src/main/java/org/apache/activemq/network/jms/JmsConnector.java	(revision 450397)
+++ src/main/java/org/apache/activemq/network/jms/JmsConnector.java	(working copy)
@@ -319,4 +319,5 @@
         this.name = name;
     }

-    public abstract void restartProducerConnection() throws NamingException, JMSException;
+    public abstract Connection restartProducerConnection() throws NamingException, JMSException;
Index: src/main/java/org/apache/activemq/network/jms/JmsQueueConnector.java
===================================================================
--- src/main/java/org/apache/activemq/network/jms/JmsQueueConnector.java	(revision 450397)
+++ src/main/java/org/apache/activemq/network/jms/JmsQueueConnector.java	(working copy)
@@ -186,9 +186,11 @@
         this.outboundQueueConnectionFactory=foreignQueueConnectionFactory;
     }

-    public void restartProducerConnection() throws NamingException, JMSException {
+    public Connection restartProducerConnection() throws NamingException, JMSException {
         outboundQueueConnection = null;
         initializeForeignQueueConnection();
+        return outboundQueueConnection;
     }

     protected void initializeForeignQueueConnection() throws NamingException,JMSException{
Index: src/main/java/org/apache/activemq/network/jms/TopicBridge.java
===================================================================
--- src/main/java/org/apache/activemq/network/jms/TopicBridge.java	(revision 450397)
+++ src/main/java/org/apache/activemq/network/jms/TopicBridge.java	(working copy)
@@ -186,3 +186,16 @@
     protected Connection getConnectionForProducer(){
         return getProducerConnection();
     }
+
+    protected void setConnectionForConsumer(Connection consumerConnection){
+        if (consumerConnection instanceof TopicConnection) {
+            this.consumerConnection = (TopicConnection)consumerConnection;
+        }
+    }
+
+    protected void setConnectionForProducer(Connection producerConnection){
+        if (producerConnection instanceof TopicConnection) {
+            this.producerConnection = (TopicConnection)producerConnection;
+        }
+    }
Index: src/main/java/org/apache/activemq/network/jms/DestinationBridge.java
===================================================================
--- src/main/java/org/apache/activemq/network/jms/DestinationBridge.java	(revision 450397)
+++ src/main/java/org/apache/activemq/network/jms/DestinationBridge.java	(working copy)
@@ -45,6 +45,7 @@
     protected boolean doHandleReplyTo = true;
     protected JmsConnector jmsConnector;
     private int maximumRetries = 10;
+    private int attempt = 0;

     /**
      * @return Returns the consumer.
@@ -112,7 +113,6 @@

     public void onMessage(Message message) {
         if (started.get() && message != null) {
-            int attempt = 0;
             try {
                 if (attempt > 0) {
                     restartProducer();
@@ -132,6 +132,7 @@
                     converted = jmsMessageConvertor.convert(message);
                 }
                 sendMessage(converted);
+                attempt = 0;
                 message.acknowledge();
             }
             catch (Exception e) {
@@ -173,6 +174,10 @@

     protected abstract Connection getConnectionForProducer();

+    protected abstract void setConnectionForConsumer(Connection consumerConnection);
+
+    protected abstract void setConnectionForProducer(Connection producerConnection);
+
     protected void restartProducer() throws JMSException, NamingException {
         try {
             getConnectionForProducer().close();
@@ -180,6 +185,7 @@
         catch (Exception e) {
             log.debug("Ignoring failure to close producer connection: " + e, e);
         }
-        jmsConnector.restartProducerConnection();
+        setConnectionForProducer(jmsConnector.restartProducerConnection());
         createProducer();
     }
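
For anyone following the thread, here is a minimal, self-contained sketch of the stale-reference problem that the patch tries to address. It is not the ActiveMQ code: FakeConnection, FakeConnector and FakeBridge are hypothetical stand-ins for the JMS connection, JmsConnector and DestinationBridge, and the sketch assumes that the retry counter is incremented whenever a send fails (the body of the original catch block is not shown in this thread).

```java
import java.util.ArrayList;
import java.util.List;

public class BridgeReconnectSketch {

    /** Hypothetical stand-in for a JMS connection to the remote broker. */
    static class FakeConnection {
        boolean broken;
        final List<String> delivered = new ArrayList<>();

        void send(String msg) {
            if (broken) {
                throw new IllegalStateException("remote broker unavailable");
            }
            delivered.add(msg);
        }
    }

    /** Hypothetical stand-in for JmsConnector: owns the outbound connection. */
    static class FakeConnector {
        FakeConnection outbound = new FakeConnection();

        /** Replaces the outbound connection and returns the new one. */
        FakeConnection restartProducerConnection() {
            outbound = new FakeConnection();
            return outbound;
        }
    }

    /** Hypothetical stand-in for DestinationBridge. */
    static class FakeBridge {
        private final FakeConnector connector;
        private final boolean injectNewConnection;
        private FakeConnection producerConnection; // the bridge's own reference
        private int attempt = 0;                   // a field, so it survives between calls

        FakeBridge(FakeConnector connector, boolean injectNewConnection) {
            this.connector = connector;
            this.injectNewConnection = injectNewConnection;
            this.producerConnection = connector.outbound;
        }

        void onMessage(String msg) {
            try {
                if (attempt > 0) {
                    restartProducer();
                }
                producerConnection.send(msg);
                attempt = 0; // reset after a successful send
            } catch (IllegalStateException e) {
                attempt++;   // assumption: a failed send bumps the counter
            }
        }

        private void restartProducer() {
            FakeConnection fresh = connector.restartProducerConnection();
            if (injectNewConnection) {
                producerConnection = fresh; // what the new setter is meant to achieve
            }
            // Without the injection, the bridge keeps using its old reference.
        }
    }

    /** Runs one outage scenario and returns what the connector's current connection received. */
    static List<String> run(boolean injectNewConnection) {
        FakeConnector connector = new FakeConnector();
        FakeBridge bridge = new FakeBridge(connector, injectNewConnection);
        connector.outbound.broken = true; // the remote broker goes down
        bridge.onMessage("m1");           // fails and bumps the counter
        bridge.onMessage("m2");           // triggers a producer restart first
        bridge.onMessage("m3");
        return connector.outbound.delivered;
    }

    public static void main(String[] args) {
        System.out.println("without setter: " + run(false));
        System.out.println("with setter:    " + run(true));
    }
}
```

Under these assumptions, the run without the injection delivers nothing, because the bridge keeps sending on the connection it captured at construction time, while the run with the injection delivers m2 and m3. In neither run is m1 sent again, which is at least consistent with the observation above that only new messages reach the remote broker after a reconnection.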