activemq-users mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Rob Davies <rajdav...@gmail.com>
Subject Re: QueueBridge and remote broker reconnections
Date Wed, 27 Sep 2006 20:53:55 GMT
Hi Manuel,

thanks for the work so far - haven't had a chance to go through the  
code yet (been a while since I visited it) - but I will first thing  
tomorrow

cheers,

Rob
On 27 Sep 2006, at 16:35, Manuel Teira wrote:

> I'm sad to confirm this behaviour with the last changes:
>
> 1.-Start the remote broker.
> 2.-Start the activemq broker with a queued bridge.
> 3.-Send a message to the bridged queue: The message is bridged  
> correctly.
> 4.-Stop the remote broker.
> 5.-Send a message to the bridged queue. It fails on  
> QueueBridge.sendMessage as the producer is closed.
> 6.-Send a new message to the bridged queue. After the last changes,  
> it tries to call 'restartProducer' but it fails, because the remote  
> broker is down.
> 7.-Start the remote broker.
> 8.-Send a new message to the bridged queue. restartProducer is  
> called again, the producer and its connection are successfully  
> recreated. But ONLY the new message reaches the remote broker. I  
> don't see any attempt to send the old ones. In the JMX console, I  
> can see, for this queue:
>
> ConsumerCount: 1
> DequeueCount: 4
> EnqueueCount: 4
> QueueSize: 0
>
> But only two messages were delivered  to the remote broker.
>
>
> What could happen with those messages? Any idea about what classes  
> to check?
>
> Regards.
>
>
>
> Manuel Teira escribió:
>> Rob Davies escribió:
>>> Hi Manuel,
>>>
>>> this looks like a good catch! Would mind opening a jira on this -  
>>> just so it's easier to track - I'll look at this as soon as I can
>>>
>>> cheers,
>>
>> Thanks. Rob. I've been experimenting further, and have made some  
>> changes to allow a DestinationBridge to get its Connections  
>> changed. I've created the new abstract methods:
>>
>> protected abstract void setConnectionForConsumer(Connection  
>> consumerConnection);
>>
>> protected abstract void setConnectionForProducer(Connection  
>> producerConnection);
>>
>> And implemented them into the subclasses QueueBridge and  
>> TopicBridge, just checking the casting to the right  
>> java.jms.Connection subclasses.
>>
>> Also, I've changed the abstract method restartProducerConnection  
>> of JmsConnector to make it return the new connection and so, be  
>> able to inject it back into the DestinationBridge making use of  
>> the new setter methods.
>>
>> Also a pair of changes in DestinationBridge onMessage method to  
>> manage correctly (I expect) the (now attribute) variable 'attempt'.
>>
>> I don't know if this is the path to follow, but it seems to work  
>> fine. Anyway, I think that only *new messages* are being sent to  
>> the remote bridged broker, and no the ones that were first tried  
>> during the remote broker failure.
>>
>>
>> Regards.
>>
>>
>>
>>
>>
>>>
>>> Rob
>>> On 27 Sep 2006, at 14:48, Manuel Teira wrote:
>>>
>>>> Hello.
>>>>
>>>> Looking at the code in DestinationBridge  
>>>> (org.apache.activemq.network.jms), I see that when the deliver  
>>>> of a message to the remote broker fails, there's a counter  
>>>> implemented as the var 'attempt' that seems to be thought to  
>>>> mark fails and try to restart the producer.
>>>>
>>>> But, shouldn't that variable be a member of the  
>>>> DestinationBridge class instead of a local variable of the  
>>>> onMessage member method? In this way, the var is always  
>>>> initialized to zero for every onMessage call. So,  
>>>> restartProducer is never called:
>>>>
>>>>     public void onMessage(Message message) {
>>>>         if (started.get() && message != null) {
>>>>             int attempt = 0;
>>>>             try {
>>>>                 if (attempt > 0) {
>>>>                     restartProducer();
>>>>                 }
>>>> ...
>>>>
>>>> In my tests, I've tryed changing the var 'attempt' to be an  
>>>> object member. Now, restartProducer() is called but it seems  
>>>> that the new connection is not being used. Looking at the code,  
>>>> I don't understand how calling
>>>>
>>>> jmsConnector.restartProducerConnection()
>>>>
>>>>
>>>> is really changing the environment of
>>>>
>>>> createProducer()
>>>>
>>>> in the QueueBridge subclass.
>>>>
>>>> For example, for the QueueBridge subclass, createProducer is  
>>>> using the member producerConnection:
>>>>
>>>>     protected MessageProducer createProducer() throws JMSException{
>>>>         producerSession=producerConnection.createQueueSession 
>>>> (false,Session.AUTO_ACKNOWLEDGE);
>>>>         producer = producerSession.createSender(null);
>>>>         return producer;
>>>>     }
>>>>
>>>> but I think that this is not related anymore with the  
>>>> JmsQueueConnector outboundQueueConnection, that is the only  
>>>> affected member in jmsConnector.restartProducerConnection(). I  
>>>> don't ever know how or who, in the initialization is setting up  
>>>> the QueueBridge producerConnection member, calling, I suppose,  
>>>> setProducerConnection.
>>>> I think that a solution to this should be to be able to change  
>>>> the producerConnection of QueueBridge when we are restarting the  
>>>> Producer. But for that, we should need to implement   
>>>> restartProducer in the DestinationBridge subclasses.
>>>>
>>>>
>>>> Any hint or idea? I really need to have remote bridge  
>>>> reconnections working urgently, so please, if you need further  
>>>> info, make me know.
>>>>
>>>> Regards.
>>>>
>>>>
>>>
>>>
>>
>>
>> Index: src/main/java/org/apache/activemq/network/jms/QueueBridge.java
>> ===================================================================
>> --- src/main/java/org/apache/activemq/network/jms/QueueBridge.java	 
>> (revisi¢n: 450397)
>> +++ src/main/java/org/apache/activemq/network/jms/QueueBridge.java	 
>> (copia de trabajo)
>> @@ -161,5 +161,17 @@
>>      protected Connection getConnectionForProducer(){
>>          return getProducerConnection();
>>      }
>> -
>> +
>> +    protected void setConnectionForConsumer(Connection  
>> consumerConnection){
>> +        if (consumerConnection instanceof QueueConnection) {
>> +            this.consumerConnection = (QueueConnection) 
>> consumerConnection;
>> +        }
>> +    }
>> +
>> +    protected void setConnectionForProducer(Connection  
>> producerConnection){
>> +        if (producerConnection instanceof QueueConnection) {
>> +            this.producerConnection = (QueueConnection) 
>> producerConnection;
>> +        }
>> +    }
>>
>> Index: src/main/java/org/apache/activemq/network/jms/ 
>> JmsTopicConnector.java
>> ===================================================================
>> --- src/main/java/org/apache/activemq/network/jms/ 
>> JmsTopicConnector.java	(revisi¢n: 450397)
>> +++ src/main/java/org/apache/activemq/network/jms/ 
>> JmsTopicConnector.java	(copia de trabajo)
>> @@ -188,9 +188,11 @@
>>      }
>>
>>
>> -    public void restartProducerConnection() throws  
>> NamingException, JMSException {
>> +    public Connection restartProducerConnection() throws  
>> NamingException, JMSException {
>>          outboundTopicConnection = null;
>>          initializeForeignTopicConnection();
>> +        return outboundTopicConnection;
>>      }
>>
>>      protected void initializeForeignTopicConnection() throws  
>> NamingException,JMSException{
>> Index: src/main/java/org/apache/activemq/network/jms/ 
>> JmsConnector.java
>> ===================================================================
>> --- src/main/java/org/apache/activemq/network/jms/ 
>> JmsConnector.java	(revisi¢n: 450397)
>> +++ src/main/java/org/apache/activemq/network/jms/ 
>> JmsConnector.java	(copia de trabajo)
>> @@ -319,4 +319,5 @@
>>          this.name = name;
>>      }
>>
>> -    public abstract void restartProducerConnection() throws  
>> NamingException, JMSException;
>> +    public abstract Connection restartProducerConnection() throws  
>> NamingException, JMSException;
>> Index: src/main/java/org/apache/activemq/network/jms/ 
>> JmsQueueConnector.java
>> ===================================================================
>> --- src/main/java/org/apache/activemq/network/jms/ 
>> JmsQueueConnector.java	(revisi¢n: 450397)
>> +++ src/main/java/org/apache/activemq/network/jms/ 
>> JmsQueueConnector.java	(copia de trabajo)
>> @@ -186,9 +186,11 @@
>>           
>> this.outboundQueueConnectionFactory=foreignQueueConnectionFactory;
>>      }
>>
>> -    public void restartProducerConnection() throws  
>> NamingException, JMSException {
>> +    public Connection restartProducerConnection() throws  
>> NamingException, JMSException {
>>          outboundQueueConnection = null;
>>          initializeForeignQueueConnection();
>> +        return outboundQueueConnection;
>>      }
>>
>>      protected void initializeForeignQueueConnection() throws  
>> NamingException,JMSException{
>> Index: src/main/java/org/apache/activemq/network/jms/TopicBridge.java
>> ===================================================================
>> --- src/main/java/org/apache/activemq/network/jms/TopicBridge.java	 
>> (revisi¢n: 450397)
>> +++ src/main/java/org/apache/activemq/network/jms/TopicBridge.java	 
>> (copia de trabajo)
>> @@ -186,3 +186,16 @@
>>      protected Connection getConnectionForProducer(){
>>          return getProducerConnection();
>>      }
>> +
>> +    protected void setConnectionForConsumer(Connection  
>> consumerConnection){
>> +        if (consumerConnection instanceof TopicConnection) {
>> +            this.consumerConnection = (TopicConnection) 
>> consumerConnection;
>> +        }
>> +    }
>> +
>> +    protected void setConnectionForProducer(Connection  
>> producerConnection){
>> +        if (producerConnection instanceof TopicConnection) {
>> +            this.producerConnection = (TopicConnection) 
>> producerConnection;
>> +        }
>> +    }
>> Index: src/main/java/org/apache/activemq/network/jms/ 
>> DestinationBridge.java
>> ===================================================================
>> --- src/main/java/org/apache/activemq/network/jms/ 
>> DestinationBridge.java	(revisi¢n: 450397)
>> +++ src/main/java/org/apache/activemq/network/jms/ 
>> DestinationBridge.java	(copia de trabajo)
>> @@ -45,6 +45,7 @@
>>      protected boolean doHandleReplyTo = true;
>>      protected JmsConnector jmsConnector;
>>      private int maximumRetries = 10;
>> +    private int attempt = 0;
>>
>>      /**
>>       * @return Returns the consumer.
>> @@ -112,7 +113,6 @@
>>
>>      public void onMessage(Message message) {
>>          if (started.get() && message != null) {
>> -            int attempt = 0;
>>              try {
>>                  if (attempt > 0) {
>>                      restartProducer();
>> @@ -132,6 +132,7 @@
>>                      converted = jmsMessageConvertor.convert 
>> (message);
>>                  }
>>                  sendMessage(converted);
>> +                attempt = 0;
>>                  message.acknowledge();
>>              }
>>              catch (Exception e) {
>> @@ -173,6 +174,10 @@
>>
>>      protected abstract Connection getConnectionForProducer();
>>
>> +    protected abstract void setConnectionForConsumer(Connection  
>> consumerConnection);
>> +
>> +    protected abstract void setConnectionForProducer(Connection  
>> producerConnection);
>> +
>>      protected void restartProducer() throws JMSException,  
>> NamingException {
>>          try {
>>              getConnectionForProducer().close();
>> @@ -180,6 +185,7 @@
>>          catch (Exception e) {
>>              log.debug("Ignoring failure to close producer  
>> connection: " + e, e);
>>          }
>> -        jmsConnector.restartProducerConnection();
>> +        setConnectionForProducer 
>> (jmsConnector.restartProducerConnection());
>>          createProducer();
>>      }
>>
>


Mime
View raw message