activemq-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Tim Bain <tb...@alumni.duke.edu>
Subject Fwd: Messages in session queue delivered before messages enqueued in consumers
Date Thu, 20 Jul 2017 23:51:49 GMT
Dev list, could someone please look at Fabian's email from the user list,
and review his questions and his proposed patch to see if his changes
address the issue properly without introducing unwanted side effects?

Thanks,
Tim
---------- Forwarded message ----------
From: "Fabian Gonzalez" <fabian.gonzalez@mulesoft.com>
Date: Jul 20, 2017 10:13 AM
Subject: Messages in session queue delivered before messages enqueued in
consumers
To: <users@activemq.apache.org>
Cc:

Hello, all
>
> I have opened a ticket (AMQ-6775
> <https://issues.apache.org/jira/browse/AMQ-6775>) to verify an issue which
> I explain below. I have added a possible fix,
>
> https://github.com/fsgonz/activemq/commit/0411242fddb16edba38ead18fa99dd
> 9a79a183bc
>
> but I would like to know if I am correctly understanding the logic:
>
> I found this situacion in
> org.apache.activemq.ActiveMQSessionExecutor.iterate:
> As I understand, the idea is that if there are messages queued on the
> consumers they are delivered to the listeners and if that is not the case,
> you should dispatch the messages queued in the session:
>     public boolean iterate() {
>
>         // Deliver any messages queued on the consumer to their listeners.
>         for (ActiveMQMessageConsumer consumer : this.session.consumers) {
>             if (consumer.iterate()) {
>                 return true;
>             }
>         }
>
>         // No messages left queued on the listeners.. so now dispatch
> messages
>         // queued on the session
>         MessageDispatch message = messageQueue.dequeueNoWait();
>         if (message == null) {
>             return false;
>         } else {
>             dispatch(message);
>             return !messageQueue.isEmpty();
>         }
>     }
> Now the following race condition arises:
> 1) thread A (ActiveMQ Session Task) invokes .ActiveMQSessionExecutor
> 2) When this part of the code is executed:
>         for (ActiveMQMessageConsumer consumer : this.session.consumers) {
>             if (consumer.iterate()) {
>                 return true;
>             }
>         }
> ActiveMQMessageConsumer.iterate is invoked. There are messages unconsumed
> in the consumer, but as unconsumedMessages is not started, a null is
> returned as if there were no messages queued.
> 3) Thread A is interrupted
> 4) Thread B (ActiveMQConnection[xx]Scheduler) invokes
> ActiveMQMessageConsumer.start, unconsumed messages are started.
> 5) Thread A continues to deliver the messages queued in the session (notice
> that if thread B starts unconsumed messages before thread A which happens
> most of times the messages in the consumer queue would have been
> dispatched).
> 6) Thread A dispatch a message from the session when there are messages
> from the consumer pending.
> This race condition makes that in some cases a message which has been
> rollbacked and get queued back in the consumer is redelivered after another
> message in the session consumer (which was enqueued after the former
> message).
> Maybe the following commit in my fork would fix the issue, adding a further
> verification to make sure that there are no messages queued in the
> consumers before deliver the messages in the session:
> https://github.com/fsgonz/activemq/commit/0411242fddb16edba38ead18fa99dd
> 9a79a183bc
>
> Thanks in advance for any comment,
>
> Fabian
>

Mime
  • Unnamed multipart/alternative (inline, None, 0 bytes)
View raw message