activemq-users mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Manuel Teira <mte...@tid.es>
Subject Re: QueueBridge and remote broker reconnections
Date Wed, 27 Sep 2006 15:35:48 GMT
<!DOCTYPE html PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN">
<html>
<head>
  <meta content="text/html;charset=ISO-8859-1" http-equiv="Content-Type">
</head>
<body bgcolor="#ffffff" text="#000000">
I'm sad to confirm this behaviour with the last changes:<br>
<br>
1.-Start the remote broker.<br>
2.-Start the activemq broker with a queued bridge.<br>
3.-Send a message to the bridged queue: The message is bridged
correctly.<br>
4.-Stop the remote broker.<br>
5.-Send a message to the bridged queue. It fails on
QueueBridge.sendMessage as the producer is closed.<br>
6.-Send a new message to the bridged queue. After the last changes, it
tries to call 'restartProducer' but it fails, because the remote broker
is down.<br>
7.-Start the remote broker.<br>
8.-Send a new message to the bridged queue. restartProducer is called
again, the producer and its connection are successfully recreated. But
ONLY the new message reaches the remote broker. I don't see any attempt
to send the old ones. In the JMX console, I can see, for this queue:<br>
<br>
ConsumerCount: 1<br>
DequeueCount: 4<br>
EnqueueCount: 4<br>
QueueSize: 0<br>
<br>
But only two messages were delivered&nbsp; to the remote broker.<br>
<br>
<br>
What could happen with those messages? Any idea about what classes to
check?<br>
<br>
Regards.<br>
<br>
<br>
<br>
Manuel Teira escribi&oacute;:
<blockquote cite="mid451A93FA.6080807@tid.es" type="cite">Rob Davies
escribi&oacute;:
  <br>
  <blockquote type="cite">Hi Manuel,
    <br>
    <br>
this looks like a good catch! Would mind opening a jira on this - just
so it's easier to track - I'll look at this as soon as I can
    <br>
    <br>
cheers,
    <br>
  </blockquote>
  <br>
Thanks. Rob. I've been experimenting further, and have made some
changes to allow a DestinationBridge to get its Connections changed.
I've created the new abstract methods:
  <br>
  <br>
protected abstract void setConnectionForConsumer(Connection
consumerConnection);
  <br>
  <br>
protected abstract void setConnectionForProducer(Connection
producerConnection);
  <br>
  <br>
And implemented them into the subclasses QueueBridge and TopicBridge,
just checking the casting to the right java.jms.Connection subclasses.
  <br>
  <br>
Also, I've changed the abstract method restartProducerConnection of
JmsConnector to make it return the new connection and so, be able to
inject it back into the DestinationBridge making use of the new setter
methods.
  <br>
  <br>
Also a pair of changes in DestinationBridge onMessage method to manage
correctly (I expect) the (now attribute) variable 'attempt'.
  <br>
  <br>
I don't know if this is the path to follow, but it seems to work fine.
Anyway, I think that only *new messages* are being sent to the remote
bridged broker, and no the ones that were first tried during the remote
broker failure.
  <br>
  <br>
  <br>
Regards.
  <br>
  <br>
  <br>
  <br>
  <br>
  <br>
  <blockquote type="cite"><br>
Rob
    <br>
On 27 Sep 2006, at 14:48, Manuel Teira wrote:
    <br>
    <br>
    <blockquote type="cite">Hello.
      <br>
      <br>
Looking at the code in DestinationBridge
(org.apache.activemq.network.jms), I see that when the deliver of a
message to the remote broker fails, there's a counter implemented as
the var 'attempt' that seems to be thought to mark fails and try to
restart the producer.
      <br>
      <br>
But, shouldn't that variable be a member of the DestinationBridge class
instead of a local variable of the onMessage member method? In this
way, the var is always initialized to zero for every onMessage call.
So, restartProducer is never called:
      <br>
      <br>
