flume-user mailing list archives

From Tejinder Aulakh <tejin...@sharethis.com>
Subject Re: Running Flume-NG in multiple threads
Date Thu, 28 Jun 2012 18:23:23 GMT
Thanks Mike. Adding more sinks seems to have helped, from what I'm seeing so
far.
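
Concretely, "adding more sinks" means attaching extra sink instances to the
same channel: each sink that is not in a sink group gets its own runner
thread, so the channel drains in parallel. A minimal sketch, reusing the
collector names from the config quoted below (the second sink,
collectorCassandraSink2, is hypothetical):

```properties
# Two instances of the custom Cassandra sink draining the same channel.
# Each standalone sink gets its own SinkRunner thread.
collector.sinks = collectorCassandraSink collectorCassandraSink2 collectorFile

collector.sinks.collectorCassandraSink2.channel = collectorMemoryChannel
collector.sinks.collectorCassandraSink2.type = com.sharethis.web.flume.CassandraSinkST2
collector.sinks.collectorCassandraSink2.cassandraHostname = {CASSANDRA_HOSTNAME}
collector.sinks.collectorCassandraSink2.cassandraPort = 9160
```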

Tejinder

On Wed, Jun 27, 2012 at 12:39 AM, Mike Percy <mpercy@cloudera.com> wrote:

> Maybe your parsing is taking some time?
>
> Try adding multiple sinks or sink groups to the same channel on the app
> tier to get more throughput.
>
> Regards,
> Mike
>
>
> On Tuesday, June 26, 2012 at 10:14 PM, Tejinder Aulakh wrote:
>
> > Hi Mike,
> >
> > Below is the config for agents and collectors.
> >
> > CassandraSinkST2 - puts the events in Cassandra
> > AvroSinkParserST2 - customized avro sink which parses the events before
> sending.
> >
> > So you are saying if we simply add more sinks (CassandraSinkST2 on
> collectors) and connect them to the same channel, it would speed things up?
> >
> > -Tejinder
> >
> > Agent
> > -------
> > # Tell Flume which source, channel, and sink to use
> > agent.channels = myMemoryChannel
> > agent.sources = myExecSource
> > agent.sinks = collector1002sink collector1003sink stFileSink
> > agent.sinkgroups = collectorGroup
> >
> > # Define a memory channel called myMemoryChannel
> > agent.channels.myMemoryChannel.type = memory
> > agent.channels.myMemoryChannel.capacity = 1000000
> > agent.channels.myMemoryChannel.transactionCapacity = 10000
> > agent.channels.myMemoryChannel.keep-alive = 30
> >
> > # Define an exec source called myExecSource to tail the log file
> > agent.sources.myExecSource.channels = myMemoryChannel
> > agent.sources.myExecSource.type = exec
> > agent.sources.myExecSource.command = tail -F /mnt/nginx/r.log
> >
> > # Define a custom avro sink called collector1003sink
> > agent.sinks.collector1003sink.channel = myMemoryChannel
> > agent.sinks.collector1003sink.type = com.sharethis.web.flume.AvroSinkParserST2
> > agent.sinks.collector1003sink.hostname = {PRIMARY_IP}
> > agent.sinks.collector1003sink.port = 45678
> > agent.sinks.collector1003sink.batch-size = 100
> >
> > # Define a custom avro sink called collector1002sink
> > agent.sinks.collector1002sink.channel = myMemoryChannel
> > agent.sinks.collector1002sink.type = com.sharethis.web.flume.AvroSinkParserST2
> > agent.sinks.collector1002sink.hostname = {BACKUP_IP}
> > agent.sinks.collector1002sink.port = 45678
> > agent.sinks.collector1002sink.batch-size = 100
> >
> > # logger sink called stFileSink
> > agent.sinks.stFileSink.channel = myMemoryChannel
> > agent.sinks.stFileSink.type = file_roll
> > agent.sinks.stFileSink.sink.directory = /var/tmp/unprocessed_events/
> > agent.sinks.stFileSink.sink.rollInterval = 1800
> >
> > # configure sinkgroup agentGroup. sinks with higher priorities are run
> first
> > agent.sinkgroups.collectorGroup.sinks = collector1003sink
> collector1002sink stFileSink
> > agent.sinkgroups.collectorGroup.processor.type = failover
> > agent.sinkgroups.collectorGroup.processor.priority.collector1003sink = 10
> > agent.sinkgroups.collectorGroup.processor.priority.collector1002sink = 15
> > agent.sinkgroups.collectorGroup.processor.priority.stFileSink = 5
> > agent.sinkgroups.collectorGroup.processor.maxpenalty = 10000
> >
> >
> > Collector
> > -------------
> > # Tell Flume which source, channel, and sink to use
> > collector.channels = collectorMemoryChannel
> > collector.sources = collectorAvroSource
> > collector.sinks = collectorCassandraSink collectorFile
> > collector.sinkgroups = collectorGroup
> >
> > # Define a memory channel called collectorMemoryChannel
> > collector.channels.collectorMemoryChannel.type = memory
> > collector.channels.collectorMemoryChannel.capacity = 10000000
> > collector.channels.collectorMemoryChannel.transactionCapacity = 100000
> > collector.channels.collectorMemoryChannel.keep-alive = 30
> >
> > # Define an exec source called collectorAvroSource
> > collector.sources.collectorAvroSource.channels = collectorMemoryChannel
> > collector.sources.collectorAvroSource.type = avro
> > collector.sources.collectorAvroSource.bind = 0.0.0.0
> > collector.sources.collectorAvroSource.port = 45678
> >
> > # Define a custom sink called collectorCustomSink
> > collector.sinks.collectorCassandraSink.channel = collectorMemoryChannel
> > collector.sinks.collectorCassandraSink.type = com.sharethis.web.flume.CassandraSinkST2
> > collector.sinks.collectorCassandraSink.cassandraHostname =
> {CASSANDRA_HOSTNAME}
> > collector.sinks.collectorCassandraSink.cassandraPort = 9160
> >
> > # logger sink called collectorFile
> > collector.sinks.collectorFile.channel = collectorMemoryChannel
> > collector.sinks.collectorFile.type = file_roll
> > collector.sinks.collectorFile.sink.directory =
> /var/tmp/unprocessed_events/
> > collector.sinks.collectorFile.sink.rollInterval = 1800
> >
> > # configure sinkgroup collectorGroup. sinks with higher priorities are
> run first
> > collector.sinkgroups.collectorGroup.sinks = collectorCassandraSink
> collectorFile
> > collector.sinkgroups.collectorGroup.processor.type = failover
> >
> collector.sinkgroups.collectorGroup.processor.priority.collectorCassandraSink
> = 15
> > collector.sinkgroups.collectorGroup.processor.priority.collectorFile = 5
> > collector.sinkgroups.collectorGroup.processor.maxpenalty = 10000
> >
> >
> > On Tue, Jun 26, 2012 at 7:01 PM, Mike Percy <mpercy@cloudera.com> wrote:
> > > We are able to push > 8000 events/sec (2KB per event) through a single
> > > file channel if you put the checkpoint on one disk and use 2 other disks
> > > for data dirs. Not sure what the limit is. This is using the latest trunk
> > > code. Another limitation may be that you need to add additional sinks to
> > > your channel to drain it faster, because sinks are single-threaded while
> > > sources are multithreaded.
> > >
> > > Mike
> > >
> > >
> > > On Tuesday, June 26, 2012 at 6:30 PM, Juhani Connolly wrote:
> > >
> > > > Depending on your setup, you may find that some channels just cannot
> > > > keep up with the throughput. Is the timestamp on the logs gradually
> > > > falling further behind the time they appear on the collector? If so,
> > > > some component (likely the channels) is becoming a bottleneck.
> > > >
> > > > We have about 1000 events/sec running through to our collector, and
> > > > using the file channel it unfortunately could not keep up. If you're
> > > > not already, try running memory channels and see how that works.
> > > >
> > > > On 06/27/2012 08:06 AM, Tejinder Aulakh wrote:
> > > > > We are using Flume-NG to transfer logs from the log servers (40) to
> > > > > collectors (4). However, now we are seeing a long delay before the
> > > > > logs show up at the collectors.
> > > > >
> > > > > I was wondering if there is any way of running Flume-NG using
> > > > > multiple threads so that the collectors can process all the load they
> > > > > get from the log servers. Is it configurable in flume-env.sh? How
> > > > > many threads does Flume-NG use by default?
> > > > >
> > > > > Thanks,
> > > > > Tejinder
> > > >
> > >
> >
> >
> >
> >
> > --
> > Tejinder Aulakh
> > Senior Software Engineer, ShareThis
> > e: tejinder@sharethis.com
> > m: 510.708.2499
> >
> > Learn More: SQI (Social Quality Index) - A Universal Measure of Social
> > Quality (http://sharethis.com/sqi)
> >
> >
> >
> >
>
>
>
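
For completeness, the other option Mike mentions — a sink group — is
configured like this on the app tier. This is a hedged sketch, not the
config actually used in this thread: the sink names and collector host
names are placeholders, and a load_balance group spreads events across
collectors rather than running its member sinks in parallel (a sink group
shares a single runner thread).

```properties
# Hypothetical load_balance sink group on the agent: events from
# myMemoryChannel are alternated between two collector hosts.
agent.sinks = avroSink1 avroSink2
agent.sinkgroups = balanceGroup

agent.sinks.avroSink1.channel = myMemoryChannel
agent.sinks.avroSink1.type = avro
agent.sinks.avroSink1.hostname = collector1.example.com
agent.sinks.avroSink1.port = 45678

agent.sinks.avroSink2.channel = myMemoryChannel
agent.sinks.avroSink2.type = avro
agent.sinks.avroSink2.hostname = collector2.example.com
agent.sinks.avroSink2.port = 45678

agent.sinkgroups.balanceGroup.sinks = avroSink1 avroSink2
agent.sinkgroups.balanceGroup.processor.type = load_balance
agent.sinkgroups.balanceGroup.processor.selector = round_robin
```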


-- 
Tejinder Aulakh
Senior Software Engineer, ShareThis
e: tejinder@sharethis.com
m: 510.708.2499

Learn More: SQI (Social Quality Index) - A Universal Measure of Social
Quality <http://sharethis.com/sqi>

