storm-user mailing list archives

From Roshan Naik <roshan_n...@yahoo.com>
Subject Re: Disruptor Queue flush behaviour
Date Wed, 12 Dec 2018 18:38:51 GMT
There is a detailed diagram of both the 1.x and 2.x messaging internals, starting at slide 11, here: https://www.slideshare.net/Hadoop_Summit/next-generation-execution-for-apache-storm

The flusher threads shown there for 1.x should be in a thread pool, as Bobby described earlier.

A recording of a slightly older version of the talk associated with those slides is here: https://www.youtube.com/watch?v=bZPpt4NnvsA&t=2s

It may answer some of your questions.

-roshan

On Wednesday, December 12, 2018, 4:18:08 AM PST, Thomas Cooper (PGR) <t.cooper@newcastle.ac.uk> wrote:
 
Bobby,

Thank you for such a detailed reply. It was really useful. I was wondering if you could confirm a few final details for me? (This is for Storm version 1.2.2.)

1) My first question is about the worker process local transfer function (defined at https://github.com/apache/storm/blob/d2d6f40344e6cc92ab07f3a462d577ef6b61f8b1/storm-core/src/clj/org/apache/storm/daemon/worker.clj#L116), which is used both for moving tuples off the executor send thread onto internal executor receive queues and for moving batches of tuples arriving from other workers onto the relevant receive queues. The local transfer function groups all AddressedTuples for a given executor into a list (called pairs in the code) that is then given to the DisruptorQueue's publish function (via a Clojure wrapper).
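
To make this concrete, here is the grouping step as I read it, translated into a minimal Java sketch (all type and field names are mine, not Storm's; the final publish step is deliberately left open, since that is exactly what I ask below):

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-ins for Storm's internals; a sketch of the grouping
// step only, not the actual Clojure implementation.
record AddressedTuple(int taskId, Object tuple) {}

class LocalTransferSketch {
    // task id -> executor that hosts it (assumed populated elsewhere)
    private final Map<Integer, Integer> taskToExecutor = new HashMap<>();
    // executor -> its receive queue, reduced here to a plain list of published objects
    private final Map<Integer, List<Object>> executorReceiveQueue = new HashMap<>();

    void transferLocal(List<AddressedTuple> tupleBatch) {
        // Group the incoming AddressedTuples by destination executor
        // (the "pairs" list in the Clojure code).
        Map<Integer, List<AddressedTuple>> grouped = new HashMap<>();
        for (AddressedTuple t : tupleBatch) {
            Integer executor = taskToExecutor.get(t.taskId());
            if (executor != null) {
                grouped.computeIfAbsent(executor, e -> new ArrayList<>()).add(t);
            }
        }
        grouped.forEach((executor, pairs) -> {
            List<Object> queue = executorReceiveQueue.get(executor);
            // The open question: which of these does the Clojure wrapper do?
            queue.add(pairs);              // (a) publish the whole list as one object
            // pairs.forEach(queue::add);  // (b) publish each tuple individually
        });
    }
}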

My question is (my knowledge of Clojure is limited): does the local transfer function (at https://github.com/apache/storm/blob/d2d6f40344e6cc92ab07f3a462d577ef6b61f8b1/storm-core/src/clj/org/apache/storm/daemon/worker.clj#L125) pass individual tuples from the pairs list to the publish function (via some kind of map operation), or does it pass the whole list in one go?

I ask because I am trying to model the tuple flow, so I need to work out whether batches in the DisruptorQueue's overflow queue are batches of individual tuples or batches of batches (lists) of tuples, which would complicate things.

The executor's event handler (https://github.com/apache/storm/blob/d2d6f40344e6cc92ab07f3a462d577ef6b61f8b1/storm-core/src/clj/org/apache/storm/daemon/executor.clj#L457), which is called on a batch of objects returned from the ring buffer, seems to expect a list of individual tuples, not a list of lists of tuples. This suggests the former situation described above (where the local transfer function gives individual tuples to the publish function) is the likely one, but I just wanted to be sure.
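
For reference, the Clojure handler is a reified com.lmax.disruptor.EventHandler; here is a minimal Java sketch of a handler with the shape the executor code appears to assume (the element type is my assumption, matching the [task-id, tuple] pairs the Clojure code iterates over):

import com.lmax.disruptor.EventHandler;
import java.util.List;

// Illustrative handler: assumes each event handed over by the queue is a
// list of [taskId, tuple] pairs and processes them one at a time, mirroring
// the iteration in the Clojure event handler.
class TupleBatchHandler implements EventHandler<Object> {
    @Override
    public void onEvent(Object event, long sequence, boolean endOfBatch) throws Exception {
        @SuppressWarnings("unchecked")
        List<Object[]> tupleBatch = (List<Object[]>) event; // assumed element shape
        for (Object[] pair : tupleBatch) {
            Object taskId = pair[0]; // destination task
            Object tuple = pair[1];  // the tuple itself
            // the executor's tuple-action-fn would run here
        }
    }
}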

2) My second question is about the DisruptorQueue consumeBatchWhenAvailable method (https://github.com/apache/storm/blob/d2d6f40344e6cc92ab07f3a462d577ef6b61f8b1/storm-core/src/jvm/org/apache/storm/utils/DisruptorQueue.java#L481). I just wanted to confirm that the "batch" this method refers to is actually "every available object in the ring buffer at that time", and that the batch consumed via this method could therefore be of any size from 1 up to the ring buffer's limit (1024 by default)?
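
In other words (a usage sketch; the DisruptorQueue type and method are from the linked 1.x source, the loop scaffolding is mine), I read the consumer side as:

import com.lmax.disruptor.EventHandler;
import org.apache.storm.utils.DisruptorQueue;

// Illustrative consumer loop: each call blocks until at least one object is
// available (a full batch, a flushed partial batch, or a single tuple) and
// then dispatches everything readable at that moment to the handler, which
// could be anywhere from one object up to the ring buffer's capacity.
class ConsumeLoopSketch {
    void run(DisruptorQueue queue, EventHandler<Object> handler) {
        while (!Thread.currentThread().isInterrupted()) {
            queue.consumeBatchWhenAvailable(handler);
        }
    }
}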

Thanks again for helping with my questions.

Regards,

Thomas Cooper
PhD Student
Newcastle University, School of Computer Science
W: http://www.tomcooper.org.uk | Twitter: @tomncooper


From: Bobby Evans <bobby@apache.org>
Sent: 27 November 2018 15:56
To: user
Subject: Re: Disruptor Queue flush behaviour

FYI, in 2.x all of this is different, but to answer your questions for 1.x:
It is a little complicated, in order to keep the memory and CPU overhead low, especially when few tuples are flowing. Conceptually, what happens is that tuples are placed into a separate data structure when they are inserted.

https://github.com/apache/storm/blob/d2d6f40344e6cc92ab07f3a462d577ef6b61f8b1/storm-core/src/jvm/org/apache/storm/utils/DisruptorQueue.java#L225

If that batch fills up, it will attempt to insert the batch into the disruptor queue. Inserting multiple messages into the queue is more efficient than inserting a single message at a time. If there is not enough capacity to insert the messages, they go into an overflow queue.
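
In outline (a simplified sketch; field and method names are illustrative, not the exact ones in DisruptorQueue.java, and the LMAX ring buffer is stood in for by a bounded queue):

import java.util.ArrayList;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentLinkedQueue;

// Sketch of the insert path described above.
class InsertPathSketch {
    private static final int INPUT_BATCH_SIZE = 100;

    private final BlockingQueue<ArrayList<Object>> ringBuffer = new ArrayBlockingQueue<>(1024);
    private final ConcurrentLinkedQueue<ArrayList<Object>> overflow = new ConcurrentLinkedQueue<>();
    private ArrayList<Object> currentBatch = new ArrayList<>();

    synchronized void publish(Object tuple) {
        currentBatch.add(tuple);
        if (currentBatch.size() >= INPUT_BATCH_SIZE) {
            // A full batch: try a non-blocking insert of the whole batch
            // (cheaper than one message at a time). On insufficient capacity,
            // or if overflow already holds earlier batches (to keep ordering),
            // park the batch in overflow instead of blocking.
            if (!(overflow.isEmpty() && ringBuffer.offer(currentBatch))) {
                overflow.add(currentBatch);
            }
            currentBatch = new ArrayList<>();
        }
    }
}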


Every millisecond a thread pool works at flushing all of the tuples buffered in the entire JVM. First it forces any outstanding tuples to be placed into the overflow queue.
https://github.com/apache/storm/blob/d2d6f40344e6cc92ab07f3a462d577ef6b61f8b1/storm-core/src/jvm/org/apache/storm/utils/DisruptorQueue.java#L268-L273
After that it goes through the overflow queue and makes sure all of the tuples from overflow are flushed into the disruptor queue. So if no tuples are flowing, a single thread in the thread pool will wake up once a millisecond, do a few checks per queue, and end up doing nothing. If messages are flowing, then once a millisecond a partial batch is inserted into the disruptor queue, and depending on how long it takes to insert those messages into the queue there may be a few threads doing this.
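
Roughly (again a simplified sketch with illustrative names, standing in for the real flusher in DisruptorQueue.java):

import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// The two methods mirror the two steps described above.
interface FlushableQueue {
    void forceCurrentBatchToOverflow(); // step 1: outstanding tuples -> overflow
    void drainOverflowToRingBuffer();   // step 2: overflow -> disruptor queue
}

class FlusherSketch {
    private final ScheduledExecutorService pool = Executors.newScheduledThreadPool(1);

    // One periodic task covers every queue in the JVM; with no traffic it
    // wakes once per millisecond, finds both steps are no-ops, and goes
    // back to sleep.
    void start(List<FlushableQueue> allQueuesInJvm) {
        pool.scheduleAtFixedRate(() -> {
            for (FlushableQueue q : allQueuesInJvm) {
                q.forceCurrentBatchToOverflow();
                q.drainOverflowToRingBuffer();
            }
        }, 1, 1, TimeUnit.MILLISECONDS);
    }
}
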
I hope this helps,
Bobby


On Mon, Nov 26, 2018 at 7:03 AM Thomas Cooper (PGR) <t.cooper@newcastle.ac.uk> wrote:


Hi,

I have a question about the behaviour of the LMAX disruptor queues that back the executor send/receive queues and the worker transfer queue.

These queues batch tuples for processing (100 by default) and will wait until a full batch has arrived before passing them to the executor. However, they will also flush any tuples in the queue periodically (every 1 ms by default) to prevent the queue from blocking for a long time while it waits for 100 tuples to turn up.
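
For reference, both knobs appear to be exposed as topology configuration; assuming the 1.x key names in Config.java (worth verifying against your version), tuning them would look like:

import org.apache.storm.Config;

class BatchTuningSketch {
    // Key names as I understand them from Storm 1.x Config.java; verify locally.
    static Config tunedConf() {
        Config conf = new Config();
        conf.put(Config.TOPOLOGY_DISRUPTOR_BATCH_SIZE, 100);         // batch size (default 100)
        conf.put(Config.TOPOLOGY_DISRUPTOR_BATCH_TIMEOUT_MILLIS, 1); // flush interval in ms (default 1)
        return conf;
    }
}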

My question is about the implementation of the flush interval behaviour (the sketch after the list below makes the two options concrete):

   - Does the flush interval thread run continuously, issuing a flush command every 1 ms, with the queue simply ignoring the command if it is already flushing? (If 100 tuples turn up between the periodic flush commands, the queue passes them on straight away.)
   - Or does the flush interval timer only start when consumeBatchWhenAvailable is called on the disruptor queue and a full batch is not available? In that case the queue would wait for 1 ms and return whatever is in the queue at the end of that interval or, if 100 tuples turn up within that 1 ms, return the full batch.
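
To make the two options concrete, here is each in outline (pure illustration in Java; neither is claimed to be Storm's actual implementation, and QueueSketch and both method bodies are mine):

import java.util.List;

class FlushHypotheses {
    interface QueueSketch {
        void flushWhateverIsBuffered();
        int buffered();
        List<Object> takeAll();
    }

    // Option 1: an always-running flusher fires every 1 ms regardless of the
    // consumer; the queue ignores the command if it is already flushing, and
    // a full batch of 100 is published as soon as it fills.
    void option1FlusherTick(QueueSketch q) {
        q.flushWhateverIsBuffered();
    }

    // Option 2: the 1 ms timer starts only inside consumeBatchWhenAvailable
    // when a full batch is not ready; wait up to 1 ms, then return whatever
    // has arrived (or the full batch if 100 tuples turn up sooner).
    List<Object> option2Consume(QueueSketch q) {
        long deadline = System.nanoTime() + 1_000_000; // 1 ms
        while (q.buffered() < 100 && System.nanoTime() < deadline) {
            Thread.onSpinWait(); // busy-wait purely for illustration
        }
        return q.takeAll();
    }
}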

From the code in storm-core/src/jvm/org/apache/storm/utils/DisruptorQueue.java it seems option 1 might be the case. However, the code in that class is quite complex, and the interplay with the underlying LMAX library makes it hard to reason about.

Any help with the above would be greatly appreciated. I am attempting to model the effect of these queues on topology performance and hopefully investigate a way to optimise the choice of batch size and flush interval.

Thanks,

Thomas Cooper
PhD Student
Newcastle University, School of Computing
W: http://www.tomcooper.org.uk | Twitter: @tomncooper
