flume-user mailing list archives

From Mike Percy <mpe...@cloudera.com>
Subject Re: Running Flume-NG in multiple threads
Date Thu, 28 Jun 2012 19:53:10 GMT
Great! 


On Thursday, June 28, 2012 at 11:23 AM, Tejinder Aulakh wrote:

> Thanks Mike. Adding more sinks seems to have helped from what I'm noticing right now.

> 
> Tejinder
> 
> On Wed, Jun 27, 2012 at 12:39 AM, Mike Percy <mpercy@cloudera.com> wrote:
> > Maybe your parsing is taking some time?
> > 
> > Try adding multiple sinks or sink groups to the same channel on the app tier to get more throughput.
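> > 
> > For example, a second sink draining the same channel (names here are just placeholders; each sink defined outside a group gets its own runner thread, so the two drain in parallel):
> > 
> > agent.sinks = avroSink1 avroSink2
> > agent.sinks.avroSink1.channel = myMemoryChannel
> > agent.sinks.avroSink1.type = avro
> > agent.sinks.avroSink2.channel = myMemoryChannel
> > agent.sinks.avroSink2.type = avro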
> > 
> > Regards,
> > Mike
> > 
> > 
> > On Tuesday, June 26, 2012 at 10:14 PM, Tejinder Aulakh wrote:
> > 
> > > Hi Mike,
> > > 
> > > Below is the config for agents and collectors.
> > > 
> > > CassandraSinkST2 - puts the events in Cassandra
> > > AvroSinkParserST2 - customized avro sink which parses the events before sending.
> > > 
> > > So you are saying if we simply add more sinks (CassandraSinkST2 on collectors) and connect them to the same channel, it would speed things up?
> > > 
> > > -Tejinder
> > > 
> > > Agent
> > > -------
> > > # Tell Flume which source, channel, and sink to use
> > > agent.channels = myMemoryChannel
> > > agent.sources = myExecSource
> > > agent.sinks = collector1002sink collector1003sink stFileSink
> > > agent.sinkgroups = collectorGroup
> > > 
> > > # Define a memory channel called myMemoryChannel
> > > agent.channels.myMemoryChannel.type = memory
> > > agent.channels.myMemoryChannel.capacity = 1000000
> > > agent.channels.myMemoryChannel.transactionCapacity = 10000
> > > agent.channels.myMemoryChannel.keep-alive = 30
> > > 
> > > # Define an exec source called myExecSource to tail log file
> > > agent.sources.myExecSource.channels = myMemoryChannel
> > > agent.sources.myExecSource.type = exec
> > > agent.sources.myExecSource.command = tail -F /mnt/nginx/r.log
> > > 
> > > # Define a custom avro sink called collector1003sink
> > > agent.sinks.collector1003sink.channel = myMemoryChannel
> > > agent.sinks.collector1003sink.type = com.sharethis.web.flume.AvroSinkParserST2
> > > agent.sinks.collector1003sink.hostname = {PRIMARY_IP}
> > > agent.sinks.collector1003sink.port = 45678
> > > agent.sinks.collector1003sink.batch-size = 100
> > > 
> > > # Define a custom avro sink called collector1002sink
> > > agent.sinks.collector1002sink.channel = myMemoryChannel
> > > agent.sinks.collector1002sink.type = com.sharethis.web.flume.AvroSinkParserST2
> > > agent.sinks.collector1002sink.hostname = {BACKUP_IP}
> > > agent.sinks.collector1002sink.port = 45678
> > > agent.sinks.collector1002sink.batch-size = 100
> > > 
> > > # logger sink called stFileSink
> > > agent.sinks.stFileSink.channel = myMemoryChannel
> > > agent.sinks.stFileSink.type = file_roll
> > > agent.sinks.stFileSink.sink.directory = /var/tmp/unprocessed_events/
> > > agent.sinks.stFileSink.sink.rollInterval = 1800
> > > 
> > > # configure sinkgroup collectorGroup. sinks with higher priorities are run first
> > > agent.sinkgroups.collectorGroup.sinks = collector1003sink collector1002sink stFileSink
> > > agent.sinkgroups.collectorGroup.processor.type = failover
> > > agent.sinkgroups.collectorGroup.processor.priority.collector1003sink = 10
> > > agent.sinkgroups.collectorGroup.processor.priority.collector1002sink = 15
> > > agent.sinkgroups.collectorGroup.processor.priority.stFileSink = 5
> > > agent.sinkgroups.collectorGroup.processor.maxpenalty = 10000
> > > 
> > > 
> > > Collector
> > > -------------
> > > # Tell Flume which source, channel, and sink to use
> > > collector.channels = collectorMemoryChannel
> > > collector.sources = collectorAvroSource
> > > collector.sinks = collectorCassandraSink collectorFile
> > > collector.sinkgroups = collectorGroup
> > > 
> > > # Define a memory channel called collectorMemoryChannel
> > > collector.channels.collectorMemoryChannel.type = memory
> > > collector.channels.collectorMemoryChannel.capacity = 10000000
> > > collector.channels.collectorMemoryChannel.transactionCapacity = 100000
> > > collector.channels.collectorMemoryChannel.keep-alive = 30
> > > 
> > > # Define an avro source called collectorAvroSource
> > > collector.sources.collectorAvroSource.channels = collectorMemoryChannel
> > > collector.sources.collectorAvroSource.type = avro
> > > collector.sources.collectorAvroSource.bind = 0.0.0.0
> > > collector.sources.collectorAvroSource.port = 45678
> > > 
> > > # Define a custom sink called collectorCassandraSink
> > > collector.sinks.collectorCassandraSink.channel = collectorMemoryChannel
> > > collector.sinks.collectorCassandraSink.type = com.sharethis.web.flume.CassandraSinkST2
> > > collector.sinks.collectorCassandraSink.cassandraHostname = {CASSANDRA_HOSTNAME}
> > > collector.sinks.collectorCassandraSink.cassandraPort = 9160
> > > 
> > > # logger sink called collectorFile
> > > collector.sinks.collectorFile.channel = collectorMemoryChannel
> > > collector.sinks.collectorFile.type = file_roll
> > > collector.sinks.collectorFile.sink.directory = /var/tmp/unprocessed_events/
> > > collector.sinks.collectorFile.sink.rollInterval = 1800
> > > 
> > > # configure sinkgroup collectorGroup. sinks with higher priorities are run first
> > > collector.sinkgroups.collectorGroup.sinks = collectorCassandraSink collectorFile
> > > collector.sinkgroups.collectorGroup.processor.type = failover
> > > collector.sinkgroups.collectorGroup.processor.priority.collectorCassandraSink = 15
> > > collector.sinkgroups.collectorGroup.processor.priority.collectorFile = 5
> > > collector.sinkgroups.collectorGroup.processor.maxpenalty = 10000
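> > > 
> > > (One thing worth checking here: a sink group is driven by a single runner thread, so adding more sinks inside the failover group would not drain the channel any faster. A sketch of what parallel draining could look like, with a second, hypothetical copy of the Cassandra sink left outside any group:)
> > > 
> > > collector.sinks = collectorCassandraSink collectorCassandraSink2 collectorFile
> > > collector.sinks.collectorCassandraSink2.channel = collectorMemoryChannel
> > > collector.sinks.collectorCassandraSink2.type = com.sharethis.web.flume.CassandraSinkST2
> > > collector.sinks.collectorCassandraSink2.cassandraHostname = {CASSANDRA_HOSTNAME}
> > > collector.sinks.collectorCassandraSink2.cassandraPort = 9160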
> > > 
> > > 
> > > On Tue, Jun 26, 2012 at 7:01 PM, Mike Percy <mpercy@cloudera.com> wrote:
> > > > We are able to push > 8000 events/sec (2KB per event) through a single file channel
> > > > if you put the checkpoint on one disk and use 2 other disks for data dirs. Not sure
> > > > what the limit is. This is using the latest trunk code. Another limitation may be that
> > > > you need to add additional sinks to your channel to drain it faster; this is because
> > > > sinks are single-threaded and sources are multithreaded.
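> > > > 
> > > > Roughly like this (paths are hypothetical; one disk for the checkpoint, two for data):
> > > > 
> > > > agent.channels.fc.type = file
> > > > agent.channels.fc.checkpointDir = /disk1/flume/checkpoint
> > > > agent.channels.fc.dataDirs = /disk2/flume/data,/disk3/flume/data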
> > > > 
> > > > Mike
> > > > 
> > > > 
> > > > On Tuesday, June 26, 2012 at 6:30 PM, Juhani Connolly wrote:
> > > > 
> > > > > Depending on your setup, you may find that some channels just cannot
> > > > > keep up with throughput. Is the timestamp on the logs gradually falling
> > > > > further behind the time it appears on the collector? If so, some
> > > > > component (likely the channels) is turning into a bottleneck.
> > > > > 
> > > > > We have about 1000 events/sec running through to our collector, and
> > > > > using file channel it unfortunately could not keep up. If you're not already,
> > > > > try running memory channels and see how that works.
> > > > > 
> > > > > On 06/27/2012 08:06 AM, Tejinder Aulakh wrote:
> > > > > > We are using Flume-NG to transfer logs from the log servers (40) to
> > > > > > collectors (4). However, now we are seeing a long delay before the
> > > > > > logs show up at the collectors.
> > > > > > 
> > > > > > I was wondering if there is any way of running Flume-NG using multiple
> > > > > > threads so that collectors can process all the load they get from the
> > > > > > log servers. Is it configurable in flume-env.sh? How many threads does
> > > > > > Flume-NG use by default?
> > > > > > 
> > > > > > Thanks,
> > > > > > Tejinder
> > > > > 
> > > > 
> > > 
> > > --
> > > Tejinder Aulakh
> > > Senior Software Engineer, ShareThis
> > > e: tejinder@sharethis.com
> > > m: 510.708.2499
> > > Learn More: SQI (Social Quality Index) - A Universal Measure of Social Quality (http://sharethis.com/sqi)
> > 
> 
> 
> 
> 
> -- 
> Tejinder Aulakh
> Senior Software Engineer, ShareThis
> e: tejinder@sharethis.com
> m: 510.708.2499
