Subject: Re: Running Flume-NG in multiple threads
From: Tejinder Aulakh <tejinder@sharethis.com>
To: flume-user@incubator.apache.org
Date: Thu, 28 Jun 2012 11:23:23 -0700

Thanks Mike. Adding more sinks seems to have helped, from what I'm noticing right now.

Tejinder

On Wed, Jun 27, 2012 at 12:39 AM, Mike Percy wrote:
> Maybe your parsing is taking some time?
>
> Try adding multiple sinks or sink groups to the same channel on the app
> tier to get more throughput.
>
> Regards,
> Mike
>
>
> On Tuesday, June 26, 2012 at 10:14 PM, Tejinder Aulakh wrote:
>
> > Hi Mike,
> >
> > Below is the config for the agents and collectors.
> >
> > CassandraSinkST2 - puts the events into Cassandra
> > AvroSinkParserST2 - custom Avro sink which parses the events before
> > sending.
> >
> > So you are saying that if we simply add more sinks (CassandraSinkST2 on
> > the collectors) and connect them to the same channel, it would speed
> > things up?
> >
> > -Tejinder
> >
> > Agent
> > -------
> > # Tell Flume which source, channel, and sinks to use
> > agent.channels = myMemoryChannel
> > agent.sources = myExecSource
> > agent.sinks = collector1002sink collector1003sink stFileSink
> > agent.sinkgroups = collectorGroup
> >
> > # Define a memory channel called myMemoryChannel
> > agent.channels.myMemoryChannel.type = memory
> > agent.channels.myMemoryChannel.capacity = 1000000
> > agent.channels.myMemoryChannel.transactionCapacity = 10000
> > agent.channels.myMemoryChannel.keep-alive = 30
> >
> > # Define an exec source called myExecSource to tail the log file
> > agent.sources.myExecSource.channels = myMemoryChannel
> > agent.sources.myExecSource.type = exec
> > agent.sources.myExecSource.command = tail -F /mnt/nginx/r.log
> >
> > # Define a custom avro sink called collector1003sink
> > agent.sinks.collector1003sink.channel = myMemoryChannel
> > agent.sinks.collector1003sink.type = com.sharethis.web.flume.AvroSinkParserST2
> > agent.sinks.collector1003sink.hostname = {PRIMARY_IP}
> > agent.sinks.collector1003sink.port = 45678
> > agent.sinks.collector1003sink.batch-size = 100
> >
> > # Define a custom avro sink called collector1002sink
> > agent.sinks.collector1002sink.channel = myMemoryChannel
> > agent.sinks.collector1002sink.type = com.sharethis.web.flume.AvroSinkParserST2
> > agent.sinks.collector1002sink.hostname = {BACKUP_IP}
> > agent.sinks.collector1002sink.port = 45678
> > agent.sinks.collector1002sink.batch-size = 100
> >
> > # File roll sink called stFileSink
> > agent.sinks.stFileSink.channel = myMemoryChannel
> > agent.sinks.stFileSink.type = file_roll
> > agent.sinks.stFileSink.sink.directory = /var/tmp/unprocessed_events/
> > agent.sinks.stFileSink.sink.rollInterval = 1800
> >
> > # Configure sinkgroup collectorGroup; sinks with higher priorities run first
> > agent.sinkgroups.collectorGroup.sinks = collector1003sink collector1002sink stFileSink
> > agent.sinkgroups.collectorGroup.processor.type = failover
> > agent.sinkgroups.collectorGroup.processor.priority.collector1003sink = 10
> > agent.sinkgroups.collectorGroup.processor.priority.collector1002sink = 15
> > agent.sinkgroups.collectorGroup.processor.priority.stFileSink = 5
> > agent.sinkgroups.collectorGroup.processor.maxpenalty = 10000
> >
> >
> > Collector
> > -------------
> > # Tell Flume which source, channel, and sinks to use
> > collector.channels = collectorMemoryChannel
> > collector.sources = collectorAvroSource
> > collector.sinks = collectorCassandraSink collectorFile
> > collector.sinkgroups = collectorGroup
> >
> > # Define a memory channel called collectorMemoryChannel
> > collector.channels.collectorMemoryChannel.type = memory
> > collector.channels.collectorMemoryChannel.capacity = 10000000
> > collector.channels.collectorMemoryChannel.transactionCapacity = 100000
> > collector.channels.collectorMemoryChannel.keep-alive = 30
> >
> > # Define an avro source called collectorAvroSource
> > collector.sources.collectorAvroSource.channels = collectorMemoryChannel
> > collector.sources.collectorAvroSource.type = avro
> > collector.sources.collectorAvroSource.bind = 0.0.0.0
> > collector.sources.collectorAvroSource.port = 45678
> >
> > # Define a custom sink called collectorCassandraSink
> > collector.sinks.collectorCassandraSink.channel = collectorMemoryChannel
> > collector.sinks.collectorCassandraSink.type = com.sharethis.web.flume.CassandraSinkST2
> > collector.sinks.collectorCassandraSink.cassandraHostname = {CASSANDRA_HOSTNAME}
> > collector.sinks.collectorCassandraSink.cassandraPort = 9160
> >
> > # File roll sink called collectorFile
> > collector.sinks.collectorFile.channel = collectorMemoryChannel
> > collector.sinks.collectorFile.type = file_roll
> > collector.sinks.collectorFile.sink.directory = /var/tmp/unprocessed_events/
> > collector.sinks.collectorFile.sink.rollInterval = 1800
> >
> > # Configure sinkgroup collectorGroup; sinks with higher priorities run first
> > collector.sinkgroups.collectorGroup.sinks = collectorCassandraSink collectorFile
> > collector.sinkgroups.collectorGroup.processor.type = failover
> > collector.sinkgroups.collectorGroup.processor.priority.collectorCassandraSink = 15
> > collector.sinkgroups.collectorGroup.processor.priority.collectorFile = 5
> > collector.sinkgroups.collectorGroup.processor.maxpenalty = 10000
> >
> >
> > On Tue, Jun 26, 2012 at 7:01 PM, Mike Percy <mpercy@cloudera.com> wrote:
> > > We are able to push > 8000 events/sec (2 KB per event) through a single
> > > file channel if you put the checkpoint on one disk and use two other
> > > disks for the data dirs. Not sure what the limit is. This is using the
> > > latest trunk code. Another limitation may be that you need to add
> > > additional sinks to your channel to drain it faster. This is because
> > > sinks are single-threaded and sources are multithreaded.
> > >
> > > Mike
> > >
> > >
> > > On Tuesday, June 26, 2012 at 6:30 PM, Juhani Connolly wrote:
> > >
> > > > Depending on your setup, you may find that some channels just cannot
> > > > keep up with the throughput. Is the timestamp on the logs gradually
> > > > falling further behind the time they appear on the collector? If so,
> > > > some component (likely the channels) is turning into a bottleneck.
> > > >
> > > > We have about 1000 events/sec running through to our collector, and
> > > > using the file channel it unfortunately could not keep up. If you're
> > > > not already, try running memory channels and see how that works.
> > > >
> > > > On 06/27/2012 08:06 AM, Tejinder Aulakh wrote:
> > > > > We are using Flume-NG to transfer logs from the log servers (40) to
> > > > > the collectors (4). However, now we are seeing a long delay before
> > > > > the logs show up at the collectors.
> > > > >
> > > > > I was wondering if there is any way of running Flume-NG using
> > > > > multiple threads so that the collectors can process all the load
> > > > > they get from the log servers. Is it configurable in flume-env.sh?
> > > > > How many threads does Flume-NG use by default?
> > > > >
> > > > > Thanks,
> > > > > Tejinder
> >
> >
> > --
> > Tejinder Aulakh
> > Senior Software Engineer, ShareThis
> > e: tejinder@sharethis.com
> > m: 510.708.2499
> >
> > Learn More: SQI (Social Quality Index) - A Universal Measure of Social
> > Quality (http://sharethis.com/sqi)

--
Tejinder Aulakh
Senior Software Engineer, ShareThis
e: tejinder@sharethis.com
m: 510.708.2499

Learn More: SQI (Social Quality Index) - A Universal Measure of Social Quality (http://sharethis.com/sqi)
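[Editor's sketch] Mike's advice — draining one channel with several sinks in parallel, since each Flume NG sink runs on its own single thread while sources are multithreaded — could look like the fragment below on the collector tier. The sink names (cassandraSink1/cassandraSink2) and the duplicated settings are illustrative assumptions, not part of the config posted in this thread:

```properties
# Two instances of the custom Cassandra sink attached to the SAME channel.
# Each sink gets its own SinkRunner thread, so both can take() from
# collectorMemoryChannel concurrently, roughly doubling the drain rate.
collector.sinks = cassandraSink1 cassandraSink2

collector.sinks.cassandraSink1.channel = collectorMemoryChannel
collector.sinks.cassandraSink1.type = com.sharethis.web.flume.CassandraSinkST2
collector.sinks.cassandraSink1.cassandraHostname = {CASSANDRA_HOSTNAME}
collector.sinks.cassandraSink1.cassandraPort = 9160

collector.sinks.cassandraSink2.channel = collectorMemoryChannel
collector.sinks.cassandraSink2.type = com.sharethis.web.flume.CassandraSinkST2
collector.sinks.cassandraSink2.cassandraHostname = {CASSANDRA_HOSTNAME}
collector.sinks.cassandraSink2.cassandraPort = 9160
```

One caveat worth noting: a `failover` sink group (as used in the configs above) keeps only one sink active at a time, so it does not add parallelism. For concurrent draining, the sinks should either be left out of any sink group, as in this sketch, or placed in a group whose processor type is `load_balance`.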