From: Andrew Xor
Date: Thu, 17 Jul 2014 03:10:24 +0300
Subject: Re: Distribute Spout output among all bolts
To: user@storm.incubator.apache.org

Michael,

Thanks for the response, but I think another problem arises: in a small
example I just cooked up, the increased number of workers only spawns
mirrors of the topology. This poses a problem for me because my spout reads
from a very big file, converts each line into a tuple, and feeds that into
the topology. What I wanted to do in the first place is to send each tuple
produced to a different subscribed bolt each time (using round robin or
something similar) so that each one of them gets 1/n (where n is the number
of bolts) of the input stream. If I spawn 2 workers, both will read the same
file and emit the same tuples, so both topology workers will produce the
same results.

I wanted to avoid creating a spout that takes a file offset as an input and
wiring up a lot more stuff than I have to, so I was trying to find a way to
do what I described in an elegant and scalable fashion... so far I have
found nil.

On Thu, Jul 17, 2014 at 2:57 AM, Michael Rose wrote:

> It doesn't say so, but if you have 4 workers, the 4 executors will be
> shared evenly over the 4 workers. Likewise, 16 will partition 4 each. The
> only case where a worker will not get a specific executor is when there
> are fewer executors than workers (e.g. 8 workers, 4 executors): 4 of the
> workers will receive an executor but the others will not.
>
> It sounds like for your case, shuffle+parallelism is more than sufficient.
>
> Michael Rose (@Xorlev)
> Senior Platform Engineer, FullContact
> michael@fullcontact.com
>
> On Wed, Jul 16, 2014 at 5:53 PM, Andrew Xor wrote:
>
>> Hey Stephen, Michael,
>>
>> Yeah, I feared as much... searching the docs and API did not surface any
>> reliable and elegant way of doing that unless you had a "RouterBolt".
>> If setting the parallelism of a component is enough to load-balance the
>> processes across the different machines that are part of the Storm
>> cluster, then this would suffice in my use case. Although here the
>> documentation says executors are threads, and it does not explicitly say
>> anywhere that threads are spawned across different nodes of the
>> cluster... I want to avoid the possibility of these threads only
>> spawning locally and not in a distributed fashion among the cluster
>> nodes.
>>
>> Andrew.
>>
>> On Thu, Jul 17, 2014 at 2:46 AM, Michael Rose wrote:
>>
>>> Maybe we can help with your topology design if you let us know what
>>> you're doing that requires you to shuffle half of the whole stream
>>> output to each of the two different types of bolts.
>>>
>>> If bolt b1 and bolt b2 are both instances of ExampleBolt (and not two
>>> different types) as above, there's no point to doing this. Setting the
>>> parallelism will make sure that data is partitioned across machines (by
>>> default, setting parallelism sets tasks = executors = parallelism).
>>>
>>> Unfortunately, I don't know of any way to do this other than shuffling
>>> the output to a new bolt, e.g. bolt "b0", a 'RouterBolt', then having
>>> bolt b0 round-robin the received tuples between two streams, then
>>> having b1 and b2 shuffle over those streams instead.
>>>
>>> Michael Rose (@Xorlev)
>>> Senior Platform Engineer, FullContact
>>> michael@fullcontact.com
>>>
>>> On Wed, Jul 16, 2014 at 5:40 PM, Andrew Xor wrote:
>>>
>>>> Hi Tomas,
>>>>
>>>> As I said in my previous mail, the grouping is for a bolt *task*, not
>>>> for the actual number of spawned bolts. For example, let's say you
>>>> have two bolts that have a parallelism hint of 2 and these two bolts
>>>> are wired to the same spout. If you set the bolts as such:
>>>>
>>>> tb.setBolt("b1", new ExampleBolt(), 2 /* p-hint */).shuffleGrouping("spout1");
>>>> tb.setBolt("b2", new ExampleBolt(), 2 /* p-hint */).shuffleGrouping("spout1");
>>>>
>>>> Then each of the tasks will receive half of the spout tuples, but each
>>>> actual spawned bolt will receive all of the tuples emitted from the
>>>> spout. This is more evident if you set up a counter in the bolt
>>>> counting how many tuples it has received and test this with no
>>>> parallelism hint, as such:
>>>>
>>>> tb.setBolt("b1", new ExampleBolt()).shuffleGrouping("spout1");
>>>> tb.setBolt("b2", new ExampleBolt()).shuffleGrouping("spout1");
>>>>
>>>> Now you will see that both bolts will receive all tuples emitted by
>>>> spout1.
>>>>
>>>> Hope this helps.
>>>>
>>>> Andrew.
>>>>
>>>> On Thu, Jul 17, 2014 at 2:33 AM, Tomas Mazukna wrote:
>>>>
>>>>> Andrew,
>>>>>
>>>>> When you connect your bolt to your spout you specify the grouping. If
>>>>> you use shuffle grouping then any free bolt gets the tuple. In my
>>>>> experience, even in lightly loaded topologies the distribution
>>>>> amongst bolts is pretty even. If you use all grouping then all bolts
>>>>> receive a copy of the tuple.
>>>>> Use shuffle grouping and each of your bolts will get about 1/3 of the
>>>>> workload.
>>>>>
>>>>> Tomas
>>>>>
>>>>> On Wed, Jul 16, 2014 at 7:05 PM, Andrew Xor <andreas.grammenos@gmail.com> wrote:
>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> I am trying to distribute the spout output to its subscribed bolts
>>>>>> evenly. Let's say that I have a spout that emits tuples and three
>>>>>> bolts that are subscribed to it. I want each of the three bolts to
>>>>>> receive 1/3 of the output (or emit a tuple to each one of these
>>>>>> bolts in turn). Unfortunately, as far as I understand, all bolts
>>>>>> will receive all of the emitted tuples of that particular spout
>>>>>> regardless of the grouping defined (as grouping, from my
>>>>>> understanding, is for bolt *tasks*, not actual bolts).
>>>>>>
>>>>>> I've searched a bit and I can't seem to find a way to accomplish
>>>>>> that... is there a way to do that, or am I searching in vain?
>>>>>>
>>>>>> Thanks.
>>>>>
>>>>> --
>>>>> Tomas Mazukna
>>>>> 678-557-3834
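The distinction the thread keeps circling (separately declared bolts each see the whole stream, while the tasks of one bolt share it) can be modelled in a few lines of plain Java. This is only a sketch under stated assumptions: it does not use the Storm API, the class `GroupingSketch` and its `deliver` method are hypothetical names, and a deterministic round robin stands in for shuffle grouping, which in Storm is randomized.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Plain-Java model of the delivery semantics discussed in the thread.
// NOT Storm code: the class, the method, and the round-robin stand-in for
// shuffle grouping are illustrative assumptions.
class GroupingSketch {

    // Delivers `tuples` tuples to every subscribed component.
    // `parallelism` maps a component id to its number of tasks.
    // Returns how many tuples each task received, keyed "component#task".
    static Map<String, Integer> deliver(int tuples, Map<String, Integer> parallelism) {
        Map<String, Integer> received = new LinkedHashMap<>();
        for (Map.Entry<String, Integer> component : parallelism.entrySet()) {
            int tasks = component.getValue();
            for (int task = 0; task < tasks; task++) {
                received.put(component.getKey() + "#" + task, 0);
            }
            // Each subscribed component sees the whole stream; within a
            // component, the grouping picks exactly one task per tuple.
            for (int t = 0; t < tuples; t++) {
                String key = component.getKey() + "#" + (t % tasks);
                received.merge(key, 1, Integer::sum);
            }
        }
        return received;
    }

    public static void main(String[] args) {
        // Two separately declared bolts, one task each: both get everything.
        Map<String, Integer> twoComponents = new LinkedHashMap<>();
        twoComponents.put("b1", 1);
        twoComponents.put("b2", 1);
        System.out.println(deliver(900, twoComponents)); // {b1#0=900, b2#0=900}

        // One bolt declared with three tasks: the stream is split three ways.
        Map<String, Integer> oneComponent = new LinkedHashMap<>();
        oneComponent.put("b", 3);
        System.out.println(deliver(900, oneComponent)); // {b#0=300, b#1=300, b#2=300}
    }
}
```

In the thread's own notation, the second case corresponds to a single `setBolt` call with a parallelism hint of 3 and `shuffleGrouping("spout1")`, which is the "shuffle+parallelism" setup Michael suggests, rather than three separate `setBolt` calls.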