flume-user mailing list archives

From Alex <zya...@gmail.com>
Subject Re: Flume loses data when collecting online data to HDFS
Date Thu, 22 Jan 2015 06:18:30 GMT
1: In agent1, there is a "regex_extractor" interceptor that extracts the 
"dt" header:

    #interceptors
    agent1.sources.src_spooldir.interceptors.i1.type=regex_extractor
    agent1.sources.src_spooldir.interceptors.i1.regex=(\\d{4}-\\d{2}-\\d{2}).*
    agent1.sources.src_spooldir.interceptors.i1.serializers=s1
    agent1.sources.src_spooldir.interceptors.i1.serializers.s1.name=dt

  In agent2, the hdfs sink uses this header in its path; this is the 
configuration:

    agent2.sinks.sink1.hdfs.path = hdfs://hnd.hadoop.jsh:8020/data/%{dt}
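
For illustration, an event flows through these two settings roughly as 
follows (the log line is hypothetical):

    # hypothetical event body:  2015-01-21 10:23:45 INFO something happened
    # interceptor sets header:  dt=2015-01-21 (first capture group of the regex)
    # hdfs sink resolves path:  hdfs://hnd.hadoop.jsh:8020/data/2015-01-21
    #
    # since "dt" comes from the event body rather than the arrival time, events
    # arriving after midnight still land in their original day's bucket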

2: I misunderstood this property; thank you for the correction.
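
For reference, the number of events per transaction at the hdfs sink is 
controlled by the hdfs.batchSize property rather than by the thread pool; a 
minimal sketch, pinning it to the value Hari mentioned (sink name as in the 
agent2 config below):

    # events written per transaction, then flushed to HDFS
    agent2.sinks.sink1.hdfs.batchSize = 1000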

Thanks,
Alex


On 1/22/2015 12:51 PM, Hari Shreedharan wrote:
> 1: How do you guarantee that the data from the previous day has not 
> spilled over to the next day? Where are you inserting the timestamp 
> (if you are doing bucketing)?
> 2: Flume creates transactions for writes. Each batch defaults to 1000 
> events, which are written and flushed. There is still only one 
> transaction per sink; the pool size is for IO ops.
>
> Thanks,
> Hari
>
>
> On Wed, Jan 21, 2015 at 7:32 PM, Jay Alexander <zyacer@gmail.com> wrote:
>
>     First question: No, I verified that all the files in HDFS had been
>     closed; in fact, I count the data one day later.
>
>     Second question: I haven't configured anything about transactions. I
>     saw this item in the hdfs sink documentation: "hdfs.threadsPoolSize
>     (default 10): Number of threads per HDFS sink for HDFS IO ops (open,
>     write, etc.)".
>     So I assumed there are 10 transactions per sink from the file channel.
>
>     Thanks.
>
>
>     2015-01-22 11:04 GMT+08:00 Hari Shreedharan <hshreedharan@cloudera.com>:
>
>         Are you accounting for the data still being written but not
>         yet hflushed at the time of the query? Basically one
>         transaction per sink?
>
>         Thanks,
>         Hari
>
>
>         On Wed, Jan 21, 2015 at 6:42 PM, Jay Alexander <zyacer@gmail.com> wrote:
>
>             I used *flume-ng 1.5* to collect logs.
>
>             There are two agents in the data flow, running on two
>             separate hosts.
>
>             The data is sent *from agent1 to agent2*.
>
>             The agents' components are as follows:
>
>             agent1: spooling dir source --> file channel --> avro sink
>             agent2: avro source --> file channel --> hdfs sink
>
>             But it seems to lose data, roughly 1 in every 1000 of the
>             millions of events. To diagnose the problem I tried these
>             steps:
>
>              1. Looked at the agents' logs: could not find any error or
>                 exception.
>              2. Looked at the agents' monitoring metrics: the numbers of
>                 events put into and taken from each channel are always
>                 equal.
>              3. Counted the records with a Hive query and with a shell
>                 count over the HDFS files, respectively: the two counts
>                 are equal to each other but less than the online data
>                 count.
>
>
>             These are the two agents' configurations:
>
>                 #agent1
>                 agent1.sources = src_spooldir
>                 agent1.channels = chan_file
>                 agent1.sinks = sink_avro
>
>                 #source
>                 agent1.sources.src_spooldir.type = spooldir
>                 agent1.sources.src_spooldir.spoolDir = /data/logs/flume-spooldir
>                 agent1.sources.src_spooldir.interceptors=i1
>
>                 #interceptors
>                 agent1.sources.src_spooldir.interceptors.i1.type=regex_extractor
>                 agent1.sources.src_spooldir.interceptors.i1.regex=(\\d{4}-\\d{2}-\\d{2}).*
>                 agent1.sources.src_spooldir.interceptors.i1.serializers=s1
>                 agent1.sources.src_spooldir.interceptors.i1.serializers.s1.name=dt
>
>                 #sink
>                 agent1.sinks.sink_avro.type = avro
>                 agent1.sinks.sink_avro.hostname = 10.235.2.212
>                 agent1.sinks.sink_avro.port = 9910
>
>                 #channel
>                 agent1.channels.chan_file.type = file
>                 agent1.channels.chan_file.checkpointDir = /data/flume/agent1/checkpoint
>                 agent1.channels.chan_file.dataDirs = /data/flume/agent1/data
>
>                 agent1.sources.src_spooldir.channels = chan_file
>                 agent1.sinks.sink_avro.channel = chan_file
>
>
>
>                 # agent2
>                 agent2.sources  = source1
>                 agent2.channels = channel1
>                 agent2.sinks    = sink1
>
>                 # source
>                 agent2.sources.source1.type     = avro
>                 agent2.sources.source1.bind     = 10.235.2.212
>                 agent2.sources.source1.port     = 9910
>
>                 # sink
>                 agent2.sinks.sink1.type = hdfs
>                 agent2.sinks.sink1.hdfs.fileType = DataStream
>                 agent2.sinks.sink1.hdfs.filePrefix = log
>                 agent2.sinks.sink1.hdfs.path = hdfs://hnd.hadoop.jsh:8020/data/%{dt}
>                 agent2.sinks.sink1.hdfs.rollInterval = 600
>                 agent2.sinks.sink1.hdfs.rollSize = 0
>                 agent2.sinks.sink1.hdfs.rollCount = 0
>                 agent2.sinks.sink1.hdfs.idleTimeout = 300
>                 agent2.sinks.sink1.hdfs.round = true
>                 agent2.sinks.sink1.hdfs.roundValue = 10
>                 agent2.sinks.sink1.hdfs.roundUnit = minute
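>                 # note: with these settings a file is rolled every 600 s
>                 # and an idle file is closed after 300 s; round/roundValue/
>                 # roundUnit only affect time-based escapes (e.g. %H%M) in
>                 # hdfs.path, so they have no effect on a path built purely
>                 # from the %{dt} header as above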
>
>                 # channel
>                 agent2.channels.channel1.type   = file
>                 agent2.channels.channel1.checkpointDir = /data/flume/agent2/checkpoint
>                 agent2.channels.channel1.dataDirs = /data/flume/agent2/data
>                 agent2.sinks.sink1.channel      = channel1
>                 agent2.sources.source1.channels = channel1
>
>
>             Any suggestions are welcome!