From: Roshan Naik
To: user@storm.apache.org
Date: Wed, 12 Dec 2018 18:38:51 +0000 (UTC)
Subject: Re: Disruptor Queue flush behaviour

There is a detailed diagram of both the 1.x and 2.x messaging internals, starting at slide 11, over here: https://www.slideshare.net/Hadoop_Summit/next-generation-execution-for-apache-storm

The flusher threads shown in 1.x there should be in a thread pool, as Bobby described earlier.

A recording of a slightly older version of the talk associated with those slides is here: https://www.youtube.com/watch?v=bZPpt4NnvsA&t=2s

It may answer some of your questions.
-roshan

On Wednesday, December 12, 2018, 4:18:08 AM PST, Thomas Cooper (PGR) <t.cooper@newcastle.ac.uk> wrote:


Bobby,


Thank you for such a detailed reply. It was really useful. I was wondering if you could confirm a few final details for me? (This is for Storm version 1.2.2)


1) My first question is about the worker process local transfer function (defined in https://github.com/apache/storm/blob/d2d6f40344e6cc92ab07f3a462d577ef6b61f8b1/storm-core/src/clj/org/apache/storm/daemon/worker.clj#L116), which is used both for moving tuples off the executor send thread to internal executor receive queues and for moving batches of tuples arriving from other workers onto the relevant receive queues. The local transfer function groups all AddressedTuples for a given executor into a list (called pairs in the code) that is then given to the DisruptorQueue's publish function (via a Clojure wrapper).


My question is, because my knowledge of Clojure is limited, does the local transfer function (at https://github.com/apache/storm/blob/d2d6f40344e6cc92ab07f3a462d577ef6b61f8b1/storm-core/src/clj/org/apache/storm/daemon/worker.clj#L125) pass individual tuples from the pairs list to the publish function (via some kind of map operation) or does it pass the list of tuples in one go?


I ask because I am trying to model the tuple flow and so I need to work out if batches in the DisruptorQueue's overflow queue are batches of individual tuples or batches of batches (lists) of tuples (which complicates things).


The executor's event handler (https://github.com/apache/storm/blob/d2d6f40344e6cc92ab07f3a462d577ef6b61f8b1/storm-core/src/clj/org/apache/storm/daemon/executor.clj#L457), which is called on a batch of objects returned from the ring buffer, seems to be expecting a list of individual tuples, not a list of lists of tuples. Therefore it seems like the former situation described above (where the local transfer function gives individual tuples to the publish function) is likely, but I just wanted to be sure.


2) My second question is about the DisruptorQueue consumeBatchWhenAvailable method (https://github.com/apache/storm/blob/d2d6f40344e6cc92ab07f3a462d577ef6b61f8b1/storm-core/src/jvm/org/apache/storm/utils/DisruptorQueue.java#L481). I just wanted to confirm that the "batch" which this method refers to is actually "every available object in the ring buffer at that time". Therefore, could the batch consumed via this method be of any size from 1 up to the ring buffer's limit (1024 by default)?
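
To make the reading in question 2 concrete, here is a minimal Python sketch of a consumer that takes "every available object in the ring buffer at that time". It is only an illustration of that interpretation, not Storm's code: `consume_available` and `RING_CAPACITY` are hypothetical names, a plain deque stands in for the ring buffer, and the real method also waits until something is available, which this toy version does not model.

```python
from collections import deque

# Default ring buffer limit quoted in the question (assumed here, not read from Storm).
RING_CAPACITY = 1024


def consume_available(ring, handler):
    """Hand every item present at call time to handler; return the batch size."""
    available = len(ring)  # snapshot of what is ready right now
    for _ in range(available):
        handler(ring.popleft())
    return available


ring = deque(range(5), maxlen=RING_CAPACITY)
seen = []
batch_size = consume_available(ring, seen.append)
```

Under this reading the batch size depends only on how many objects happen to be in the buffer when the consumer runs, so it can be anything up to the buffer's capacity.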


Thanks again for helping with my questions.


Regards,


Thomas Cooper
PhD Student
Newcastle University, School of Computer Science



From: Bobby Evans <bobby@apache.org>
Sent: 27 November 2018 15:56
To: user
Subject: Re: Disruptor Queue flush behaviour
 
FYI, in 2.x all of this is different, but to answer your questions for 1.x:

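It is a little complicated, because it tries to keep the memory and CPU overhead low, especially when few tuples are flowing.  Conceptually what happens is that tuples are placed into a separate data structure (a batch) when they are inserted.

https://github.com/apache/storm/blob/d2d6f40344e6cc92ab07f3a462d577ef6b61f8b1/storm-core/src/jvm/org/apache/storm/utils/DisruptorQueue.java#L225
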
If that batch fills up it will attempt to insert them into the disruptor queue.  Inserting multiple messages into the queue is more efficient than inserting a single message at a time.  If there is not enough capacity to insert the messages, they go into an overflow queue.

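Every millisecond a thread pool will then work at flushing all of the tuples buffered in the entire JVM.  First it will force any outstanding tuples to be placed into the overflow queue.

https://github.com/apache/storm/blob/d2d6f40344e6cc92ab07f3a462d577ef6b61f8b1/storm-core/src/jvm/org/apache/storm/utils/DisruptorQueue.java#L268-L273
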
After that it goes through that overflow queue and makes sure all of the tuples from overflow are flushed into the disruptor queue.  So if no tuples are flowing, a single thread in the thread pool will wake up once a ms to do a few checks per queue and end up doing nothing.  If messages are flowing, once a ms a partial batch is inserted into the disruptor queue, and depending on how long it takes to insert those messages into the queue there may be a few threads doing this.
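
For anyone trying to model this, the insert path described above can be sketched roughly as follows. This is a single-threaded toy model in Python, not the actual DisruptorQueue code: the class and method names are made up for illustration, a deque stands in for the ring buffer, the "overflow goes first" ordering rule is an assumption of the model, and `flush()` has to be called by hand where the real system would use the flusher thread pool once per interval.

```python
from collections import deque


class BatchingQueueModel:
    """Toy model of the 1.x insert path (hypothetical, not Storm code)."""

    def __init__(self, batch_size=100, ring_capacity=1024):
        self.batch_size = batch_size
        self.ring_capacity = ring_capacity
        self.current_batch = []  # the "separate data structure" tuples land in first
        self.overflow = deque()  # whole batches that did not fit in the ring
        self.ring = deque()      # stands in for the disruptor ring buffer

    def _try_insert(self, batch):
        """Insert the whole batch into the ring, or return False if it does not fit."""
        if len(self.ring) + len(batch) > self.ring_capacity:
            return False
        self.ring.extend(batch)
        return True

    def publish(self, item):
        self.current_batch.append(item)
        if len(self.current_batch) >= self.batch_size:
            batch, self.current_batch = self.current_batch, []
            # Assumption of this model: if overflow is non-empty, queue behind it
            # so that batches stay in arrival order.
            if self.overflow or not self._try_insert(batch):
                self.overflow.append(batch)

    def flush(self):
        """One flusher pass: force the partial batch to overflow, then drain overflow."""
        if self.current_batch:
            self.overflow.append(self.current_batch)
            self.current_batch = []
        # This model stops when the ring is full instead of waiting for space.
        while self.overflow and self._try_insert(self.overflow[0]):
            self.overflow.popleft()

    def consume_all(self):
        """Return and remove everything currently in the ring."""
        items = list(self.ring)
        self.ring.clear()
        return items
```

With `batch_size=3`, publishing two items leaves the ring empty until `flush()` runs, which is the "partial batch is inserted once a ms" case; publishing a full batch inserts it immediately when the ring has room.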

I hope this helps,

Bobby


On Mon, Nov 26, 2018 at 7:03 AM Thomas Cooper (PGR) <t.cooper@newcastle.ac.uk> wrote:

Hi,


I have a question about the behaviour of the LMAX disruptor queues that the executor send/receive and the worker transfer queues use.


These queues batch tuples for processing (100 by default) and will wait until a full batch has arrived before passing them to the executor. However, they will also flush any tuples in the queue periodically (1 ms by default) to prevent the queue blocking for a long time while it waits for 100 tuples to turn up.


My question is about the implementation of the flush interval behaviour:


  1. Does the flush interval thread run continuously, issuing a flush command every 1 ms, with the queue just ignoring it if it is already flushing? If 100 tuples turn up between the constant flush commands, the queue issues them straight away.
  2. Or does the flush interval timer only start when consumeBatchWhenAvailable is called on the disruptor queue and a full batch is not available? In which case the queue will wait for 1ms and return whatever is in the queue at the end of that interval or, if 100 tuples turn up within that 1ms, return the full batch.


From the code in storm-core/src/jvm/org/apache/storm/utils/DisruptorQueue.java it seems option 1 might be the case. However, the code in that class is quite complex and the interplay with the underlying LMAX library makes it hard to reason about. 


Any help with the above would be greatly appreciated. I am attempting to model the effect of these queues on topology performance and hopefully investigate a way to optimise the choice of batch size and flush interval.


Thanks,


Thomas Cooper
PhD Student
Newcastle University, School of Computing