storm-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Tomas Mazukna <tomas.mazu...@gmail.com>
Subject Re: Distribute Spout output among all bolts
Date Thu, 17 Jul 2014 01:21:23 GMT
Kafka client handles that, it is stored in zookeeper with the offset.
I wrote a kafka spout based on kafka groups consumer api. Kafka allows only
one consumer per partition per group.


On Wed, Jul 16, 2014 at 8:41 PM, Andrew Xor <andreas.grammenos@gmail.com>
wrote:

> Ok, but upon runtime how to you set in the spout which kafka partition to
> subscribe at?
>
> Kindly yours,
>
> Andrew Grammenos
>
> -- PGP PKey --
> ​ <https://www.dropbox.com/s/2kcxe59zsi9nrdt/pgpsig.txt>
> https://www.dropbox.com/s/ei2nqsen641daei/pgpsig.txt
>
>
> On Thu, Jul 17, 2014 at 3:30 AM, Tomas Mazukna <tomas.mazukna@gmail.com>
> wrote:
>
>> So you want to define only one instance of the spout that reads the file.
>> Number of bolts will only depend on how fast you need to process the data.
>> I have a topology that has a spout with parallelism of 40 - connected to
>> 40 partitions of a kafka topic. It send traffic to the first bolt which has
>> parallelism 320. The whole topology is split up into 4 workers. that makes
>> 10 spout instances in each jvm, feeding 80 bolts. In my case I have
>> grouping so tuples get routed to different physical machines.
>>
>> Tomas
>>
>>
>> On Wed, Jul 16, 2014 at 8:10 PM, Andrew Xor <andreas.grammenos@gmail.com>
>> wrote:
>>
>>>  Michael,
>>>
>>> ​ Thanks for the response but I think another problem arises; as ​I just
>>> cooked up a small example the increased number of workers only spawns
>>> mirrors of the topology. This poses a problem for me due to the fact that
>>> my spout reads from a very big file and converts each line into a tuple and
>>> feeds that in the topology. What I wanted to do in the first place is to
>>> actually send each tuple produced to a different subscribed bolt each time
>>> (using Round Robing or smth) so that each one of them got 1/n nth (where n
>>> the number of bolts) of the input stream. If I spawn 2 workers both will
>>> read the same file and emit the same tuples so both topology workers will
>>> produce the same results.
>>>
>>>  I wanted to avoid to create a spout that takes a file offset as an
>>> input and wire a lot more stuff than I have to; so I was trying to find a
>>> way to perform what I told you in an elegant and scalable fashion...so far
>>> I have found nil.
>>>
>>>
>>> On Thu, Jul 17, 2014 at 2:57 AM, Michael Rose <michael@fullcontact.com>
>>> wrote:
>>>
>>>> It doesn't say so, but if you have 4 workers, the 4 executors will be
>>>> shared evenly over the 4 workers. Likewise, 16 will partition 4 each. The
>>>> only case where a worker will not get a specific executor is when there are
>>>> less executors than workers (e.g. 8 workers, 4 executors), 4 of the workers
>>>> will receive an executor but the others will not.
>>>>
>>>> It sounds like for your case, shuffle+parallelism is more than
>>>> sufficient.
>>>>
>>>> Michael Rose (@Xorlev <https://twitter.com/xorlev>)
>>>> Senior Platform Engineer, FullContact <http://www.fullcontact.com/>
>>>> michael@fullcontact.com
>>>>
>>>>
>>>> On Wed, Jul 16, 2014 at 5:53 PM, Andrew Xor <
>>>> andreas.grammenos@gmail.com> wrote:
>>>>
>>>>> Hey Stephen, Michael,
>>>>>
>>>>>  Yea I feared as much... as searching the docs and API did not surface
>>>>> any reliable and elegant way of doing that unless you had a "RouterBolt".
>>>>> If setting the parallelism of a component is enough for load balancing
the
>>>>> processes across different machines that are part of the Storm cluster
then
>>>>> this would suffice in my use case. Although here
>>>>> <https://storm.incubator.apache.org/documentation/Understanding-the-parallelism-of-a-Storm-topology.html>
>>>>> the documentation says executors are threads and it does not explicitly
say
>>>>> anywhere that threads are spawned across different nodes of the cluster...
>>>>> I want to avoid the possibility of these threads only spawning locally
and
>>>>> not in a distributed fashion among the cluster nodes..
>>>>>
>>>>> Andrew.
>>>>>
>>>>>
>>>>> On Thu, Jul 17, 2014 at 2:46 AM, Michael Rose <michael@fullcontact.com
>>>>> > wrote:
>>>>>
>>>>>> Maybe we can help with your topology design if you let us know what
>>>>>> you're doing that requires you to shuffle half of the whole stream
output
>>>>>> to each of the two different types of bolts.
>>>>>>
>>>>>> If bolt b1 and bolt b2 are both instances of ExampleBolt (and not
two
>>>>>> different types) as above, there's no point to doing this. Setting
the
>>>>>> parallelism will make sure that data is partitioned across machines
(by
>>>>>> default, setting parallelism sets tasks = executors = parallelism).
>>>>>>
>>>>>> Unfortunately, I don't know of any way to do this other than
>>>>>> shuffling the output to a new bolt, e.g. bolt "b0" a 'RouterBolt',
then
>>>>>> having bolt b0 round-robin the received tuples between two streams,
then
>>>>>> have b1 and b2 shuffle over those streams instead.
>>>>>>
>>>>>>
>>>>>>
>>>>>> Michael Rose (@Xorlev <https://twitter.com/xorlev>)
>>>>>> Senior Platform Engineer, FullContact <http://www.fullcontact.com/>
>>>>>> michael@fullcontact.com
>>>>>>
>>>>>>
>>>>>> On Wed, Jul 16, 2014 at 5:40 PM, Andrew Xor <
>>>>>> andreas.grammenos@gmail.com> wrote:
>>>>>>
>>>>>>> ​
>>>>>>> Hi Tomas,
>>>>>>>
>>>>>>>  As I said in my previous mail the grouping is for a bolt *task*
not
>>>>>>> for the actual number of spawned bolts; for example let's say
you have two
>>>>>>> bolts that have a parallelism hint of 3 and these two bolts are
wired to
>>>>>>> the same spout. If you set the bolts as such:
>>>>>>>
>>>>>>> tb.setBolt("b1", new ExampleBolt(), 2 /* p-hint
>>>>>>> */).shuffleGrouping("spout1");
>>>>>>> tb.setBolt("b2", new ExampleBolt(), 2 /* p-hint
>>>>>>> */).shuffleGrouping("spout1");
>>>>>>>
>>>>>>> Then each of the tasks will receive half of the spout tuples
but
>>>>>>> each actual spawned bolt will receive all of the tuples emitted
from the
>>>>>>> spout. This is more evident if you set up a counter in the bolt
counting
>>>>>>> how many tuples if has received and testing this with no parallelism
hint
>>>>>>> as such:
>>>>>>>
>>>>>>> tb.setBolt("b1", new ExampleBolt(),).shuffleGrouping("spout1");
>>>>>>> tb.setBolt("b2", new ExampleBolt()).shuffleGrouping("spout1");
>>>>>>>
>>>>>>> Now you will see that both bolts will receive all tuples emitted
by
>>>>>>> spout1.
>>>>>>>
>>>>>>> Hope this helps.
>>>>>>>
>>>>>>> ​
>>>>>>> ​Andrew.​
>>>>>>>
>>>>>>>
>>>>>>> On Thu, Jul 17, 2014 at 2:33 AM, Tomas Mazukna <
>>>>>>> tomas.mazukna@gmail.com> wrote:
>>>>>>>
>>>>>>>> Andrew,
>>>>>>>>
>>>>>>>> when you connect your bolt to your spout you specify the
grouping.
>>>>>>>> If you use shuffle grouping then any free bolt gets the tuple
- in my
>>>>>>>> experience even in lightly loaded topologies the distribution
amongst bolts
>>>>>>>> is pretty even. If you use all grouping then all bolts receive
a copy of
>>>>>>>> the tuple.
>>>>>>>> Use shuffle grouping and each of your bolts will get about
1/3 of
>>>>>>>> the workload.
>>>>>>>>
>>>>>>>> Tomas
>>>>>>>>
>>>>>>>>
>>>>>>>> On Wed, Jul 16, 2014 at 7:05 PM, Andrew Xor <
>>>>>>>> andreas.grammenos@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> H
>>>>>>>>> ​i,
>>>>>>>>>
>>>>>>>>>  I am trying to distribute the spout output to it's subscribed
>>>>>>>>> bolts evenly; let's say that I have a spout that emits
tuples and three
>>>>>>>>> bolts that are subscribed to it. I want each of the three
bolts to receive
>>>>>>>>> 1/3 rth of the output (or emit a tuple to each one of
these bolts in
>>>>>>>>> turns). Unfortunately as far as I understand all bolts
will receive all of
>>>>>>>>> the emitted tuples of that particular spout regardless
of the grouping
>>>>>>>>> defined (as grouping from my understanding is for bolt
*tasks* not actual
>>>>>>>>> bolts).
>>>>>>>>>
>>>>>>>>>  I've searched a bit and I can't seem to find a way to
accomplish
>>>>>>>>> that...​ is there a way to do that or I am searching
in vain?
>>>>>>>>>
>>>>>>>>> Thanks.
>>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> --
>>>>>>>> Tomas Mazukna
>>>>>>>> 678-557-3834
>>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>>
>> --
>> Tomas Mazukna
>> 678-557-3834
>>
>
>


-- 
Tomas Mazukna
678-557-3834

Mime
View raw message