activemq-users mailing list archives

From Fabian Gonzalez <fabian.gonza...@mulesoft.com>
Subject Re: Messages in session queue delivered before messages enqueued in consumers
Date Fri, 21 Jul 2017 12:42:17 GMT
Thank you very much, Tim!

On Thu, Jul 20, 2017 at 11:20 PM, Tim Bain <tbain@alumni.duke.edu> wrote:

> I forwarded this to the developer mailing list (
> http://activemq.2283324.n4.nabble.com/Fwd-Messages-in-session-queue-delivered-before-messages-enqueued-in-consumers-td4728758.html),
> so between this thread, that thread, and the JIRA entry and pull request,
> hopefully you'll get the feedback you're hoping for. But I can't say for
> sure which place you'll get responses, so watch them all.
>
> Tim
>
> On Jul 20, 2017 10:13 AM, "Fabian Gonzalez" <fabian.gonzalez@mulesoft.com>
> wrote:
>
> Hello, all
>
> I have opened a ticket (AMQ-6775
> <https://issues.apache.org/jira/browse/AMQ-6775>) to verify an issue which
> I explain below. I have added a possible fix,
>
> https://github.com/fsgonz/activemq/commit/0411242fddb16edba38ead18fa99dd9a79a183bc
>
> but I would like to know if I am correctly understanding the logic:
>
> I found this situation in
> org.apache.activemq.ActiveMQSessionExecutor.iterate.
> As I understand it, the idea is that if there are messages queued on the
> consumers, they are delivered to the listeners, and only if that is not the
> case are the messages queued in the session dispatched:
>     public boolean iterate() {
>
>         // Deliver any messages queued on the consumer to their listeners.
>         for (ActiveMQMessageConsumer consumer : this.session.consumers) {
>             if (consumer.iterate()) {
>                 return true;
>             }
>         }
>
>         // No messages left queued on the listeners.. so now dispatch messages
>         // queued on the session
>         MessageDispatch message = messageQueue.dequeueNoWait();
>         if (message == null) {
>             return false;
>         } else {
>             dispatch(message);
>             return !messageQueue.isEmpty();
>         }
>     }
> Now the following race condition arises:
> 1) Thread A (ActiveMQ Session Task) invokes ActiveMQSessionExecutor.iterate
> 2) When this part of the code is executed:
>         for (ActiveMQMessageConsumer consumer : this.session.consumers) {
>             if (consumer.iterate()) {
>                 return true;
>             }
>         }
> ActiveMQMessageConsumer.iterate is invoked. There are unconsumed messages
> in the consumer, but as unconsumedMessages is not started, a null is
> returned as if there were no messages queued.
> 3) Thread A is interrupted
> 4) Thread B (ActiveMQConnection[xx]Scheduler) invokes
> ActiveMQMessageConsumer.start, unconsumed messages are started.
> 5) Thread A continues and delivers the messages queued in the session (note
> that if thread B had started the unconsumed messages before thread A ran the
> loop, which happens most of the time, the messages in the consumer queue
> would have been dispatched first).
> 6) Thread A dispatches a message from the session while messages from the
> consumer are still pending.
> Because of this race condition, a message which has been rolled back and
> queued back in the consumer is in some cases redelivered after another
> message from the session (which was enqueued after the former message).
> Maybe the following commit in my fork would fix the issue by adding a
> further check to make sure that there are no messages queued in the
> consumers before delivering the messages in the session:
> https://github.com/fsgonz/activemq/commit/0411242fddb16edba38ead18fa99dd9a79a183bc
>
> Thanks in advance for any comment,
>
> Fabian
>
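
The interleaving in steps 1) to 6) can be replayed deterministically in a toy model. The sketch below is not ActiveMQ code and is not the linked commit: ToyConsumer, ToyExecutor, the betweenSteps hook, and the recheck flag are all hypothetical names, and the dispatch rule (deliver directly when the consumer is started, otherwise queue on the consumer) is an assumption made only for illustration.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

class SessionOrderingSketch {

    /** Hypothetical stand-in for a consumer whose unconsumed queue can be stopped. */
    static class ToyConsumer {
        final Deque<String> unconsumed = new ArrayDeque<>();
        final List<String> delivered;
        boolean started = false;

        ToyConsumer(List<String> delivered) { this.delivered = delivered; }

        boolean hasPending() { return !unconsumed.isEmpty(); }

        // An unstarted queue behaves as if it were empty (step 2 of the description).
        boolean iterate() {
            if (!started || unconsumed.isEmpty()) return false;
            delivered.add(unconsumed.poll());
            return true;
        }

        // Assumption: deliver directly when started, otherwise keep the message queued.
        void dispatch(String message) {
            if (started) delivered.add(message); else unconsumed.add(message);
        }
    }

    /** Hypothetical stand-in for the session executor. */
    static class ToyExecutor {
        final ToyConsumer consumer;
        final Deque<String> sessionQueue = new ArrayDeque<>();
        final boolean recheck;               // enables the extra verification
        Runnable betweenSteps = () -> { };   // simulates thread B running mid-iterate

        ToyExecutor(ToyConsumer consumer, boolean recheck) {
            this.consumer = consumer;
            this.recheck = recheck;
        }

        boolean iterate() {
            if (consumer.iterate()) return true;
            betweenSteps.run();              // steps 3 and 4: thread B starts the consumer
            // Extra verification: do not overtake messages that became deliverable.
            if (recheck && consumer.started && consumer.hasPending()) return true;
            String message = sessionQueue.poll();
            if (message == null) return false;
            consumer.dispatch(message);
            return !sessionQueue.isEmpty();
        }
    }

    /** Replays the scenario and returns the order in which messages were delivered. */
    static List<String> run(boolean recheck) {
        List<String> delivered = new ArrayList<>();
        ToyConsumer consumer = new ToyConsumer(delivered);
        consumer.unconsumed.add("m1-rolled-back");   // rolled back, re-queued on the consumer
        ToyExecutor executor = new ToyExecutor(consumer, recheck);
        executor.sessionQueue.add("m2");             // enqueued later, on the session
        executor.betweenSteps = () -> consumer.started = true;
        // The second pass stands in for the wakeup that starting the consumer would trigger.
        for (int pass = 0; pass < 2; pass++) {
            while (executor.iterate()) { /* keep iterating while there is work */ }
        }
        return delivered;
    }

    public static void main(String[] args) {
        System.out.println("without re-check: " + run(false));
        System.out.println("with re-check:    " + run(true));
    }
}
```

In this model, run(false) delivers m2 ahead of the rolled-back m1, while run(true) keeps the original order. The re-check only yields when the consumer is already started, so an unstarted consumer with pending messages does not cause the loop to spin.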