&nbsp;&nbsp;&nbsp; public void onMessage(Message message) {
      <br>
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; if (started.get() &amp;&amp;
message != null) {
      <br>
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
int attempt = 0;
      <br>
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
try {
      <br>
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
if (attempt &gt; 0) {
      <br>
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
restartProducer();
      <br>
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
}
      <br>
...
      <br>
      <br>
In my tests, I've tryed changing the var 'attempt' to be an object
member. Now, restartProducer() is called but it seems that the new
connection is not being used. Looking at the code, I don't understand
how calling
      <br>
      <br>
jmsConnector.restartProducerConnection()
      <br>
      <br>
      <br>
is really changing the environment of
      <br>
      <br>
createProducer()
      <br>
      <br>
in the QueueBridge subclass.
      <br>
      <br>
For example, for the QueueBridge subclass, createProducer is using the
member producerConnection:
      <br>
      <br>
&nbsp;&nbsp;&nbsp; protected MessageProducer createProducer() throws JMSException{
      <br>
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
producerSession=producerConnection.createQueueSession(false,Session.AUTO_ACKNOWLEDGE);
      <br>
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; producer = producerSession.createSender(null);
      <br>
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; return producer;
      <br>
&nbsp;&nbsp;&nbsp; }
      <br>
      <br>
but I think that this is not related anymore with the JmsQueueConnector
outboundQueueConnection, that is the only affected member in
jmsConnector.restartProducerConnection(). I don't ever know how or who,
in the initialization is setting up the QueueBridge producerConnection
member, calling, I suppose, setProducerConnection.
      <br>
I think that a solution to this should be to be able to change the
producerConnection of QueueBridge when we are restarting the Producer.
But for that, we should need to implement&nbsp; restartProducer in the
DestinationBridge subclasses.
      <br>
      <br>
      <br>
Any hint or idea? I really need to have remote bridge reconnections
working urgently, so please, if you need further info, make me know.
      <br>
      <br>
Regards.
      <br>
      <br>
      <br>
    </blockquote>
    <br>
    <br>
  </blockquote>
  <br>
  <pre wrap="">
<hr size="4" width="90%">
Index: src/main/java/org/apache/activemq/network/jms/QueueBridge.java
===================================================================
--- src/main/java/org/apache/activemq/network/jms/QueueBridge.java	(revisi&cent;n: 450397)
+++ src/main/java/org/apache/activemq/network/jms/QueueBridge.java	(copia de trabajo)
@@ -161,5 +161,17 @@
     protected Connection getConnectionForProducer(){
         return getProducerConnection();
     }
-    
+
+    protected void setConnectionForConsumer(Connection consumerConnection){
+        if (consumerConnection instanceof QueueConnection) {
+            this.consumerConnection = (QueueConnection)consumerConnection;
+        }
+    }
+
+    protected void setConnectionForProducer(Connection producerConnection){
+        if (producerConnection instanceof QueueConnection) {
+            this.producerConnection = (QueueConnection)producerConnection;
+        }
+    }
   
Index: src/main/java/org/apache/activemq/network/jms/JmsTopicConnector.java
===================================================================
--- src/main/java/org/apache/activemq/network/jms/JmsTopicConnector.java	(revisi&cent;n:
450397)
+++ src/main/java/org/apache/activemq/network/jms/JmsTopicConnector.java	(copia de trabajo)
@@ -188,9 +188,11 @@
     }
 
 
-    public void restartProducerConnection() throws NamingException, JMSException {
+    public Connection restartProducerConnection() throws NamingException, JMSException {
         outboundTopicConnection = null;
         initializeForeignTopicConnection();
+        return outboundTopicConnection;
     }
 
     protected void initializeForeignTopicConnection() throws NamingException,JMSException{
Index: src/main/java/org/apache/activemq/network/jms/JmsConnector.java
===================================================================
--- src/main/java/org/apache/activemq/network/jms/JmsConnector.java	(revisi&cent;n: 450397)
+++ src/main/java/org/apache/activemq/network/jms/JmsConnector.java	(copia de trabajo)
@@ -319,4 +319,5 @@
         this.name = name;
     }
 
-    public abstract void restartProducerConnection() throws NamingException, JMSException;
+    public abstract Connection restartProducerConnection() throws NamingException, JMSException;
Index: src/main/java/org/apache/activemq/network/jms/JmsQueueConnector.java
===================================================================
--- src/main/java/org/apache/activemq/network/jms/JmsQueueConnector.java	(revisi&cent;n:
450397)
+++ src/main/java/org/apache/activemq/network/jms/JmsQueueConnector.java	(copia de trabajo)
@@ -186,9 +186,11 @@
         this.outboundQueueConnectionFactory=foreignQueueConnectionFactory;
     }
 
-    public void restartProducerConnection() throws NamingException, JMSException {
+    public Connection restartProducerConnection() throws NamingException, JMSException {
         outboundQueueConnection = null;
         initializeForeignQueueConnection();
+        return outboundQueueConnection;
     }
 
     protected void initializeForeignQueueConnection() throws NamingException,JMSException{
Index: src/main/java/org/apache/activemq/network/jms/TopicBridge.java
===================================================================
--- src/main/java/org/apache/activemq/network/jms/TopicBridge.java	(revisi&cent;n: 450397)
+++ src/main/java/org/apache/activemq/network/jms/TopicBridge.java	(copia de trabajo)
@@ -186,3 +186,16 @@
     protected Connection getConnectionForProducer(){
         return getProducerConnection();
     }
+
+    protected void setConnectionForConsumer(Connection consumerConnection){
+        if (consumerConnection instanceof TopicConnection) {
+            this.consumerConnection = (TopicConnection)consumerConnection;
+        }
+    }
+
+    protected void setConnectionForProducer(Connection producerConnection){
+        if (producerConnection instanceof TopicConnection) {
+            this.producerConnection = (TopicConnection)producerConnection;
+        }
+    }
Index: src/main/java/org/apache/activemq/network/jms/DestinationBridge.java
===================================================================
--- src/main/java/org/apache/activemq/network/jms/DestinationBridge.java	(revisi&cent;n:
450397)
+++ src/main/java/org/apache/activemq/network/jms/DestinationBridge.java	(copia de trabajo)
@@ -45,6 +45,7 @@
     protected boolean doHandleReplyTo = true;
     protected JmsConnector jmsConnector;
     private int maximumRetries = 10;
+    private int attempt = 0;
 
     /**
      * @return Returns the consumer.
@@ -112,7 +113,6 @@
 
     public void onMessage(Message message) {
         if (started.get() &amp;&amp; message != null) {
-            int attempt = 0;
             try {
                 if (attempt &gt; 0) {
                     restartProducer();
@@ -132,6 +132,7 @@
                     converted = jmsMessageConvertor.convert(message);
                 }
                 sendMessage(converted);
+                attempt = 0;
                 message.acknowledge();
             }
             catch (Exception e) {
@@ -173,6 +174,10 @@
 
     protected abstract Connection getConnectionForProducer();
 
+    protected abstract void setConnectionForConsumer(Connection consumerConnection);
+
+    protected abstract void setConnectionForProducer(Connection producerConnection);
+
     protected void restartProducer() throws JMSException, NamingException {
         try {
             getConnectionForProducer().close();
@@ -180,6 +185,7 @@
         catch (Exception e) {
             log.debug("Ignoring failure to close producer connection: " + e, e);
         }
-        jmsConnector.restartProducerConnection();
+        setConnectionForProducer(jmsConnector.restartProducerConnection());
         createProducer();
     }
  </pre>
</blockquote>
<br>
</body>
</html>

Mime
View raw message