Subject: Re: Distribute Spout output among all bolts
From: Tomas Mazukna <tomas.mazukna@gmail.com>
To: user@storm.incubator.apache.org
Date: Wed, 16 Jul 2014 21:21:23 -0400

The Kafka client handles that; the offset is stored in ZooKeeper along
with the consumer group. I wrote a Kafka spout based on the Kafka
consumer-group API. Kafka allows only one consumer per partition per
group.

On Wed, Jul 16, 2014 at 8:41 PM, Andrew Xor <andreas.grammenos@gmail.com> wrote:

Ok, but at runtime how do you set which Kafka partition the spout
subscribes to?

Kindly yours,

Andrew Grammenos

-- PGP PKey --
https://www.dropbox.com/s/ei2nqsen641daei/pgpsig.txt

On Thu, Jul 17, 2014 at 3:30 AM, Tomas Mazukna <tomas.mazukna@gmail.com> wrote:

So you want to define only one instance of the spout that reads the file.
The number of bolts depends only on how fast you need to process the data.
I have a topology whose spout has a parallelism of 40, connected to the 40
partitions of a Kafka topic. It sends traffic to the first bolt, which has
parallelism 320. The whole topology is split across 4 workers: that makes
10 spout instances in each JVM, feeding 80 bolts. In my case I also use a
grouping so tuples get routed to different physical machines.

Tomas
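For reference, that wiring looks roughly like the minimal sketch below,
assuming Storm 0.9.x (backtype.storm packages). KafkaTopicSpout and
ProcessingBolt are placeholder names for illustration, not the actual
classes from this topology:

    import backtype.storm.Config;
    import backtype.storm.StormSubmitter;
    import backtype.storm.topology.TopologyBuilder;

    public class ExampleTopology {
        public static void main(String[] args) throws Exception {
            TopologyBuilder builder = new TopologyBuilder();

            // 40 spout executors, one per Kafka partition
            // (KafkaTopicSpout is a hypothetical stand-in).
            builder.setSpout("kafka-spout", new KafkaTopicSpout(), 40);

            // 320 bolt executors; shuffle grouping spreads the stream
            // evenly across them (ProcessingBolt is also a placeholder).
            builder.setBolt("first-bolt", new ProcessingBolt(), 320)
                   .shuffleGrouping("kafka-spout");

            Config conf = new Config();
            conf.setNumWorkers(4); // 4 JVMs

            StormSubmitter.submitTopology("kafka-example", conf,
                    builder.createTopology());
        }
    }

With 4 workers and 40 + 320 executors, Storm spreads the executors evenly
over the JVMs, which is where the 10 spouts and 80 bolts per JVM come from.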
On Wed, Jul 16, 2014 at 8:10 PM, Andrew Xor <andreas.grammenos@gmail.com> wrote:

Michael,

Thanks for the response, but I think another problem arises: I just cooked
up a small example, and increasing the number of workers only spawns
mirrors of the topology. This is a problem for me because my spout reads a
very big file, converts each line into a tuple, and feeds it into the
topology. What I wanted in the first place is to send each produced tuple
to a different subscribed bolt each time (using round robin or something
similar), so that each of them gets 1/n of the input stream (where n is
the number of bolts). If I spawn 2 workers, both will read the same file
and emit the same tuples, so both topology workers will produce the same
results.

I wanted to avoid creating a spout that takes a file offset as input and
wiring up a lot more than I have to, so I was trying to find an elegant
and scalable way to do this... so far I have found nil.

On Thu, Jul 17, 2014 at 2:57 AM, Michael Rose <michael@fullcontact.com> wrote:

It doesn't say so, but if you have 4 workers, the 4 executors will be
shared evenly over the 4 workers. Likewise, 16 executors will partition
4 to each. The only case where a worker will not get a specific executor
is when there are fewer executors than workers (e.g. 8 workers, 4
executors): 4 of the workers will receive an executor but the others
will not.

It sounds like for your case, shuffle grouping plus parallelism is more
than sufficient.

Michael Rose (@Xorlev)
Senior Platform Engineer, FullContact
michael@fullcontact.com

On Wed, Jul 16, 2014 at 5:53 PM, Andrew Xor <andreas.grammenos@gmail.com> wrote:

Hey Stephen, Michael,

Yeah, I feared as much... searching the docs and API did not surface any
reliable and elegant way of doing that unless you had a "RouterBolt". If
setting the parallelism of a component is enough to load-balance the
processes across the different machines of the Storm cluster, then that
would suffice for my use case. The documentation does say executors are
threads, though, and it does not explicitly say anywhere that those
threads are spawned across different nodes of the cluster... I want to
rule out the possibility of these threads spawning only locally rather
than distributed among the cluster nodes.

Andrew.

On Thu, Jul 17, 2014 at 2:46 AM, Michael Rose <michael@fullcontact.com> wrote:

Maybe we can help with your topology design if you let us know what you're
doing that requires you to shuffle half of the whole stream output to each
of two different types of bolts.

If bolt b1 and bolt b2 are both instances of ExampleBolt (and not two
different types), as above, there's no point in doing this. Setting the
parallelism will make sure that data is partitioned across machines (by
default, setting parallelism sets tasks = executors = parallelism).

Unfortunately, I don't know of any way to do this other than shuffling the
output to a new bolt, e.g. a bolt "b0" (a "RouterBolt"), then having b0
round-robin the received tuples between two streams, and having b1 and b2
shuffle over those streams instead.

Michael Rose (@Xorlev)
Senior Platform Engineer, FullContact
michael@fullcontact.com
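A minimal sketch of that RouterBolt idea, assuming tuples that carry a
single field named "line"; the class, stream, and field names here are
illustrative, not from the thread:

    import backtype.storm.topology.BasicOutputCollector;
    import backtype.storm.topology.OutputFieldsDeclarer;
    import backtype.storm.topology.base.BaseBasicBolt;
    import backtype.storm.tuple.Fields;
    import backtype.storm.tuple.Tuple;
    import backtype.storm.tuple.Values;

    public class RouterBolt extends BaseBasicBolt {
        private boolean toggle = false;

        @Override
        public void execute(Tuple tuple, BasicOutputCollector collector) {
            // Alternate between the two declared streams, round-robin style.
            String stream = toggle ? "stream-a" : "stream-b";
            toggle = !toggle;
            collector.emit(stream, new Values(tuple.getString(0)));
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declareStream("stream-a", new Fields("line"));
            declarer.declareStream("stream-b", new Fields("line"));
        }
    }

b1 and b2 would then each subscribe to one of the two streams:

    builder.setBolt("b0", new RouterBolt(), 1).shuffleGrouping("spout1");
    builder.setBolt("b1", new ExampleBolt()).shuffleGrouping("b0", "stream-a");
    builder.setBolt("b2", new ExampleBolt()).shuffleGrouping("b0", "stream-b");

Note that b0 needs a parallelism of 1 for strict alternation; with more
instances the split is still roughly even, just not tuple-by-tuple.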
bolt "b0" a 'RouterBolt', t= hen >>>>>> having bolt b0 round-robin the received tuples between two streams, = then >>>>>> have b1 and b2 shuffle over those streams instead. >>>>>> >>>>>> >>>>>> >>>>>> Michael Rose (@Xorlev ) >>>>>> Senior Platform Engineer, FullContact >>>>>> michael@fullcontact.com >>>>>> >>>>>> >>>>>> On Wed, Jul 16, 2014 at 5:40 PM, Andrew Xor < >>>>>> andreas.grammenos@gmail.com> wrote: >>>>>> >>>>>>> =E2=80=8B >>>>>>> Hi Tomas, >>>>>>> >>>>>>> As I said in my previous mail the grouping is for a bolt *task* no= t >>>>>>> for the actual number of spawned bolts; for example let's say you h= ave two >>>>>>> bolts that have a parallelism hint of 3 and these two bolts are wir= ed to >>>>>>> the same spout. If you set the bolts as such: >>>>>>> >>>>>>> tb.setBolt("b1", new ExampleBolt(), 2 /* p-hint >>>>>>> */).shuffleGrouping("spout1"); >>>>>>> tb.setBolt("b2", new ExampleBolt(), 2 /* p-hint >>>>>>> */).shuffleGrouping("spout1"); >>>>>>> >>>>>>> Then each of the tasks will receive half of the spout tuples but >>>>>>> each actual spawned bolt will receive all of the tuples emitted fro= m the >>>>>>> spout. This is more evident if you set up a counter in the bolt cou= nting >>>>>>> how many tuples if has received and testing this with no parallelis= m hint >>>>>>> as such: >>>>>>> >>>>>>> tb.setBolt("b1", new ExampleBolt(),).shuffleGrouping("spout1"); >>>>>>> tb.setBolt("b2", new ExampleBolt()).shuffleGrouping("spout1"); >>>>>>> >>>>>>> Now you will see that both bolts will receive all tuples emitted by >>>>>>> spout1. >>>>>>> >>>>>>> Hope this helps. >>>>>>> >>>>>>> =E2=80=8B >>>>>>> =E2=80=8BAndrew.=E2=80=8B >>>>>>> >>>>>>> >>>>>>> On Thu, Jul 17, 2014 at 2:33 AM, Tomas Mazukna < >>>>>>> tomas.mazukna@gmail.com> wrote: >>>>>>> >>>>>>>> Andrew, >>>>>>>> >>>>>>>> when you connect your bolt to your spout you specify the grouping. >>>>>>>> If you use shuffle grouping then any free bolt gets the tuple - in= my >>>>>>>> experience even in lightly loaded topologies the distribution amon= gst bolts >>>>>>>> is pretty even. If you use all grouping then all bolts receive a c= opy of >>>>>>>> the tuple. >>>>>>>> Use shuffle grouping and each of your bolts will get about 1/3 of >>>>>>>> the workload. >>>>>>>> >>>>>>>> Tomas >>>>>>>> >>>>>>>> >>>>>>>> On Wed, Jul 16, 2014 at 7:05 PM, Andrew Xor < >>>>>>>> andreas.grammenos@gmail.com> wrote: >>>>>>>> >>>>>>>>> H >>>>>>>>> =E2=80=8Bi, >>>>>>>>> >>>>>>>>> I am trying to distribute the spout output to it's subscribed >>>>>>>>> bolts evenly; let's say that I have a spout that emits tuples and= three >>>>>>>>> bolts that are subscribed to it. I want each of the three bolts t= o receive >>>>>>>>> 1/3 rth of the output (or emit a tuple to each one of these bolts= in >>>>>>>>> turns). Unfortunately as far as I understand all bolts will recei= ve all of >>>>>>>>> the emitted tuples of that particular spout regardless of the gro= uping >>>>>>>>> defined (as grouping from my understanding is for bolt *tasks* no= t actual >>>>>>>>> bolts). >>>>>>>>> >>>>>>>>> I've searched a bit and I can't seem to find a way to accomplish >>>>>>>>> that...=E2=80=8B is there a way to do that or I am searching in v= ain? >>>>>>>>> >>>>>>>>> Thanks. 
On Thu, Jul 17, 2014 at 2:33 AM, Tomas Mazukna <tomas.mazukna@gmail.com> wrote:

Andrew,

When you connect your bolt to your spout, you specify the grouping. If you
use shuffle grouping, then any free bolt gets the tuple; in my experience,
even in lightly loaded topologies the distribution amongst bolts is pretty
even. If you use all grouping, then all bolts receive a copy of the tuple.
Use shuffle grouping and each of your bolts will get about 1/3 of the
workload.

Tomas

On Wed, Jul 16, 2014 at 7:05 PM, Andrew Xor <andreas.grammenos@gmail.com> wrote:

Hi,

I am trying to distribute the spout output evenly among its subscribed
bolts. Let's say I have a spout that emits tuples and three bolts
subscribed to it; I want each of the three bolts to receive 1/3 of the
output (or, equivalently, to emit a tuple to each of these bolts in turn).
Unfortunately, as far as I understand, all bolts will receive all of the
tuples emitted by that particular spout regardless of the grouping defined
(since grouping, from my understanding, applies to bolt *tasks*, not
actual bolts).

I've searched a bit and I can't seem to find a way to accomplish that...
is there a way to do it, or am I searching in vain?

Thanks.

--
Tomas Mazukna
678-557-3834