flume-user mailing list archives

From Israel Ekpo <isr...@aicer.org>
Subject Re: Log Events get Lost - flume 1.3
Date Tue, 16 Apr 2013 19:02:59 GMT
Hello

Since each key must be unique within a batch, you can append or prefix a
unique value to the nano time.

Here is the first approach:

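import java.util.concurrent.atomic.AtomicLong;

// Shared by all threads; getAndIncrement() is atomic, so no two callers
// get the same value within one JVM.
private static final AtomicLong idCounter = new AtomicLong();

public static String createSequenceID()
{
    return String.valueOf(idCounter.getAndIncrement());
}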
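
A minimal sketch of how the counter might be combined with the nano time, as suggested above. The class name RowKeys, the method nextRowKey and the underscore separator are assumptions made for illustration; they are not part of Flume or of the interceptor quoted below.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical helper (not a Flume class): builds a key fragment from the
// nano time plus a per-JVM sequence number.
final class RowKeys {
    // One counter for the whole JVM; increments are atomic across threads.
    private static final AtomicLong SEQUENCE = new AtomicLong();

    private RowKeys() {}

    // Assumed layout: "<nanoTime>_<sequence>". Even if two events observe
    // the same nano time, their sequence numbers differ.
    static String nextRowKey() {
        return System.nanoTime() + "_" + SEQUENCE.getAndIncrement();
    }

    public static void main(String[] args) {
        // Two consecutive calls never return the same string.
        System.out.println(nextRowKey());
        System.out.println(nextRowKey());
    }
}
```

The counter restarts at zero with the JVM, and System.nanoTime() has an arbitrary origin, so this is only unique within one running agent. Other parts of the row key, such as the host name and the wall-clock timestamp, would have to distinguish agents and restarts.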


You can also use UUID.randomUUID() to generate the unique values:

String uniqueID = UUID.randomUUID().toString();  // needs: import java.util.UUID;

http://docs.oracle.com/javase/6/docs/api/java/util/UUID.html#randomUUID()
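
For comparison, a sketch of the UUID variant used as a suffix on the nano time. The class name UuidRowKeys, the method name and the underscore separator are again assumptions for illustration only.

```java
import java.util.UUID;

// Hypothetical helper (not a Flume class): appends a random UUID to the
// nano time so that repeated timestamps still yield distinct keys.
final class UuidRowKeys {
    private UuidRowKeys() {}

    // Assumed layout: "<nanoTime>_<uuid>". The UUID part is always 36
    // characters long in its canonical string form.
    static String nextRowKey() {
        return System.nanoTime() + "_" + UUID.randomUUID();
    }

    public static void main(String[] args) {
        System.out.println(nextRowKey());
    }
}
```

Unlike the counter, the UUID needs no shared state, but the resulting keys are longer and carry no ordering information.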

I prefer the first option, since sequential IDs are more readable and
more helpful when you are debugging issues.

I hope this helps.

On 16 April 2013 14:36, Kumar, Deepak8 <deepak8.kumar@citi.com> wrote:

> Hi Brock,
>
> Thanks for assisting.
>
> Actually, we have an interceptor implementation that generates our row
> key for HBase (HBase is the sink). With a larger batch size, the
> timestamp is likely to repeat in the row key, which would overwrite
> rows in HBase.
>
> Could you please suggest a workaround so that I can use a larger batch
> size without the row key repeating? I am already taking the timestamp
> down to nanosecond resolution.
>
> Regards,
> Deepak
>
> @Override
> public Event intercept(Event event) {
>   // eventCounter++;
>   // env, logType, appId, logPath and logFileName
>   Map<String, String> headers = event.getHeaders();
>   long now = System.currentTimeMillis();
>   String nowNano = Long.toString(System.nanoTime());
>   // nowNano = nowNano.substring(nowNano.length()-5);
>
>   headers.put(TIMESTAMP, Long.toString(now));
>   headers.put(HOST_NAME, hostName);
>   headers.put(ENV, env);
>   headers.put(LOG_TYPE, logType);
>   headers.put(APP_ID, appId);
>   headers.put(LOG_FILE_PATH, logFilePath);
>   headers.put(LOG_FILE_NAME, logFileName);
>   headers.put(TIME_STAMP_NANO, nowNano);
>
>   return event;
> }
>
> @Override
> public List<Event> intercept(List<Event> events) {
>   for (Event event : events) {
>     intercept(event);
>   }
>   return events;
> }
>
> From: Brock Noland [mailto:brock@cloudera.com]
> Sent: Tuesday, April 16, 2013 10:39 AM
> To: user@flume.apache.org
> Subject: Re: Log Events get Lost - flume 1.3
>
> Hi,
>
> There are two issues with your configuration:
>
> 1) A batch size of 1 with the file channel is an anti-pattern. This will
> result in extremely poor performance because the file channel will have
> to do an fsync() (an expensive disk operation required to ensure no data
> loss) for each event. Your batch size should probably be in the hundreds
> or thousands.
>
> 2) tail -F *will* lose data. There is a writeup on this in the
> documentation. If you care about your data, you will want to use the
> Spooling Directory Source.
>
> Issue #2 is being worsened by issue #1. Since you have such a low batch
> size, throughput of the file channel is extremely low. As tail -F gives
> no feedback to the tail process, more data is being lost than would
> otherwise be the case due to the low channel throughput.
>
> Brock
>
> On Tue, Apr 16, 2013 at 3:16 AM, Kumar, Deepak8 <deepak8.kumar@citi.com>
> wrote:
>
> Hi,
>
> I have 10 flume agents configured on a single machine. Each log file
> receives 500 log events/sec, so across the 10 log files, events are
> generated at 5000 per second.
>
> If my channel capacity is 1 million, more than 70% of log events are lost!
> If I increase the channel capacity to 50 million, the flume agent takes
> more than 24 hours to transfer the log events from source to sink.
>
> The size of dataDir (agent.channels.fileChannel.dataDirs =
> /var/log/flume-ng/file-channel/data) is almost 2G all the time.
>
> Could you please suggest an optimum configuration so that I don't lose
> any log events and the transfer rate is also good enough? My
> flume-conf.properties has the following contents:
>
> agent.channels = fileChannel
> agent.sinks = avroSink
>
> # Each sink's type must be defined
> agent.sinks.avroSink.type = avro
> agent.sinks.avroSink.hostname = spnnq01.nam.nsroot.net
> agent.sinks.avroSink.port = 1442
> agent.sinks.avroSink.batchSize = 1000
>
> # Specify the channel the sink should use
> agent.sinks.avroSink.channel = fileChannel
>
> # Each channel's type is defined.
> agent.channels.fileChannel.type = file
> agent.channels.fileChannel.checkpointDir = /var/log/flume-ng/file-channel/checkpoint
> agent.channels.fileChannel.dataDirs = /var/log/flume-ng/file-channel/data
> agent.channels.fileChannel.transactionCapacity = 1000
> agent.channels.fileChannel.checkpointInterval = 30000
> agent.channels.fileChannel.maxFileSize = 2146435071
> agent.channels.fileChannel.minimumRequiredSpace = 524288000
> agent.channels.fileChannel.keep-alive = 5
> agent.channels.fileChannel.write-timeout = 10
> agent.channels.fileChannel.checkpoint-timeout = 600
> agent.channels.fileChannel.capacity = 50000000
>
> agent.sources.s2.batchSize = 1
> agent.sources.s2.channels = fileChannel
> agent.sources.s2.command = tail -F /var/log/creditcard/AggKeyListener.2.2013-01-19
> agent.sources.s2.interceptors = logIntercept
> agent.sources.s2.interceptors.logIntercept.appId = 153299
> agent.sources.s2.interceptors.logIntercept.env = SP
> agent.sources.s2.interceptors.logIntercept.hostName = vm-e61b-fe34.nam.nsroot.net
> agent.sources.s2.interceptors.logIntercept.logFileName = AggKeyListener.2.2013-01-19
> agent.sources.s2.interceptors.logIntercept.logFilePath = /var/log/creditcard/
> agent.sources.s2.interceptors.logIntercept.logType = creditcard log
> agent.sources.s2.interceptors.logIntercept.type = com.citi.sponge.flume.agent.source.LogInterceptor$Builder
> agent.sources.s2.type = exec
>
> agent.sources.s0.batchSize = 1
> agent.sources.s0.channels = fileChannel
> agent.sources.s0.command = tail -F /var/log/creditcard/AggKeyListener.0.2013-01-19
> agent.sources.s0.interceptors = logIntercept
> agent.sources.s0.interceptors.logIntercept.appId = 153299
> agent.sources.s0.interceptors.logIntercept.env = SP
> agent.sources.s0.interceptors.logIntercept.hostName = vm-e61b-fe34.nam.nsroot.net
> agent.sources.s0.interceptors.logIntercept.logFileName = AggKeyListener.0.2013-01-19
> agent.sources.s0.interceptors.logIntercept.logFilePath = /var/log/creditcard/
> agent.sources.s0.interceptors.logIntercept.logType = creditcard log
> agent.sources.s0.interceptors.logIntercept.type = com.citi.sponge.flume.agent.source.LogInterceptor$Builder
> agent.sources.s0.type = exec
>
> agent.sources.s1.batchSize = 1
> agent.sources.s1.channels = fileChannel
> agent.sources.s1.command = tail -F /var/log/creditcard/AggKeyListener.1.2013-01-19
> agent.sources.s1.interceptors = logIntercept
> agent.sources.s1.interceptors.logIntercept.appId = 153299
> agent.sources.s1.interceptors.logIntercept.env = SP
> agent.sources.s1.interceptors.logIntercept.hostName = vm-e61b-fe34.nam.nsroot.net
> agent.sources.s1.interceptors.logIntercept.logFileName = AggKeyListener.1.2013-01-19
> agent.sources.s1.interceptors.logIntercept.logFilePath = /var/log/creditcard/
> agent.sources.s1.interceptors.logIntercept.logType = creditcard log
> agent.sources.s1.interceptors.logIntercept.type = com.citi.sponge.flume.agent.source.LogInterceptor$Builder
> agent.sources.s1.type = exec
>
> agent.sources.s3.batchSize = 1
> agent.sources.s3.channels = fileChannel
> agent.sources.s3.command = tail -F /var/log/creditcard/AggKeyListener.3.2013-01-19
> agent.sources.s3.interceptors = logIntercept
> agent.sources.s3.interceptors.logIntercept.appId = 153299
> agent.sources.s3.interceptors.logIntercept.env = SP
> agent.sources.s3.interceptors.logIntercept.hostName = vm-e61b-fe34.nam.nsroot.net
> agent.sources.s3.interceptors.logIntercept.logFileName = AggKeyListener.3.2013-01-19
> agent.sources.s3.interceptors.logIntercept.logFilePath = /var/log/creditcard/
> agent.sources.s3.interceptors.logIntercept.logType = creditcard log
> agent.sources.s3.interceptors.logIntercept.type = com.citi.sponge.flume.agent.source.LogInterceptor$Builder
> agent.sources.s3.type = exec
>
> agent.sources.s4.batchSize = 1
> agent.sources.s4.channels = fileChannel
> agent.sources.s4.command = tail -F /var/log/creditcard/AggKeyListener.4.2013-01-19
> agent.sources.s4.interceptors = logIntercept
> agent.sources.s4.interceptors.logIntercept.appId = 153299
> agent.sources.s4.interceptors.logIntercept.env = SP
> agent.sources.s4.interceptors.logIntercept.hostName = vm-e61b-fe34.nam.nsroot.net
> agent.sources.s4.interceptors.logIntercept.logFileName = AggKeyListener.4.2013-01-19
> agent.sources.s4.interceptors.logIntercept.logFilePath = /var/log/creditcard/
> agent.sources.s4.interceptors.logIntercept.logType = creditcard log
> agent.sources.s4.interceptors.logIntercept.type = com.citi.sponge.flume.agent.source.LogInterceptor$Builder
> agent.sources.s4.type = exec
>
> agent.sources.s5.batchSize = 1
> agent.sources.s5.channels = fileChannel
> agent.sources.s5.command = tail -F /var/log/creditcard/AggKeyListener.5.2013-01-19
> agent.sources.s5.interceptors = logIntercept
> agent.sources.s5.interceptors.logIntercept.appId = 153299
> agent.sources.s5.interceptors.logIntercept.env = SP
> agent.sources.s5.interceptors.logIntercept.hostName = vm-e61b-fe34.nam.nsroot.net
> agent.sources.s5.interceptors.logIntercept.logFileName = AggKeyListener.5.2013-01-19
> agent.sources.s5.interceptors.logIntercept.logFilePath = /var/log/creditcard/
> agent.sources.s5.interceptors.logIntercept.logType = creditcard log
> agent.sources.s5.interceptors.logIntercept.type = com.citi.sponge.flume.agent.source.LogInterceptor$Builder
> agent.sources.s5.type = exec
>
> agent.sources.s6.batchSize = 1
> agent.sources.s6.channels = fileChannel
> agent.sources.s6.command = tail -F /var/log/creditcard/AggKeyListener.6.2013-01-19
> agent.sources.s6.interceptors = logIntercept
> agent.sources.s6.interceptors.logIntercept.appId = 153299
> agent.sources.s6.interceptors.logIntercept.env = SP
> agent.sources.s6.interceptors.logIntercept.hostName = vm-e61b-fe34.nam.nsroot.net
> agent.sources.s6.interceptors.logIntercept.logFileName = AggKeyListener.6.2013-01-19
> agent.sources.s6.interceptors.logIntercept.logFilePath = /var/log/creditcard/
> agent.sources.s6.interceptors.logIntercept.logType = creditcard log
> agent.sources.s6.interceptors.logIntercept.type = com.citi.sponge.flume.agent.source.LogInterceptor$Builder
> agent.sources.s6.type = exec
>
> agent.sources.s7.batchSize = 1
> agent.sources.s7.channels = fileChannel
> agent.sources.s7.command = tail -F /var/log/creditcard/AggKeyListener.7.2013-01-19
> agent.sources.s7.interceptors = logIntercept
> agent.sources.s7.interceptors.logIntercept.appId = 153299
> agent.sources.s7.interceptors.logIntercept.env = SP
> agent.sources.s7.interceptors.logIntercept.hostName = vm-e61b-fe34.nam.nsroot.net
> agent.sources.s7.interceptors.logIntercept.logFileName = AggKeyListener.7.2013-01-19
> agent.sources.s7.interceptors.logIntercept.logFilePath = /var/log/creditcard/
> agent.sources.s7.interceptors.logIntercept.logType = creditcard log
> agent.sources.s7.interceptors.logIntercept.type = com.citi.sponge.flume.agent.source.LogInterceptor$Builder
> agent.sources.s7.type = exec
>
> agent.sources.s8.batchSize = 1
> agent.sources.s8.channels = fileChannel
> agent.sources.s8.command = tail -F /var/log/creditcard/AggKeyListener.8.2013-01-19
> agent.sources.s8.interceptors = logIntercept
> agent.sources.s8.interceptors.logIntercept.appId = 153299
> agent.sources.s8.interceptors.logIntercept.env = SP
> agent.sources.s8.interceptors.logIntercept.hostName = vm-e61b-fe34.nam.nsroot.net
> agent.sources.s8.interceptors.logIntercept.logFileName = AggKeyListener.8.2013-01-19
> agent.sources.s8.interceptors.logIntercept.logFilePath = /var/log/creditcard/
> agent.sources.s8.interceptors.logIntercept.logType = creditcard log
> agent.sources.s8.interceptors.logIntercept.type = com.citi.sponge.flume.agent.source.LogInterceptor$Builder
> agent.sources.s8.type = exec
>
> agent.sources.s9.batchSize = 1
> agent.sources.s9.channels = fileChannel
> agent.sources.s9.command = tail -F /var/log/creditcard/AggKeyListener.9.2013-01-19
> agent.sources.s9.interceptors = logIntercept
> agent.sources.s9.interceptors.logIntercept.appId = 153299
> agent.sources.s9.interceptors.logIntercept.env = SP
> agent.sources.s9.interceptors.logIntercept.hostName = vm-e61b-fe34.nam.nsroot.net
> agent.sources.s9.interceptors.logIntercept.logFileName = AggKeyListener.9.2013-01-19
> agent.sources.s9.interceptors.logIntercept.logFilePath = /var/log/creditcard/
> agent.sources.s9.interceptors.logIntercept.logType = creditcard log
> agent.sources.s9.interceptors.logIntercept.type = com.citi.sponge.flume.agent.source.LogInterceptor$Builder
> agent.sources.s9.type = exec
>
> Regards,
> Deepak
>
> --
> Apache MRUnit - Unit testing MapReduce - http://mrunit.apache.org
