flume-user mailing list archives

From Tao Li <litao.bupt...@gmail.com>
Subject Re: [HDFSEventSink] Endless loop when HDFSEventSink.process() thorws exception
Date Fri, 17 Apr 2015 18:51:30 GMT
@Gwen @Hari

My use case is as follows:
ScribeClient => [Agent1: ScribeSource => KafkaChannel1] => Kafka Cluster =>
[Agent2: KafkaChannel2 => HDFSEventSink] => HDFS

The bad case is as follows:
My HDFSEventSink needs a "*timestamp*" header, but some dirty data (written by
mistake) in Kafka doesn't have the "timestamp" header, which causes the
following BucketPath.escapeString call to throw a *NullPointerException*:
String realPath = BucketPath.escapeString(filePath, event.getHeaders(),
timeZone, needRounding, roundUnit, roundValue, useLocalTime);

*I think Gwen's second point is OK: we can add an interceptor to do the
filtering.*
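
As a rough illustration of the filtering idea, here is a minimal sketch of the check such a filter could apply. It is plain Java with no Flume dependency: the class `HeaderFilterSketch` and its methods are hypothetical names, events are reduced to their header maps, and the assumption that the "timestamp" value should parse as a long (epoch milliseconds) is mine, not something stated in this thread.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: not Flume's Interceptor API, only the filtering rule itself.
final class HeaderFilterSketch {
    // Header name taken from the message above.
    static final String REQUIRED_HEADER = "timestamp";

    // True when the headers carry a "timestamp" value that parses as a long.
    // Assumption: the value is expected to be epoch milliseconds.
    static boolean hasUsableTimestamp(Map<String, String> headers) {
        String value = headers.get(REQUIRED_HEADER);
        if (value == null) {
            return false; // the "dirty data" case described above
        }
        try {
            Long.parseLong(value.trim());
            return true;
        } catch (NumberFormatException e) {
            return false;
        }
    }

    // Returns only the events (represented by their headers) that pass the check.
    static List<Map<String, String>> keepValid(List<Map<String, String>> batch) {
        List<Map<String, String>> kept = new ArrayList<>();
        for (Map<String, String> headers : batch) {
            if (hasUsableTimestamp(headers)) {
                kept.add(headers);
            }
        }
        return kept;
    }

    public static void main(String[] args) {
        Map<String, String> good = new HashMap<>();
        good.put(REQUIRED_HEADER, "1429296690000");
        Map<String, String> dirty = new HashMap<>();
        dirty.put("host", "node-1"); // no timestamp header

        List<Map<String, String>> batch = new ArrayList<>();
        batch.add(good);
        batch.add(dirty);
        System.out.println("kept " + keepValid(batch).size() + " of " + batch.size());
    }
}
```

In a real agent this rule would have to live wherever events can still be rejected before they reach the sink.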

But my flume agent is kind of special:
Agent1 has no sink; it sends messages directly to the Kafka cluster through its Kafka channel.
Agent2 has no source; it polls events directly from the Kafka cluster through its Kafka channel.
Agent1 and Agent2 run in different JVMs and are deployed on different nodes.

*I don't know whether it's reasonable for an agent to have no sink or no source.* But
I have already built the whole workflow, and it works well for me in
regular cases.

*Because Agent2 has no source, I can't use Gwen's interceptor approach.*

2015-04-18 2:30 GMT+08:00 Hari Shreedharan <hshreedharan@cloudera.com>:

> What I think he means is that a message in the channel cannot be
> serialized by the serializer because it is malformed, causing the serializer
> to fail and perhaps throw (think malformed Avro). Such a message would
> basically be stuck in an infinite loop. So the workaround in (2) would work if
> using a Kafka Source.
> Thanks,
> Hari
> On Fri, Apr 17, 2015 at 10:08 AM, Tao Li <litao.buptsse@gmail.com> wrote:
>> OK, I got it, Thanks.
>> 2015-04-18 0:59 GMT+08:00 Hari Shreedharan <hshreedharan@cloudera.com>:
>>> Are you using Kafka channel? The fix I mentioned was for file channel.
>>> Unfortunately, we don't plan to introduce something that drops data in real
>>> time. This makes it too easy for a misconfig to cause data loss. You'd have
>>> to ensure the data in the Kafka channel is valid.
>>> Thanks,
>>> Hari
>>> On Fri, Apr 17, 2015 at 9:41 AM, Tao Li <litao.buptsse@gmail.com> wrote:
>>>> @Hari, you mean I need to ensure the data in Kafka is OK by myself,
>>>> right?
>>>> How about adding a config option that lets the user decide how to handle BACKOFF?
>>>> For example, we could configure the max retry count in process(), and also
>>>> configure whether or not to commit when the max retry count is exceeded. (In my Kafka
>>>> case, when dirty data is encountered, committing the consumer offset would be better for me
>>>> than an endless loop.)
>>>> 2015-04-18 0:23 GMT+08:00 Hari Shreedharan <hshreedharan@cloudera.com>:
>>>>> We recently added functionality to the file channel integrity tool
>>>>> that can be used to remove bad events from the channel, though you would
>>>>> need to write some code to validate events. It will be in the
>>>>> soon-to-be-released 1.6.0.
>>>>> Thanks,
>>>>> Hari
>>>>> On Fri, Apr 17, 2015 at 9:05 AM, Tao Li <litao.buptsse@gmail.com>
>>>>> wrote:
>>>>>> Hi all:
>>>>>> My use case is KafkaChannel + HDFSEventSink.
>>>>>> I found that SinkRunner.PollingRunner calls
>>>>>> HDFSEventSink.process() in a while loop. For example, suppose a message in
>>>>>> Kafka contains dirty data. HDFSEventSink.process() consumes the message from
>>>>>> Kafka and throws an exception because of the *dirty data*, so the *Kafka offset
>>>>>> is not committed*. The outer loop then calls
>>>>>> HDFSEventSink.process() again. Because the Kafka offset hasn't changed,
>>>>>> HDFSEventSink consumes the dirty data *again*. The bad loop *never
>>>>>> stops*.
>>>>>>  *I want to know whether there is a mechanism to cover this case.*
>>>>>> For example, a max retry count for a single HDFSEventSink.process()
>>>>>> call, giving up when the limit is exceeded.
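
The max-retry proposal from the original message can be sketched in isolation. This is a toy model in plain Java, not Flume or Kafka code: the class `BoundedRetrySketch`, the in-memory list standing in for a Kafka partition, the `DIRTY` prefix marking bad messages, and the `commitOnGiveUp` switch are all hypothetical. As Hari points out in the thread, committing past a failed message drops data, so the sketch records skipped messages rather than discarding them silently.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical toy model of "retry N times, then optionally commit and move on".
final class BoundedRetrySketch {
    private final List<String> log;        // stand-in for a Kafka partition
    private final int maxAttempts;         // attempts allowed per message
    private final boolean commitOnGiveUp;  // skip the message after giving up?
    private int committedOffset = 0;

    final List<String> delivered = new ArrayList<>();
    final List<String> skipped = new ArrayList<>();

    BoundedRetrySketch(List<String> log, int maxAttempts, boolean commitOnGiveUp) {
        this.log = log;
        this.maxAttempts = maxAttempts;
        this.commitOnGiveUp = commitOnGiveUp;
    }

    int committedOffset() {
        return committedOffset;
    }

    // Stand-in for the sink's process(): fails deterministically on dirty data.
    private void process(String message) {
        if (message.startsWith("DIRTY")) {
            throw new IllegalStateException("missing timestamp header");
        }
        delivered.add(message);
    }

    // Returns true if the whole log was consumed, false if it stopped at a bad message.
    boolean drain() {
        while (committedOffset < log.size()) {
            String message = log.get(committedOffset);
            boolean ok = false;
            for (int attempt = 1; attempt <= maxAttempts && !ok; attempt++) {
                try {
                    process(message);
                    ok = true;
                } catch (RuntimeException e) {
                    // Offset is not committed, so the same message is retried.
                }
            }
            if (!ok) {
                if (!commitOnGiveUp) {
                    return false; // stop instead of looping forever
                }
                skipped.add(message); // keep a record of what was dropped
            }
            committedOffset++; // commit: success, or deliberate skip
        }
        return true;
    }

    public static void main(String[] args) {
        BoundedRetrySketch runner =
            new BoundedRetrySketch(Arrays.asList("a", "DIRTY-b", "c"), 3, true);
        System.out.println("drained=" + runner.drain()
            + " delivered=" + runner.delivered + " skipped=" + runner.skipped);
    }
}
```

With `commitOnGiveUp` set to false the runner halts at the bad message instead of spinning, which avoids the endless loop without losing data but still needs someone to clean up the channel.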
