flume-user mailing list archives

From Alex <zya...@gmail.com>
Subject Re: Flume loss data when collect online data to hdfs
Date Fri, 23 Jan 2015 10:07:18 GMT
Today I re-transferred all the online data and found the data loss 
again; it is exactly the same as last time. So I looked into the 
suspicious file and found a weird character. I parsed it with a Java 
program: it is a Unicode character encoded as a two-char surrogate 
pair, and its code point is 0x1F4AB.
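
For reference, here is a small standalone check (plain Java, not Flume 
code) showing that code point 0x1F4AB needs two Java chars, i.e. a 
surrogate pair:

    public class SurrogateCheck {
        public static void main(String[] args) {
            int codePoint = 0x1F4AB;
            // A supplementary code point cannot fit into a single char.
            System.out.println(Character.isSupplementaryCodePoint(codePoint)); // true
            char[] chars = Character.toChars(codePoint);
            System.out.println(chars.length); // 2
            // Prints high=D83D low=DCAB, the UTF-16 surrogate pair.
            System.out.printf("high=%04X low=%04X%n", (int) chars[0], (int) chars[1]);
        }
    }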

Then I looked into the source code:
1. Class: org.apache.flume.serialization.LineDeserializer
LineDeserializer uses 
"org.apache.flume.serialization.ResettableFileInputStream#readChar" to 
read one char at a time. When it encounters the character 0x1F4AB, 
readChar returns -1, and the rest of the file after that character is 
skipped.
2. Class: org.apache.flume.serialization.ResettableFileInputStream
The method 
org.apache.flume.serialization.ResettableFileInputStream#readChar 
contains this snippet:
     CoderResult res = decoder.decode(buf, charBuf, isEndOfInput);
When the decoder decodes the character 0x1F4AB, the CoderResult is 
OVERFLOW. That is expected, because 0x1F4AB has to be represented as 
two chars (a surrogate pair), which cannot fit into the single-char 
output buffer.
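
To illustrate the OVERFLOW result, here is a simplified standalone 
sketch (not the actual ResettableFileInputStream code): decoding the 
UTF-8 bytes of 0x1F4AB into a one-char output buffer overflows, because 
the decoded result is a two-char surrogate pair.

    import java.nio.ByteBuffer;
    import java.nio.CharBuffer;
    import java.nio.charset.CharsetDecoder;
    import java.nio.charset.CoderResult;
    import java.nio.charset.StandardCharsets;

    public class OverflowDemo {
        public static void main(String[] args) {
            // UTF-8 encoding of code point 0x1F4AB
            ByteBuffer buf = ByteBuffer.wrap(new byte[] {
                (byte) 0xF0, (byte) 0x9F, (byte) 0x92, (byte) 0xAB });
            // Output buffer with room for only one char, as in a
            // single-char read.
            CharBuffer charBuf = CharBuffer.allocate(1);
            CharsetDecoder decoder = StandardCharsets.UTF_8.newDecoder();

            CoderResult res = decoder.decode(buf, charBuf, true);
            // Prints true: the surrogate pair does not fit in one char.
            System.out.println(res.isOverflow());
        }
    }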

To solve this problem, one idea is to implement a line deserializer 
that uses 
"org.apache.flume.serialization.ResettableFileInputStream#read()" 
instead of 
"org.apache.flume.serialization.ResettableFileInputStream#readChar". 
But I am not sure whether that is a good solution.
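
Roughly, the read()-based approach would look like the sketch below 
(just to show the idea, not a patch; it assumes UTF-8 input and omits 
the maxLineLength handling that the real LineDeserializer has):

    import java.io.ByteArrayOutputStream;
    import java.io.IOException;
    import java.io.InputStream;
    import java.nio.charset.StandardCharsets;

    public class ByteLineReader {
        // Read raw bytes until '\n' and decode the whole line at once,
        // so a multi-byte character (or surrogate pair) is never split.
        static String readLine(InputStream in) throws IOException {
            ByteArrayOutputStream line = new ByteArrayOutputStream();
            int b;
            while ((b = in.read()) != -1 && b != '\n') {
                line.write(b);
            }
            if (b == -1 && line.size() == 0) {
                return null; // end of stream and no pending data
            }
            return new String(line.toByteArray(), StandardCharsets.UTF_8);
        }
    }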

The attachment is a snippet of the data, with the weird character on 
the 2nd line.

Any suggestions?

Thanks,
Alex

On 1/22/2015 2:18 PM, Alex wrote:
> 1: In agent1, there is a "regex_extractor" interceptor for extracting 
> the header "dt":
>
>     #interceptors
>     agent1.sources.src_spooldir.interceptors.i1.type=regex_extractor
>     agent1.sources.src_spooldir.interceptors.i1.regex=(\\d{4}-\\d{2}-\\d{2}).*
>     agent1.sources.src_spooldir.interceptors.i1.serializers=s1
>     agent1.sources.src_spooldir.interceptors.i1.serializers.s1.name=dt
>
> In agent2, the hdfs sink uses this header in its path; this is the 
> configuration:
>
>     agent2.sinks.sink1.hdfs.path = hdfs://hnd.hadoop.jsh:8020/data/%{dt}
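>
> To illustrate how this works end to end, here is a quick standalone
> check of the pattern (not Flume code; the sample log line is made up):
>
>     import java.util.regex.Matcher;
>     import java.util.regex.Pattern;
>
>     public class DtRegexCheck {
>         public static void main(String[] args) {
>             // Same pattern as in the interceptor config above.
>             Pattern p = Pattern.compile("(\\d{4}-\\d{2}-\\d{2}).*");
>             String sampleLine = "2015-01-22 11:04:00 some log message";
>             Matcher m = p.matcher(sampleLine);
>             if (m.matches()) {
>                 // The interceptor would set header dt=2015-01-22, so
>                 // this event would land in .../data/2015-01-22 on hdfs.
>                 System.out.println("dt=" + m.group(1));
>             }
>         }
>     }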
>
> 2: I misunderstood this property, thank you for the clarification.
>
> Thanks,
> Alex
>
>
> On 1/22/2015 12:51 PM, Hari Shreedharan wrote:
>> 1: How do you guarantee that the data from the previous day has not 
>> spilled over to the next day? Where are you inserting the timestamp 
>> (if you are doing bucketing).
>> 2: Flume creates transactions for writes. Each batch defaults to 1000 
>> events, which are written and flushed. There is still only one 
>> transaction per sink; the pool size is for IO ops.
>>
>> Thanks,
>> Hari
>>
>>
>> On Wed, Jan 21, 2015 at 7:32 PM, Jay Alexander <zyacer@gmail.com> wrote:
>>
>>     First question: No, I checked that all the files in hdfs had been
>>     closed; in fact, I count the data one day later.
>>
>>     Second question: I haven't configured anything about transactions.
>>     I saw this item in the hdfs sink documentation:
>>     "hdfs.threadsPoolSize (default 10): Number of threads per HDFS
>>     sink for HDFS IO ops (open, write, etc.)".
>>     So I assumed there are 10 transactions per sink reading from the
>>     file channel.
>>
>>     Thanks.
>>
>>
>>     2015-01-22 11:04 GMT+08:00 Hari Shreedharan <hshreedharan@cloudera.com>:
>>
>>         Are you accounting for the data still being written but not
>>         yet hflushed at the time of the query? Basically one
>>         transaction per sink?
>>
>>         Thanks,
>>         Hari
>>
>>
>>         On Wed, Jan 21, 2015 at 6:42 PM, Jay Alexander
>>         <zyacer@gmail.com> wrote:
>>
>>             I use *flume-ng 1.5* to collect logs.
>>
>>             There are two agents in the data flow, running on two
>>             separate hosts.
>>
>>             The data is sent *from agent1 to agent2.*
>>
>>             The agents' components are as follows:
>>
>>             agent1: spooling dir source --> file channel --> avro sink
>>             agent2: avro source --> file channel --> hdfs sink
>>
>>             But it seems to lose about 1 in 1000 events out of
>>             millions of records. To debug the problem I tried these steps:
>>
>>              1. Checked the agents' logs: no errors or exceptions found.
>>              2. Checked the agents' monitoring metrics: the numbers of
>>                 events put into and taken from each channel are always
>>                 equal.
>>              3. Counted the records with a Hive query and with a shell
>>                 count over the hdfs files, respectively: the two numbers
>>                 are equal, but both are less than the number of online
>>                 records.
>>
>>
>>             These are the two agents' configurations:
>>
>>                 #agent1
>>                 agent1.sources = src_spooldir
>>                 agent1.channels = chan_file
>>                 agent1.sinks = sink_avro
>>
>>                 #source
>>                 agent1.sources.src_spooldir.type = spooldir
>>                 agent1.sources.src_spooldir.spoolDir = /data/logs/flume-spooldir
>>                 agent1.sources.src_spooldir.interceptors=i1
>>
>>                 #interceptors
>>                 agent1.sources.src_spooldir.interceptors.i1.type=regex_extractor
>>                 agent1.sources.src_spooldir.interceptors.i1.regex=(\\d{4}-\\d{2}-\\d{2}).*
>>                 agent1.sources.src_spooldir.interceptors.i1.serializers=s1
>>                 agent1.sources.src_spooldir.interceptors.i1.serializers.s1.name=dt
>>
>>                 #sink
>>                 agent1.sinks.sink_avro.type = avro
>>                 agent1.sinks.sink_avro.hostname = 10.235.2.212
>>                 agent1.sinks.sink_avro.port = 9910
>>
>>                 #channel
>>                 agent1.channels.chan_file.type = file
>>                 agent1.channels.chan_file.checkpointDir = /data/flume/agent1/checkpoint
>>                 agent1.channels.chan_file.dataDirs = /data/flume/agent1/data
>>
>>                 agent1.sources.src_spooldir.channels = chan_file
>>                 agent1.sinks.sink_avro.channel = chan_file
>>
>>
>>
>>                 # agent2
>>                 agent2.sources  = source1
>>                 agent2.channels = channel1
>>                 agent2.sinks    = sink1
>>
>>                 # source
>>                 agent2.sources.source1.type     = avro
>>                 agent2.sources.source1.bind     = 10.235.2.212
>>                 agent2.sources.source1.port     = 9910
>>
>>                 # sink
>>                 agent2.sinks.sink1.type= hdfs
>>                 agent2.sinks.sink1.hdfs.fileType = DataStream
>>                 agent2.sinks.sink1.hdfs.filePrefix = log
>>                 agent2.sinks.sink1.hdfs.path = hdfs://hnd.hadoop.jsh:8020/data/%{dt}
>>                 agent2.sinks.sink1.hdfs.rollInterval = 600
>>                 agent2.sinks.sink1.hdfs.rollSize = 0
>>                 agent2.sinks.sink1.hdfs.rollCount = 0
>>                 agent2.sinks.sink1.hdfs.idleTimeout = 300
>>                 agent2.sinks.sink1.hdfs.round = true
>>                 agent2.sinks.sink1.hdfs.roundValue = 10
>>                 agent2.sinks.sink1.hdfs.roundUnit = minute
>>
>>                 # channel
>>                 agent2.channels.channel1.type   = file
>>                 agent2.channels.channel1.checkpointDir = /data/flume/agent2/checkpoint
>>                 agent2.channels.channel1.dataDirs = /data/flume/agent2/data
>>                 agent2.sinks.sink1.channel      = channel1
>>                 agent2.sources.source1.channels = channel1
>>
>>
>>             Any suggestions are welcome!
>>
>>
>>
>>
>

