storm-user mailing list archives

From "Cody A. Ray" <cody.a....@gmail.com>
Subject Re: performance issues in time-series aggregation with Storm/Trident
Date Tue, 08 Jul 2014 20:48:09 GMT
How many Kafka partitions are you using with a 10-20MB fetch size? (I
believe that batch size ~= # partitions * kafka fetch size).
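(For comparison, in our setup that would work out to roughly 2 partitions * 20MB fetch
size ~= 40MB of messages per batch, if that relationship holds.)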

I added the initial shuffle after the spout so that I could have a much
higher parallelism for the main transform stage (CPU-bound) than the spout
stage (IO-bound). Is this not the right approach? Without this, they have
the same parallelism. Though localOrShuffle might help here then?
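For concreteness, a minimal sketch of the partitioning I mean (Trident API as in Storm
0.9.x; ParseAndTransform is a placeholder name rather than our real transform, and
kafkaSpout/topology are built as in the fuller sketch in my original message below):

    topology.newStream("spout0", kafkaSpout)
            .parallelismHint(2)     // IO-bound spout stage: one executor per Kafka partition
            .shuffle()              // repartition so the next stage can scale independently
            .each(new Fields("bytes"), new ParseAndTransform(), new Fields("event"))
            .parallelismHint(12);   // CPU-bound transform stage
    // If our Storm version's Trident Stream exposes localOrShuffle(), swapping it in for
    // shuffle() should prefer same-worker transfers and avoid some of the network hops.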

Yes, we monitor CPU and memory (but not network usage). CPU loads are
fairly spiky (presumably spiking during the transform stage) but we
generally have 80+% idle. All storm boxes have 4-6GB memory free. Although
we don't have solid network usage monitoring, some simple iperf tests have
shown that we can transfer 1G in a few seconds, IIRC.

Will run a few tests with maxSpoutPending and see how that affects our
throughput.
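Concretely, something like this (standard Storm config, nothing project-specific; the
actual value is what we'll be tuning):

    Config conf = new Config();            // backtype.storm.Config
    conf.setNumWorkers(3);
    // Trident treats an unset topology.max.spout.pending as 1 (one batch in flight at a
    // time), which matches what we've been running; raising it lets batches pipeline.
    conf.setMaxSpoutPending(8);            // trial value, to be tuned experimentally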

-Cody


On Tue, Jul 8, 2014 at 1:23 PM, Danijel Schiavuzzi <danijel@schiavuzzi.com>
wrote:

> You should increase maxSpoutPending, as it enables pipelining of batches
> inside the topology, thus increasing performance. As for the Kafka fetch
> size, I've got the best throughput with 10-20MB sizes. The buffer size
> didn't impact the throughput.
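> For reference, those knobs on the storm-kafka-0.8-plus spout look roughly like this
> (field names as I remember them from that library's KafkaConfig, so double-check them
> against your version; the ZK host and topic strings are placeholders):
>
>     TridentKafkaConfig kafkaConf = new TridentKafkaConfig(new ZkHosts("zk1:2181"), "events");
>     kafkaConf.fetchSizeBytes  = 20 * 1024 * 1024;   // 10-20MB gave me the best throughput
>     kafkaConf.bufferSizeBytes = 20 * 1024 * 1024;   // buffer size didn't change throughput for me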
>
> Also, I believe the first shuffle() repartitioning operation, right after
> the spout, is not needed, as it will only slow down processing by
> transferring tuples between workers over the transport layer and the
> network needlessly. It's best to avoid repartitioning operations and let
> Trident pack the topology into as few bolts as possible. I've got a massive
> speedup by removing unnecessary stream repartitionings (shuffle()s). In my
> case, the overhead of repartitioning the stream (to enable increasing the
> various tasks' parallelism) was greater than the gain from
> the increased parallelism of the tasks. Increasing task parallelism doesn't
> always equal increased performance; it depends, of course, on the specific
> case.
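> In other words, roughly this (reusing the placeholder names from the other sketches in
> this thread):
>
>     topology.newStream("spout0", kafkaSpout)
>             .each(new Fields("bytes"), new ParseAndTransform(), new Fields("event"))
>             .parallelismHint(2);   // no repartition: Trident packs the spout and transform
>                                    // into one bolt, so they share one parallelism setting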
>
> Do you monitor the basic system stats like CPU, memory and network usage?
>
> On Monday, July 7, 2014, Cody A. Ray <cody.a.ray@gmail.com> wrote:
>
>> Hi,
>>
>> We're doing time-series event aggregation with Storm/Trident, which I
>> believe is a pretty common use case. However, we're running into
>> performance issues when trying to transition our high-volume applications
>> to it (up to 600k req/min peak).
>>
>> Source code: https://gist.github.com/codyaray/75533044fc8c0a12fa67
>> (Can't easily push a working branch to github since the JSON parser is in
>> a private library.)
>>
>> Component Versions:
>> - Kafka 0.8.0
>> - Storm 0.9.0.1
>> - Kafka Spout 0.2.0 from wurstmeister
>> <https://github.com/wurstmeister/storm-kafka-0.8-plus>
>>
>> Hardware in EC2:
>> - 3 zookeepers on c1.mediums
>> - 2 kafkas on m1.larges
>> - 3 storms on m1.larges and c3.xlarges (in another region)
>> - 2 mongos on m1.large and m1.xlarges (in another region)
>>
>> Topology:
>> - spout0 bolt (from Trident) with parallelism 2 (one per Kafka partition)
>> - Transform bolt with parallelism 12 (= 3 worker hosts * 2 cores/host * 2
>> executors/core)
>> - 30s and 30m Aggregator bolts with parallelism 3 (one per worker host)
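>> For readers without access to the gist, the shape of the topology in code is roughly
>> the following sketch (ParseAndTransform, the two state factories, and the ZK/topic
>> strings are placeholders, and Count stands in for the real aggregator; the actual
>> transform and Mongo-backed state live in our private library):
>>
>>     TridentKafkaConfig kafkaConf = new TridentKafkaConfig(new ZkHosts("zk01:2181"), "events");
>>     kafkaConf.fetchSizeBytes = 20 * 1024 * 1024;      // one of the values we've tried
>>     OpaqueTridentKafkaSpout kafkaSpout = new OpaqueTridentKafkaSpout(kafkaConf);
>>
>>     TridentTopology topology = new TridentTopology();
>>     Stream events = topology.newStream("spout0", kafkaSpout)
>>             .parallelismHint(2)                       // one per Kafka partition
>>             .shuffle()
>>             .each(new Fields("bytes"), new ParseAndTransform(), new Fields("key", "value"))
>>             .parallelismHint(12);                     // 3 hosts * 2 cores/host * 2 executors/core
>>
>>     // one persistentAggregate per rollup window, parallelism 3 (one per worker host)
>>     events.groupBy(new Fields("key"))
>>           .persistentAggregate(thirtySecondState, new Count(), new Fields("count"))
>>           .parallelismHint(3);
>>     events.groupBy(new Fields("key"))
>>           .persistentAggregate(thirtyMinuteState, new Count(), new Fields("count"))
>>           .parallelismHint(3);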
>>
>> Tuning Options:
>> - Kafka Max Fetch Size: have tried various options from 256K to 20M
>> - Kafka Buffer Size: have tried default (1M) as well as matching to Max
>> Fetch Size
>> - Max Spout Pending: left at the default of 1
>>
>> If you look at the Storm UI, you'll notice that none of the bolts are
>> over capacity (>= 1). The highest-capacity bolt is the Aggregator-30s, which
>> performs the persistentAggregate.
>>
>> [image: Inline image 3]
>> (A larger full-screen snapshot of this UI view is attached.)
>>
>> But diving into this bolt shows that one of the 3 executors is almost
>> unused, one is medium-used, and the other is heavily-used. So something may
>> not be balanced well???
>>
>> [image: Inline image 1]
>> (A larger full-screen snapshot of this UI view is attached.)
>>
>> We've used the Nimbus Thrift API to put together a Graphite dashboard
>> showing (what we hope are) the key metrics for this pipeline.
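>> The collection side is just a small poller built on the Thrift client that ships with
>> Storm 0.9.x; roughly the following sketch (exception handling and the Graphite write
>> omitted, getter names per the 0.9.x Thrift bindings as we recall them):
>>
>>     Map conf = Utils.readStormConfig();                        // backtype.storm.utils.Utils
>>     Nimbus.Client nimbus = NimbusClient.getConfiguredClient(conf).getClient();
>>     for (TopologySummary t : nimbus.getClusterInfo().get_topologies()) {
>>         TopologyInfo info = nimbus.getTopologyInfo(t.get_id());
>>         for (ExecutorSummary e : info.get_executors()) {
>>             // e.get_component_id() and e.get_stats().get_emitted() (plus the bolt-specific
>>             // ack/execute counters) become the stats15 emit/ack/execute.count series below
>>         }
>>     }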
>>
>> [image: Inline image 2]
>> (See attached for a bigger version of this graph).
>>
>> These graphs show rates as request messages (and later tuples) move
>> through the system. The following list gives the Graphite targets shown in
>> each graph.
>>
>>    - Suro Send - number of messages coming from each Suro into Kafka
>>       - prod.suro.*mongo*.messages
>>    - Kafka Ingestion - number of messages coming into each Kafka Broker-Partition
>>       - prod.kafka.kafka*.requests.*.latest
>>    - Suro/Kafka/Storm Pipeline - total number of messages going through Suro/Kafka/Storm
>>       - alias(sum(prod.suro.*mongo*.messages),"suro messages")
>>       - alias(sum(prod.kafka.kafka01*.requests.*.latest),"kafka01 latest")
>>       - alias(sum(prod.kafka.kafka02*.requests.*.latest),"kafka02 latest")
>>       - alias(sum(prod.storm.storm*.*.stats15.spout0.*.s1.emit.count),"storm spout")
>>    - Kafka Spout Emitter - number of Messages Per Batch and Batch Count:
>>       - alias(divideSeries(sum(prod.storm.storm*.*.stats15.spout0.*.s1.emit.count),sum(prod.storm.storm*.6*.stats15.mastercoord-bg0.*.batch.emit.count)),"Messages Per Batch")
>>       - alias(secondYAxis(prod.storm.storm*.6*.stats15.mastercoord-bg0.*.batch.emit.count),"Batch Count")
>>    - Spout Output vs Transform Input:
>>       - alias(sum(prod.storm.storm*.*.stats15.spout0.*.s1.emit.count),"Spout Output")
>>       - alias(sum(prod.storm.storm01*.*.stats15.Transform-Aggregator-30s-Transform-Aggregator-30m.*.spout0.s1.execute.count),"Transform Executes")
>>       - alias(sum(prod.storm.storm01*.*.stats15.Transform-Aggregator-30s-Transform-Aggregator-30m.*.spout0.s1.ack.count),"Transform Acks")
>>    - Transform Output vs Aggregator Input:
>>       - alias(sum(prod.storm.storm01*.*.stats15.Transform-Aggregator-30s-Transform-Aggregator-30m.*.s13.emit.count),"Transform-30m Output")
>>       - alias(sum(prod.storm.storm01*.6*.stats15.Aggregator-30m.*.*-Trans*.s*.execute.count),"Aggregate-30m Executes")
>>       - alias(sum(prod.storm.storm01*.6*.stats15.Aggregator-30m.*.*-Trans*.s*.ack.count),"Aggregate-30m Acks")
>>       - alias(sum(prod.storm.storm01*.*.stats15.Transform-Aggregator-30s-Transform-Aggregator-30m.*.s7.emit.count),"Transform-30s Output")
>>       - alias(sum(prod.storm.storm01*.6*.stats15.Aggregator-30s.*.*-Trans*.s*.execute.count),"Aggregator-30s Executes")
>>       - alias(sum(prod.storm.storm01*.6*.stats15.Aggregator-30s.*.*-Trans*.s*.ack.count),"Aggregator-30s Acks")
>>    - Stats 1.5 Mongo Ops:
>>       - alias(sum(prod.mongo.mongo43*.counters.update,prod.mongo.samongo99*.counters.update),"writes")
>>       - alias(sum(prod.mongo.mongo43*.counters.query,prod.mongo.samongo99*.counters.query),"reads")
>>
>> I'm not positive I'm interpreting/calculating all of these correctly, in
>> particular, Messages Per Batch and Batch Count. Do these look right?
>>
>> As you can see in the above Transform Output vs Aggregator Input graph,
>> the number of tuples processed by the 30m and the 30s aggregators differs.
>> Since each input message is mapped to one 30m and one 30s message, I would
>> expect these numbers to be roughly the same. Am I making an invalid
>> assumption? What else could this be?
>>
>> Assuming the Kafka Spout Emitter graph is correct, you can see that there
>> are periods of time that we emit nothing. From changing the tuning
>> parameters (in particular, the Kafka maxFetchSize) and varying how much
>> backlog is available in Kafka, we've noticed that with a large backlog and
>> maxFetchSize that we'll see a large number of Messages Per Batch but many
>> fewer Batches/min (e.g., lower Batch Count). At worst, this became one
>> batch in an hour or more as Storm chugged away at the data.
>>
>> From the Mongo graph, you can see that we're not doing a ton of writes
>> per minute; around 75k or so. We've used Mongo quite extensively before and
>> seen much better write performance than this, so we think it's an artifact of
>> how we're interacting with it. (In fact, this is a transition topology
>> designed to move us off Mongo entirely.)
>>
>> So, in summary, we're having difficulty figuring out where our system is
>> bottlenecking at high volumes.
>>
>> If you've made it this far, thanks for listening! Any help or insight or
>> ideas are appreciated. :)
>>
>> -Cody
>>
>> --
>> Cody A. Ray, LEED AP
>> cody.a.ray@gmail.com
>> 215.501.7891
>>
>
>
> --
> Danijel Schiavuzzi
>
> E: danijel@schiavuzzi.com
> W: www.schiavuzzi.com
> T: +385989035562
> Skype: danijels7
>



-- 
Cody A. Ray, LEED AP
cody.a.ray@gmail.com
215.501.7891
