activemq-users mailing list archives

From Leguil2007 <francois.guilleme...@trilliantnetworks.com>
Subject Re: Consumer hang after broker restart
Date Wed, 30 May 2007 14:16:49 GMT

I found a workaround that makes this work: I added a TransportListener.
When the transport is interrupted, I clean up the connection. When the
transport resumes, I recreate the session and consumer.

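  // The spelling "Interupted" matches the ActiveMQ TransportListener method name.
  public void transportInterupted() {
    System.out.println("Transport interrupted");
    try {
      if (connection != null) {
        // Detach the listener while the connection's client-side state is reset.
        connection.removeTransportListener(this);
        connection.cleanup();
        // Re-register the listeners on the cleaned-up connection.
        connection.setExceptionListener(this);
        connection.addTransportListener(this);
      }
    } catch (JMSException e) {
      e.printStackTrace();
    }
  }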

  public void transportResumed() {
    System.out.println("Transport resumed");
    try {
      connection.start();
      // Recreate the session and consumer on the resumed transport.
      session = connection.createSession(transacted, Session.AUTO_ACKNOWLEDGE);
      destination = session.createQueue(subject);

      MessageConsumer consumer = session.createConsumer(destination);
      consumer.setMessageListener(this);
    } catch (JMSException e) {
      e.printStackTrace();
    }
  }

But I still think this is only a workaround, not the real solution.
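
One thing to watch with this approach: if the resume callback can fire
without a preceding interrupt (for example on the initial connect, which is
an assumption here, not something verified against ActiveMQ 4.1.1), the code
above would create a second consumer. Below is a minimal, library-free
sketch of a guard against that. Link, RecordingLink and Resubscriber are
hypothetical stand-ins for the connection calls shown above, not ActiveMQ
classes, and the callback names are spelled normally because this is not the
real TransportListener interface.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Library-free sketch of "tear down on interrupt, rebuild on resume".
 * All types here are hypothetical stand-ins, not ActiveMQ classes.
 */
public class ResubscribeSketch {

    /** Stand-in for the connection, session and consumer calls in the post. */
    interface Link {
        void cleanup();    // reset client-side state (cleanup() in the post)
        void subscribe();  // start, create session, create consumer, set listener
    }

    /** Mirrors the two callbacks, with a guard against duplicate consumers. */
    static final class Resubscriber {
        private final Link link;
        private boolean subscribed;

        Resubscriber(Link link) {
            this.link = link;
        }

        synchronized void transportInterrupted() {
            if (subscribed) {          // nothing to tear down otherwise
                link.cleanup();
                subscribed = false;
            }
        }

        synchronized void transportResumed() {
            if (!subscribed) {         // ignore a resume while already subscribed
                link.subscribe();
                subscribed = true;
            }
        }
    }

    /** Records calls so the behaviour can be checked without a broker. */
    static final class RecordingLink implements Link {
        final List<String> calls = new ArrayList<>();

        public void cleanup() {
            calls.add("cleanup");
        }

        public void subscribe() {
            calls.add("subscribe");
        }
    }

    public static void main(String[] args) {
        RecordingLink link = new RecordingLink();
        Resubscriber r = new Resubscriber(link);
        r.transportResumed();      // initial connect
        r.transportResumed();      // duplicate resume: ignored
        r.transportInterrupted();  // broker stopped
        r.transportResumed();      // broker restarted
        System.out.println(link.calls);  // prints [subscribe, cleanup, subscribe]
    }
}
```

In the real listener, the body of subscribe() would be the session and
consumer creation from transportResumed() above, and cleanup() would be the
body of the interrupt handler.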


Leguil2007 wrote:
> 
> Environment: ActiveMQ 4.1.1 on Windows XP SP2
> 
> I'm running the example:
> 1. I start the broker: ant embedBroker
> 2. I run a producer: ant producer -DsleepTime=0 -Ddurable=true -Dmax=100
> 
> Everything is OK at this point; in JMX the queue has the 100 messages.
> 
> 3. I run a consumer (I set the queue prefetch policy to 1 in the code):
> ant consumer -Dmax=0 -Durl="failover:(tcp://localhost:61616)"
> -DsleepTime=4000
> 4. I stop and restart the broker.
> 5. The consumer receives 2 more messages and then hangs:
>      [java] Received: Message: 2 sent at: Thu May 24 13:55:00 EDT 2007 
> ...
>      [java] Received: Message: 3 sent at: Thu May 24 13:55:00 EDT 2007 
> ...
>      [java] Received: Message: 4 sent at: Thu May 24 13:55:00 EDT 2007 
> ...
>      [java] Received: Message: 5 sent at: Thu May 24 13:55:00 EDT 2007 
> ...
>      [java] 13:55:32 INFO  Transport failed, attempting to automatically reconnect due to: java.net.SocketException: Connection reset
>      [java] java.net.SocketException: Connection reset
>      [java]     at java.net.SocketInputStream.read(SocketInputStream.java:168)
>      [java]     at org.apache.activemq.transport.tcp.TcpBufferedInputStream.fill(TcpBufferedInputStream.java:49)
>      [java]     at org.apache.activemq.transport.tcp.TcpBufferedInputStream.read(TcpBufferedInputStream.java:56)
>      [java]     at java.io.DataInputStream.readInt(DataInputStream.java:353)
>      [java]     at org.apache.activemq.openwire.OpenWireFormat.unmarshal(OpenWireFormat.java:267)
>      [java]     at org.apache.activemq.transport.tcp.TcpTransport.readCommand(TcpTransport.java:156)
>      [java]     at org.apache.activemq.transport.tcp.TcpTransport.run(TcpTransport.java:136)
>      [java]     at java.lang.Thread.run(Thread.java:595)
>      [java] Received: Message: 5 sent at: Thu May 24 13:55:00 EDT 2007 
> ...
>      [java] Received: Message: 6 sent at: Thu May 24 13:55:00 EDT 2007 
> ...
> 
> In JMX I see:
>    Active      true
>    Blocked    false
>    Connected true
>    DequeueCount   6
>    DispatchQueueSize 0
>    EnqueueCount 5
>    Slow   false
> 
> On the broker side, I have these traces:
>      [java] 13:55:48 INFO  ActiveMQ JMS Message Broker (localhost, ID:fguillemette-4075-1180029343428-1:0) started
>      [java] 13:55:48 INFO  JMX consoles can connect to service:jmx:rmi:///jndi/rmi://localhost:1099/jmxrmi
>      [java] 13:55:56 INFO  Could not correlate acknowledgment with dispatched message: MessageAck {commandId = 12, responseRequired = false, ackType = 2, consumerId = ID:fguillemette-4053-1180029310597-0:0:1:1, firstMessageId = null, lastMessageId = ID:fguillemette-4049-1180029300377-0:0:1:1:6, destination = queue://TEST.FOO, transactionId = null, messageCount = 1}
>      [java] 13:56:00 INFO  Could not correlate acknowledgment with dispatched message: MessageAck {commandId = 13, responseRequired = false, ackType = 2, consumerId = ID:fguillemette-4053-1180029310597-0:0:1:1, firstMessageId = null, lastMessageId = ID:fguillemette-4049-1180029300377-0:0:1:1:7, destination = queue://TEST.FOO, transactionId = null, messageCount = 1}
> 
> If I produce new messages, the consumer doesn't wake up.
> If I start a new consumer, it continues to consume from the queue without
> problems.
> 
> Am I doing something wrong?
> 
> Thank you for your response
> 
> Francois
> 

-- 
View this message in context: http://www.nabble.com/Consumer-hang-after-broker-restart-tf3811795s2354.html#a10874670
Sent from the ActiveMQ - User mailing list archive at Nabble.com.
