Return-Path: X-Original-To: apmail-flume-user-archive@www.apache.org Delivered-To: apmail-flume-user-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 31AB417943 for ; Fri, 17 Apr 2015 18:51:56 +0000 (UTC) Received: (qmail 35018 invoked by uid 500); 17 Apr 2015 18:51:55 -0000 Delivered-To: apmail-flume-user-archive@flume.apache.org Received: (qmail 34958 invoked by uid 500); 17 Apr 2015 18:51:55 -0000 Mailing-List: contact user-help@flume.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: user@flume.apache.org Delivered-To: mailing list user@flume.apache.org Received: (qmail 34948 invoked by uid 99); 17 Apr 2015 18:51:55 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 17 Apr 2015 18:51:55 +0000 X-ASF-Spam-Status: No, hits=1.5 required=5.0 tests=HTML_MESSAGE,RCVD_IN_DNSWL_LOW,SPF_PASS X-Spam-Check-By: apache.org Received-SPF: pass (athena.apache.org: domain of litao.buptsse@gmail.com designates 209.85.192.48 as permitted sender) Received: from [209.85.192.48] (HELO mail-qg0-f48.google.com) (209.85.192.48) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 17 Apr 2015 18:51:51 +0000 Received: by qgfi89 with SMTP id i89so26046900qgf.1 for ; Fri, 17 Apr 2015 11:51:30 -0700 (PDT) DKIM-Signature: v=1; a=rsa-sha256; c=relaxed/relaxed; d=gmail.com; s=20120113; h=mime-version:in-reply-to:references:date:message-id:subject:from:to :content-type; bh=IXIO7jjyjfWXFapUfy1fkxibuRxfPAXhmcsEU6MKNvY=; b=RGZflRGq/e8MhyK8i/Afu61tp6cY2ntFFduXhfWTtsQaPFe+jXpF+7PdMLYwZkdbRK 6liRjvic0e5JTbwG/OFcsb+fLzgPCezNYqp/Xk8KUjfc1U0TOjoVym83RR3MMl2LoSiv pXjU7k1MRNGEs/Qqi5AYTfhN0643mlovU7NAfRfaGvROjPh0XBP0p6Lugk5wX0PzJBwn wH6A3ZY/T/eYtucoBV9PNJkRyIqxkIx1Wy4bYOJaRpxzTRSBxFTOx+md4tP14YXYj4X8 bKZth+InxlJRSd+AHoGB24SeUiACJNba4dzwz15ZjyclGZ9igFiFc9yvJF4CPMBhZvl8 6p+A== MIME-Version: 1.0 X-Received: by 10.55.53.137 with SMTP id c131mr8379423qka.102.1429296690695; Fri, 17 Apr 2015 11:51:30 -0700 (PDT) Received: by 10.140.98.66 with HTTP; Fri, 17 Apr 2015 11:51:30 -0700 (PDT) In-Reply-To: <1429295453207.e601c503@Nodemailer> References: <1429295453207.e601c503@Nodemailer> Date: Sat, 18 Apr 2015 02:51:30 +0800 Message-ID: Subject: Re: [HDFSEventSink] Endless loop when HDFSEventSink.process() thorws exception From: Tao Li To: user@flume.apache.org Content-Type: multipart/alternative; boundary=001a11476f568712dc0513f011ef X-Virus-Checked: Checked by ClamAV on apache.org --001a11476f568712dc0513f011ef Content-Type: text/plain; charset=UTF-8 @Gwen @Hari My use case is as follows: ScribeClient => [Agent1: ScribeSource => KafkaChannel1] => Kafka Cluster => [Agent2: KafkaCluster2 => HDFSEventSink] => HDFS The bad case is as follows: My HDFSEventSink need a header "*timestamp*", but some dirty data(by mistake) in Kafka doesn't has the "timestamp" headers, which cause the following BucketPath.escapeString thows *NullPointerException*. String realPath = BucketPath.escapeString(filePath, event.getHeaders(), timeZone, needRounding, roundUnit, roundValue, useLocalTime); *I think Gwen's second point is OK, we can add a interceptor to do the filter job.* But my flume agent is kind of special: For Agent1, doesn't have sink, directly send message to kafak cluster by KafkaChannel1. For Agent2, doesn't have source, directly poll event from kafka cluster by KafkaChannel2. Agent1 and Agent2 is different JVM and deploy on different node. *I don't know if it's reasonable for a agent with no sink or no source?* But I have already build the whold work flow, and it's works well for me for regular cases. *For Agent2, because of without source, so I can't use Gwen's Interceptor suggestion.* 2015-04-18 2:30 GMT+08:00 Hari Shreedharan : > What I think he means is that a message in the channel that cannot be > serialized by the serializer because it is malformed causing the serializer > to fail and perhaps throw (think malformed Avro). Such a message basically > would be stuck in an infinite loop. So the workaround in (2) would work if > using a Kafka Source. > > Thanks, > Hari > > > On Fri, Apr 17, 2015 at 10:08 AM, Tao Li wrote: > >> OK, I got it, Thanks. >> >> 2015-04-18 0:59 GMT+08:00 Hari Shreedharan : >> >>> Are you using Kafka channel? The fix I mentioned was for file channel. >>> Unfortunately, we don't plan to introduce something that drops data in real >>> time. This makes it too easy for a misconfig to cause data loss. You'd have >>> to ensure the data in the Kafka channel is valid. >>> >>> Thanks, >>> Hari >>> >>> >>> On Fri, Apr 17, 2015 at 9:41 AM, Tao Li wrote: >>> >>>> @Hari, you mean I need to ensure the data in kafka is OK by myself, >>>> right? >>>> >>>> How about we have a config to let user decide how to handle BACKOFF. >>>> For example, we can config the max retry num in process(), and also >>>> config wether commit or not when exceed the max retry num.(In my kafka >>>> case, when meet dirty data, commit the comsume offset will be nice for me >>>> than endless loop) >>>> >>>> 2015-04-18 0:23 GMT+08:00 Hari Shreedharan : >>>> >>>>> We recently added functionality to the file channel integrity tool >>>>> that can be used to remove bad events from the channel - though you would >>>>> need to write some code to validate events. It will be in the soon to be >>>>> released 1.6.0 >>>>> >>>>> Thanks, >>>>> Hari >>>>> >>>>> >>>>> On Fri, Apr 17, 2015 at 9:05 AM, Tao Li >>>>> wrote: >>>>> >>>>>> Hi all: >>>>>> >>>>>> My use case is KafkaChannel + HDFSEventSink. >>>>>> >>>>>> I found that SinkRunner.PollingRunner will call >>>>>> HDFSEventSink.process() in a while loop. For example, a message in kafka >>>>>> contains dirty data, so HDFSEventSink.process() consume message from kafka, >>>>>> throws exception because of *dirty data*, and *kafka offset doesn't >>>>>> commit*. And the outer loop, will continue call >>>>>> HDFSEventSink.process(). Because the kafka offset doesn't change, so >>>>>> HDFSEventSink will consume the dirty data *again*. The bad loop is *never >>>>>> stopped*. >>>>>> >>>>>> *I want to know that if we have a **mechanism to cover this case?* >>>>>> For example, we have a max retry num for a unique HDFSEventSink.process() >>>>>> call and give up when exceed max limit. >>>>>> >>>>>> >>>>> >>>> >>> >> > --001a11476f568712dc0513f011ef Content-Type: text/html; charset=UTF-8 Content-Transfer-Encoding: quoted-printable
@Gwen @Hari

