flume-user mailing list archives

From Tao Li <litao.bupt...@gmail.com>
Subject Re: [HDFSEventSink] Endless loop when HDFSEventSink.process() throws exception
Date Fri, 17 Apr 2015 19:33:01 GMT
OK, Thank you very much.

2015-04-18 3:26 GMT+08:00 Gwen Shapira <gshapira@cloudera.com>:

> This looks like the right design to me.
>
> On Fri, Apr 17, 2015 at 12:22 PM, Tao Li <litao.buptsse@gmail.com> wrote:
> > Here is why I designed it like this:
> > I want to treat "KafkaChannel1 => Kafka Cluster => KafkaChannel2" as one
> > channel. ScribeSource simply puts events into it, and HDFSEventSink takes
> > events from it. The Kafka cluster provides stable storage and is
> > transparent to event delivery between source and sink. (If I instead used
> > "KafkaSource => MemoryChannel => HDFSEventSink" to export data from Kafka
> > to HDFS, the memory channel would be unstable and not transparent.)
> > So the workflow is simply like this:
> > ScribeClient => ScribeSource => KafkaChannel(distributed) =>
> > HDFSEventSink => HDFS
> >
> > As interceptors run right after the source, maybe I should add a filter
> > interceptor after ScribeSource, like this:
> > ScribeClient => ScribeSource => FilterInterceptor =>
> > KafkaChannel(distributed) => HDFSEventSink => HDFS
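For reference, a rough sketch of such a filter interceptor might look like the
following. The class is hypothetical and untested; it assumes the standard
Flume Interceptor and Interceptor.Builder plug-in interfaces and simply drops
any event that lacks the "timestamp" header discussed in this thread:

    import java.util.ArrayList;
    import java.util.List;

    import org.apache.flume.Context;
    import org.apache.flume.Event;
    import org.apache.flume.interceptor.Interceptor;

    // Hypothetical interceptor: drops events missing the "timestamp" header
    // so malformed events never reach the KafkaChannel / HDFSEventSink.
    public class RequiredHeaderFilterInterceptor implements Interceptor {

      private static final String REQUIRED_HEADER = "timestamp";

      @Override
      public void initialize() {
        // no state to set up
      }

      @Override
      public Event intercept(Event event) {
        // Returning null removes the event from the flow.
        return event.getHeaders().containsKey(REQUIRED_HEADER) ? event : null;
      }

      @Override
      public List<Event> intercept(List<Event> events) {
        List<Event> out = new ArrayList<Event>(events.size());
        for (Event e : events) {
          Event kept = intercept(e);
          if (kept != null) {
            out.add(kept);
          }
        }
        return out;
      }

      @Override
      public void close() {
        // nothing to release
      }

      public static class Builder implements Interceptor.Builder {
        @Override
        public Interceptor build() {
          return new RequiredHeaderFilterInterceptor();
        }

        @Override
        public void configure(Context context) {
          // the required header name could be made configurable here
        }
      }
    }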
> >
> > 2015-04-18 2:51 GMT+08:00 Tao Li <litao.buptsse@gmail.com>:
> >>
> >> @Gwen @Hari
> >>
> >> My use case is as follows:
> >> ScribeClient => [Agent1: ScribeSource => KafkaChannel1] => Kafka Cluster
> >> => [Agent2: KafkaChannel2 => HDFSEventSink] => HDFS
> >>
> >> The bad case is as follows:
> >> My HDFSEventSink needs a "timestamp" header, but some dirty data (written
> >> by mistake) in Kafka doesn't have the "timestamp" header, which causes
> >> the following BucketPath.escapeString call to throw a NullPointerException:
> >> String realPath = BucketPath.escapeString(filePath, event.getHeaders(),
> >> timeZone, needRounding, roundUnit, roundValue, useLocalTime);
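For illustration, the reported failure can be sketched as a standalone snippet
like the one below. The path template and rounding arguments are made-up
values, and the NullPointerException is expected only because, as reported
above, the time escapes are resolved from the missing "timestamp" header:

    import java.util.Calendar;
    import java.util.HashMap;
    import java.util.Map;
    import java.util.TimeZone;

    import org.apache.flume.formatter.output.BucketPath;

    // Illustrative reproduction: an event whose headers carry no "timestamp"
    // is escaped against a time-based path template with useLocalTime = false.
    public class MissingTimestampRepro {
      public static void main(String[] args) {
        String filePath = "/flume/events/%Y-%m-%d/%H";  // made-up hdfs.path value
        Map<String, String> headers = new HashMap<String, String>(); // no "timestamp"
        TimeZone timeZone = TimeZone.getDefault();

        // Per the report above, this throws NullPointerException because the
        // %Y/%m/%d/%H escapes need the absent "timestamp" header.
        String realPath = BucketPath.escapeString(filePath, headers,
            timeZone, true, Calendar.SECOND, 10, false);
        System.out.println(realPath);
      }
    }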
> >>
> >> I think Gwen's second point is OK: we can add an interceptor to do the
> >> filtering job.
> >>
> >> But my Flume agents are kind of special:
> >> Agent1 doesn't have a sink; it sends messages directly to the Kafka
> >> cluster through KafkaChannel1.
> >> Agent2 doesn't have a source; it polls events directly from the Kafka
> >> cluster through KafkaChannel2.
> >> Agent1 and Agent2 are different JVMs deployed on different nodes.
> >>
> >> I don't know if an agent with no sink or no source is reasonable? But I
> >> have already built the whole workflow, and it works well for me in
> >> regular cases.
> >>
> >> For Agent2, because it has no source, I can't use Gwen's interceptor
> >> suggestion.
> >>
> >> 2015-04-18 2:30 GMT+08:00 Hari Shreedharan <hshreedharan@cloudera.com>:
> >>>
> >>> What I think he means is that a malformed message in the channel cannot
> >>> be serialized by the serializer, causing the serializer to fail and
> >>> perhaps throw (think malformed Avro). Such a message would basically be
> >>> stuck in an infinite loop. So the workaround in (2) would work if using
> >>> a Kafka Source.
> >>>
> >>> Thanks,
> >>> Hari
> >>>
> >>>
> >>> On Fri, Apr 17, 2015 at 10:08 AM, Tao Li <litao.buptsse@gmail.com> wrote:
> >>>>
> >>>> OK, I got it, Thanks.
> >>>>
> >>>> 2015-04-18 0:59 GMT+08:00 Hari Shreedharan <hshreedharan@cloudera.com>:
> >>>>>
> >>>>> Are you using Kafka channel? The fix I mentioned was for file channel.
> >>>>> Unfortunately, we don't plan to introduce something that drops data in
> >>>>> real time. This makes it too easy for a misconfig to cause data loss.
> >>>>> You'd have to ensure the data in the Kafka channel is valid.
> >>>>>
> >>>>> Thanks,
> >>>>> Hari
> >>>>>
> >>>>>
> >>>>> On Fri, Apr 17, 2015 at 9:41 AM, Tao Li <litao.buptsse@gmail.com>
> >>>>> wrote:
> >>>>>>
> >>>>>> @Hari, you mean I need to ensure the data in Kafka is OK by myself,
> >>>>>> right?
> >>>>>>
> >>>>>> How about having a config to let the user decide how to handle
> >>>>>> BACKOFF? For example, we could configure the max retry count in
> >>>>>> process(), and also configure whether or not to commit when the max
> >>>>>> retry count is exceeded. (In my Kafka case, when meeting dirty data,
> >>>>>> committing the consumer offset would be nicer for me than an endless
> >>>>>> loop.)
> >>>>>>
> >>>>>> 2015-04-18 0:23 GMT+08:00 Hari Shreedharan
> >>>>>> <hshreedharan@cloudera.com>:
> >>>>>>>
> >>>>>>> We recently added functionality to the file channel integrity tool
> >>>>>>> that can be used to remove bad events from the channel - though you
> >>>>>>> would need to write some code to validate events. It will be in the
> >>>>>>> soon to be released 1.6.0.
> >>>>>>>
> >>>>>>> Thanks,
> >>>>>>> Hari
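The validation code Hari mentions is user-supplied; given the dirty data
described earlier in the thread, a "bad event" here is simply one without a
usable "timestamp" header. A minimal, purely illustrative check (not the
integrity tool's actual plug-in interface, which isn't shown in this thread)
might be:

    import org.apache.flume.Event;

    // Illustrative check only: an event is treated as valid when it carries a
    // parseable "timestamp" header, which is what HDFSEventSink needs here.
    public final class TimestampHeaderCheck {

      private TimestampHeaderCheck() {}

      public static boolean isValid(Event event) {
        String ts = event.getHeaders().get("timestamp");
        if (ts == null) {
          return false;              // dirty data: header missing entirely
        }
        try {
          Long.parseLong(ts);        // header present but must be epoch millis
          return true;
        } catch (NumberFormatException nfe) {
          return false;
        }
      }
    }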
> >>>>>>>
> >>>>>>>
> >>>>>>> On Fri, Apr 17, 2015 at 9:05 AM, Tao Li <litao.buptsse@gmail.com>
> >>>>>>> wrote:
> >>>>>>>>
> >>>>>>>> Hi all:
> >>>>>>>>
> >>>>>>>> My use case is KafkaChannel + HDFSEventSink.
> >>>>>>>>
> >>>>>>>> I found that SinkRunner.PollingRunner calls HDFSEventSink.process()
> >>>>>>>> in a while loop. For example, a message in Kafka contains dirty
> >>>>>>>> data, so HDFSEventSink.process() consumes the message from Kafka,
> >>>>>>>> throws an exception because of the dirty data, and the Kafka offset
> >>>>>>>> isn't committed. The outer loop then calls HDFSEventSink.process()
> >>>>>>>> again. Because the Kafka offset hasn't changed, HDFSEventSink
> >>>>>>>> consumes the dirty data again. The bad loop never stops.
> >>>>>>>>
> >>>>>>>> I want to know if we have a mechanism to cover this case? For
> >>>>>>>> example, a max retry count for a particular HDFSEventSink.process()
> >>>>>>>> call, giving up when the limit is exceeded.
> >>>>>>>>
> >>>>>>>
> >>>>>>
> >>>>>
> >>>>
> >>>
> >>
> >
>
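For reference, the endless loop described in the original question, together
with the retry bound proposed further up in the thread, can be sketched
roughly as follows. This is not the real SinkRunner.PollingRunner code, and
maxRetries is a hypothetical knob that does not exist in Flume:

    import org.apache.flume.EventDeliveryException;
    import org.apache.flume.Sink;

    // Rough, simplified sketch of the reported behaviour. When process()
    // throws, the channel transaction rolls back and the Kafka offset is not
    // committed, so the next iteration re-reads the same dirty events forever.
    // "maxRetries" models the proposed (currently non-existent) give-up limit.
    public class BoundedRetrySketch {

      public static void drive(Sink sink, int maxRetries, long backoffMillis)
          throws InterruptedException {
        int consecutiveFailures = 0;
        while (true) {
          try {
            sink.process();               // same batch is re-delivered after a rollback
            consecutiveFailures = 0;      // progress was made, reset the counter
          } catch (EventDeliveryException e) {
            consecutiveFailures++;
            if (consecutiveFailures > maxRetries) {
              // proposed behaviour: stop retrying (or, as suggested in this
              // thread, optionally commit/skip the bad batch) instead of looping
              return;
            }
            Thread.sleep(backoffMillis);  // today: back off and retry the same batch
          }
        }
      }
    }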
