storm-user mailing list archives

From Bobby Evans <bo...@apache.org>
Subject Re: Disruptor Queue flush behaviour
Date Fri, 14 Dec 2018 15:10:50 GMT
Sorry I was out of town at a conference.

So inside the receive queue each entry may be a batch of tuples, but inside
the send queue each entry is an individual tuple.  Yes, this makes it very
difficult to model, but in our testing removing the batching had a negative
performance impact. Because of this, consumeBatchWhenAvailable will loop
through up to 1024 entries, but in some cases each of those entries may hold
more than one tuple.

Thanks,

Bobby
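
For anyone modelling this, the distinction between entries and tuples can be
sketched as below. This is a minimal Python illustration and not Storm code:
count_tuples and MAX_ENTRIES_PER_CONSUME are hypothetical names, and
representing a batch as a Python list is an assumption made only for the
sketch. It shows why a consume pass bounded at 1024 entries is not bounded at
1024 tuples.

```python
from typing import Any, Sequence

# Entry limit quoted in the reply above; the name itself is hypothetical.
MAX_ENTRIES_PER_CONSUME = 1024


def count_tuples(entries: Sequence[Any],
                 max_entries: int = MAX_ENTRIES_PER_CONSUME) -> int:
    """Count the tuples seen in one consume pass over at most max_entries entries.

    Assumption for this sketch: a list entry stands for a batch of tuples
    (receive queue); any other object stands for a single tuple (send queue).
    """
    total = 0
    for entry in entries[:max_entries]:
        total += len(entry) if isinstance(entry, list) else 1
    return total


# Send queue: one tuple per entry.
send_entries = ["t1", "t2", "t3"]
# Receive queue: each entry may be a batch.
receive_entries = [["t1", "t2"], ["t3"], ["t4", "t5", "t6"]]

print(count_tuples(send_entries))     # 3 entries, 3 tuples
print(count_tuples(receive_entries))  # 3 entries, 6 tuples
```

In a model, the entry limit therefore caps the number of ring-buffer slots
read per pass, while the tuple count per pass depends on the batch sizes.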

On Wed, Dec 12, 2018 at 12:39 PM Roshan Naik <roshan_naik@yahoo.com> wrote:

>  There is a detailed diagram of both the 1.x and 2.x messaging internals
> ...  starting at slide  11 over here:
>
> https://www.slideshare.net/Hadoop_Summit/next-generation-execution-for-apache-storm
>
> The flusher threads shown in 1.x there should be in a thread pool .. as
> Bobby described earlier.
>
> Recording of a bit older version of the talk associated with those slides
> is here.
> https://www.youtube.com/watch?v=bZPpt4NnvsA&t=2s
>
> May answer some of your questions.
> -roshan
>
> On Wednesday, December 12, 2018, 4:18:08 AM PST, Thomas Cooper (PGR) <
> t.cooper@newcastle.ac.uk> wrote:
>
>
> Bobby,
>
>
> Thank you for such a detailed reply. It was really useful. I was wondering
> if you could confirm a few final details for me? (This is for Storm version
> 1.2.2)
>
>
> *1) *
> My first question is about the worker process local transfer function (defined in
> https://github.com/apache/storm/blob/d2d6f40344e6cc92ab07f3a462d577ef6b61f8b1/storm-core/src/clj/org/apache/storm/daemon/worker.clj#L116),
> that is used for both moving tuples off the executor send thread to
> internal executor receive queues and also for moving batches of tuples
> arriving from other workers onto the relevant receive queues. The local
> transfer function groups all AddressedTuples for a given executor into a
> list (called pairs in the code) that is then given to the
> DisruptorQueue's publish function (via a Clojure wrapper).
>
>
> My question is, because my knowledge of Clojure is limited, does the local
> transfer function (at
> https://github.com/apache/storm/blob/d2d6f40344e6cc92ab07f3a462d577ef6b61f8b1/storm-core/src/clj/org/apache/storm/daemon/worker.clj#L125)
> pass individual tuples from the pairs list to the publish function (via
> some kind of map operation) or does it pass the list of tuples in one go?
>
>
> I ask because I am trying to model the tuple flow and so I need to work
> out if batches in the DisruptorQueue's overflow queue are batches of
> individual tuples or batches of batches (lists) of tuples (which
> complicates things).
>
>
> The executor's event handler (
> https://github.com/apache/storm/blob/d2d6f40344e6cc92ab07f3a462d577ef6b61f8b1/storm-core/src/clj/org/apache/storm/daemon/executor.clj#L457),
> which is called on a batch of objects returned from the ring buffer, seems
> to be expecting a list of individual tuples, not a list of lists of tuples.
> Therefore it seems like the former situation described above (where the
> local transfer function gives individual tuples to the publish function) is
> likely, but I just wanted to be sure.
>
>
> *2) *My second question is about the DisruptorQueue consumeBatchWhenAvailable
> method (
> https://github.com/apache/storm/blob/d2d6f40344e6cc92ab07f3a462d577ef6b61f8b1/storm-core/src/jvm/org/apache/storm/utils/DisruptorQueue.java#L481).
> I just wanted to confirm that the "batch" which this method refers to is
> actually "every available object in the ring buffer at that time".
> Therefore the batch consumed via this method could be of any size from 1
> up to the ring buffer's limit (1024 by default)?
>
>
> Thanks again for helping with my questions.
>
>
> Regards,
>
>
> Thomas Cooper
> PhD Student
> Newcastle University, School of Computer Science
> W: http://www.tomcooper.org.uk | Twitter: @tomncooper
> <https://twitter.com/tomncooper>
>
>
> ------------------------------
> *From:* Bobby Evans <bobby@apache.org>
> *Sent:* 27 November 2018 15:56
> *To:* user
> *Subject:* Re: Disruptor Queue flush behaviour
>
> FYI in 2.x all of this is different, but to answer your questions for 1.x.
>
> It is a little complicated to try and keep the memory and CPU overhead
> low, especially when few tuples are flowing.  Conceptually what happens is
> that tuples are placed into a separate data structure when they are
> inserted.
>
>
> https://github.com/apache/storm/blob/d2d6f40344e6cc92ab07f3a462d577ef6b61f8b1/storm-core/src/jvm/org/apache/storm/utils/DisruptorQueue.java#L225
>
> If that batch fills up it will attempt to insert them into the disruptor
> queue.  Inserting multiple messages into the queue is more efficient than
> inserting a single message at a time.  If there is not enough capacity
> to insert the message it goes into an overflow queue.
>
> Every millisecond there is a thread pool that will then work at flushing
> all of the tuples buffered in the entire JVM.  First it will force any
> outstanding tuples to be placed into the overflow queue.
>
>
> https://github.com/apache/storm/blob/d2d6f40344e6cc92ab07f3a462d577ef6b61f8b1/storm-core/src/jvm/org/apache/storm/utils/DisruptorQueue.java#L268-L273
>
> After that it goes through that overflow queue and makes sure all of the
> tuples from overflow are flushed into the disruptor queue.  So if no tuples
> are flowing a single thread in the thread pool will wake up once a ms to do
> a few checks per queue and end up doing nothing.  If messages are flowing
> once a ms a partial batch is inserted into the disruptor queue, and
> depending on how long it takes to insert those messages into the queue
> there may be a few threads doing this.
>
> I hope this helps,
>
> Bobby
>
>
> On Mon, Nov 26, 2018 at 7:03 AM Thomas Cooper (PGR) <
> t.cooper@newcastle.ac.uk> wrote:
>
> Hi,
>
>
> I have a question about the behaviour of the LMAX disruptor queues that
> the executor send/receive and the worker transfer queues use.
>
>
> These queues batch tuples for processing (100 by default) and will wait
> until a full batch has arrived before passing them to the executor.
> However, they will also flush any tuples in the queue periodically (1 ms by
> default) to prevent the queue blocking for a long time while it waits for
> 100 tuples to turn up.
>
>
> My question is about the implementation of the flush interval behaviour:
>
>
>
>    1. Does the flush interval thread run continuously, issuing a flush
>    command every 1 ms, with the queue simply ignoring it if it is already
>    flushing? If 100 tuples turn up between the constant flush commands,
>    the queue issues them straight away.
>    2. Or does the flush interval timer only start when
>    consumeBatchWhenAvailable is called on the disruptor queue and a full
>    batch is not available? In which case the queue will wait for 1ms and
>    return whatever is in the queue at the end of that interval or, if 100
>    tuples turn up within that 1ms, return the full batch.
>
>
> From the code in storm-core/src/jvm/org/apache/storm/utils/DisruptorQueue.java
> it seems option 1 might be the case. However, the code in that class is
> quite complex and the interplay with the underlying LMAX library makes it
> hard to reason about.
>
>
> Any help with the above would be greatly appreciated. I am attempting to
> model the effect of these queues on topology performance and hopefully
> investigate a way to optimise the choice of batch size and flush interval.
>
>
> Thanks,
>
>
> Thomas Cooper
> PhD Student
> Newcastle University, School of Computing
> W: http://www.tomcooper.org.uk | Twitter: @tomncooper
> <https://twitter.com/tomncooper>
>
>
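
The 1.x insert path described in the 27 November reply quoted above (tuples
collected into a batch, a full batch published to the disruptor queue or
parked in an overflow queue, and a periodic flush) can be sketched as a toy
model. This is a rough Python sketch and not Storm's DisruptorQueue: the class
and method names are hypothetical, the ring buffer is modelled as a deque
holding one tuple per slot, and queuing a full batch behind any existing
overflow (to keep ordering) is an assumption. The defaults of 100 and 1024 are
the figures mentioned in the thread.

```python
from collections import deque
from typing import Any, Deque, List


class BatchingQueueModel:
    """Toy model of the 1.x insert path; an illustration, not Storm code."""

    def __init__(self, batch_size: int = 100, ring_capacity: int = 1024) -> None:
        self.batch_size = batch_size
        self.ring_capacity = ring_capacity
        self.current_batch: List[Any] = []          # tuples waiting for a full batch
        self.overflow: Deque[List[Any]] = deque()   # batches that did not fit
        self.ring: Deque[Any] = deque()             # stands in for the ring buffer

    def _try_publish(self, batch: List[Any]) -> bool:
        """Insert a whole batch if the ring has room for all of it."""
        if len(self.ring) + len(batch) <= self.ring_capacity:
            self.ring.extend(batch)
            return True
        return False

    def publish(self, tup: Any) -> None:
        """Buffer one tuple; publish the batch once it is full."""
        self.current_batch.append(tup)
        if len(self.current_batch) >= self.batch_size:
            batch, self.current_batch = self.current_batch, []
            # Assumption: keep ordering by queuing behind existing overflow.
            if self.overflow or not self._try_publish(batch):
                self.overflow.append(batch)

    def flush(self) -> None:
        """Model of one periodic flush (every 1 ms by default in the thread)."""
        if self.current_batch:
            # Force the partial batch into the overflow queue first.
            self.overflow.append(self.current_batch)
            self.current_batch = []
        # Then drain overflow into the ring for as long as capacity allows.
        while self.overflow and self._try_publish(self.overflow[0]):
            self.overflow.popleft()

    def consume(self, max_entries: int = 1024) -> List[Any]:
        """Take up to max_entries entries off the ring."""
        n = min(max_entries, len(self.ring))
        return [self.ring.popleft() for _ in range(n)]


q = BatchingQueueModel(batch_size=3, ring_capacity=4)
for t in range(5):
    q.publish(t)
print(len(q.ring), len(q.current_batch))  # 3 2: one full batch published
q.flush()
print(len(q.ring), len(q.overflow))       # 3 1: partial batch stuck in overflow
q.consume(2)
q.flush()
print(list(q.ring))                       # [2, 3, 4]
```

Under these assumptions a flush with nothing buffered does no work, which
matches the "wake up once a ms ... and end up doing nothing" case in the reply.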