My use case is as follows:<= /div>
ScribeClient =3D> [Agent1: ScribeSource =3D> KafkaChannel1]= =3D> Kafka Cluster =3D> [Agent2: KafkaCluster2 =3D> HDFSEventSink= ] =3D> HDFS

The bad case is as follows:
My HDFSEventSink need a header "timestamp", but som= e dirty data(by mistake) in Kafka doesn't has the "timestamp"= headers, which cause the following BucketPath.escapeString thows NullPo= interException.
String realPath =3D BucketPath.escapeString(f= ilePath, event.getHeaders(), timeZone, needRounding, roundUnit, roundValue,= useLocalTime);

I think Gwen's second p= oint is OK, we can add a interceptor to do the filter job.
But my flume agent is kind of special:
For Agent1, d= oesn't have sink, directly send message to kafak cluster by KafkaChanne= l1.=C2=A0
For Agent2, doesn't have source, directly poll even= t from kafka cluster by KafkaChannel2.
Agent1 and Agent2 is diffe= rent JVM and deploy on different node.

I don= 9;t know if it's=C2=A0reasonable for a agent with no sink or no source?= =C2=A0But I have already build the whold work flow, and it's works = well for me for regular cases.

For Agent2, beca= use of without source, so I can't use Gwen's Interceptor suggestion= .

= 2015-04-18 2:30 GMT+08:00 Hari Shreedharan <hshreedharan@cloudera.= com>:
What I think he means is that a message in the channel that cannot be= serialized by the serializer because it is malformed causing the serialize= r to fail and perhaps throw (think malformed Avro). Such a message basicall= y would be stuck in an infinite loop. So the workaround in (2) would work i= f using a Kafka Source.=C2=A0

Thanks,=C2=A0
Hari


On Fri, Apr 17, 2015 at 10:08 AM, Tao= Li <litao.buptsse@gmail.com> wrote:

OK, I got it, Thanks.

2015-04-18 0:59 GMT+08:00 Hari Shreedharan <= span dir=3D"ltr"><hshreedharan@cloudera.com>:
Are you using Kafka channel? The fix I mentioned was for file channel.= Unfortunately, we don't plan to introduce something that drops data in= real time. This makes it too easy for a misconfig to cause data loss. You&= #39;d have to ensure the data in the Kafka channel is valid.

Thanks,=C2=A0
Hari


On Fri, Apr 17, 2015 at 9:41 AM, Tao Li <litao.buptsse@gmail.com<= /a>> wrote:

@Hari, you mean I need to ensure the data in ka= fka is OK by myself, right?

How about we have a = config to let user decide how to handle BACKOFF.
For example, we can confi= g the max retry num in=C2=A0process(= ), and also config wether commit or not when exceed the max retry num.(In m= y kafka case, when meet dirty data, commit the comsume offset will be nice = for me than endless loop)




--001a11476f568712dc0513f011ef--