From: Robert Metzger
To: user@flink.apache.org
Date: Wed, 20 Jan 2016 11:59:43 +0100
Subject: Re: Frequent exceptions killing streaming job

Hey Nick,

I had a discussion with Stephan Ewen on how we could resolve the issue. I
filed a JIRA with our suggested approach:
https://issues.apache.org/jira/browse/FLINK-3264

By handling this directly in the KafkaConsumer, we would avoid fetching data
we cannot handle anyway (discarding in the deserialization schema would be
less efficient).

Let us know what you think about our suggested approach. Rough, purely
illustrative sketches of the "skip ahead" policy and the "dropping rate"
idea that came up in this thread are at the very bottom of this mail, below
the quoted text; they are not the JIRA's design.

Sadly, it seems that the Kafka 0.9 consumer API does not yet support
requesting the latest offset of a TopicPartition. I'll ask about this on
their ML.

On Sun, Jan 17, 2016 at 8:28 PM, Nick Dimiduk <ndimiduk@gmail.com> wrote:

> On Sunday, January 17, 2016, Stephan Ewen <sewen@apache.org> wrote:
>
>> I agree, real-time streams should never go down.
>
> Glad to hear that :)
>
>> [snip] Both should be supported.
>
> Agreed.
>
>> Since we interpret streaming very broadly (also including analysis of
>> historic streams or timely data), the "backpressure/catch-up" mode seemed
>> natural as the first one to implement.
>
> Indeed, this is what my job is doing. I have set it to start from the
> beginning when it lacks a valid offset. I have to presume that in my case
> the stream data is expiring faster than my consumers can keep up; however,
> I haven't investigated proper monitoring yet.
>
>> The "load shedding" variant can probably even be realized in the Kafka
>> consumer, without complex modifications to the core Flink runtime itself.
>
> I agree here as well. Indeed, this exception is being thrown from the
> consumer, not the runtime.
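(For anyone else following along: below is a minimal sketch of a job set up
the way Nick describes, i.e. start from the earliest available offset when no
valid offset is known. Topic name, group id and connection settings are made
up for illustration; with the 0.8-style consumer properties this behaviour is
what auto.offset.reset=smallest requests, and "largest" would be the
"use latest" fallback Stephan mentions further down in the quoted thread.)

import java.util.Properties;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer082;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

public class KafkaJobSketch {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();

        Properties props = new Properties();
        // Connection settings are placeholders for illustration only.
        props.setProperty("bootstrap.servers", "localhost:9092");
        props.setProperty("zookeeper.connect", "localhost:2181");
        props.setProperty("group.id", "my-consumer-group");
        // "Lacking a valid offset, start from the beginning":
        // with the 0.8-style consumer properties this is "smallest";
        // "largest" would instead jump to the newest data.
        props.setProperty("auto.offset.reset", "smallest");

        env.addSource(new FlinkKafkaConsumer082<>(
                        "my-topic", new SimpleStringSchema(), props))
           .print();

        env.execute("Kafka consumer sketch");
    }
}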
>
>> On Sun, Jan 17, 2016 at 12:42 AM, Nick Dimiduk <ndimiduk@gmail.com> wrote:
>>
>>> This goes back to the idea that streaming applications should never go
>>> down. I'd much rather consume at max capacity and knowingly drop some
>>> portion of the incoming pipe than have the streaming job crash. Of
>>> course, once the job itself is robust, I still need the runtime to be
>>> robust -- YARN vs. (potential) Mesos vs. standalone cluster will be my
>>> next consideration.
>>>
>>> I can share some details about my setup, but not at this time; in part
>>> because I don't have my metrics available at the moment, and in part
>>> because this is a public, archived list.
>>>
>>> On Sat, Jan 16, 2016 at 8:23 AM, Stephan Ewen <sewen@apache.org> wrote:
>>>
>>>> @Robert: Is it possible to add a "fallback" strategy to the consumer?
>>>> Something like "if offsets cannot be found, use latest"?
>>>>
>>>> I would make this an optional feature to activate. I would think it is
>>>> quite surprising to users if records start being skipped in certain
>>>> situations. But I can see that this would be desirable sometimes.
>>>>
>>>> More control over skipping records could be something to implement in
>>>> an extended version of the Kafka consumer. A user could define a policy
>>>> that, in case the consumer falls behind the producer by more than X
>>>> offsets, it starts requesting the latest offsets (rather than the next
>>>> ones), thereby skipping a bunch of records.
>>>>
>>>> On Sat, Jan 16, 2016 at 3:14 PM, Robert Metzger <rmetzger@apache.org> wrote:
>>>>
>>>>> Hi Nick,
>>>>>
>>>>> I'm sorry you ran into the issue. Is it possible that Flink's Kafka
>>>>> consumer falls so far behind in the topic that the offsets it's
>>>>> requesting are invalid?
>>>>>
>>>>> For that, the retention time of Kafka has to be pretty short.
>>>>>
>>>>> Skipping records under load is something currently not supported by
>>>>> Flink itself. The only idea I had for handling this would be to give
>>>>> the DeserializationSchema a callback to request the latest offset from
>>>>> Kafka to determine the lag. With that, the schema could determine a
>>>>> "dropping rate" to catch up.
>>>>> What would you, as an application developer, expect for handling the
>>>>> situation?
>>>>>
>>>>> Just out of curiosity: what's the throughput you have on the Kafka
>>>>> topic?
>>>>>
>>>>> On Fri, Jan 15, 2016 at 10:13 PM, Nick Dimiduk <ndimiduk@gmail.com> wrote:
>>>>>
>>>>>> Hi folks,
>>>>>>
>>>>>> I have a streaming job that consumes from a Kafka topic. The topic is
>>>>>> pretty active, so the local-mode single worker is obviously not able
>>>>>> to keep up with the fire hose. I expect the job to skip records and
>>>>>> continue on. However, I'm getting an exception from the LegacyFetcher
>>>>>> which kills the job. This is very much *not* what I want. Any
>>>>>> thoughts? The only thing I find when I search for this error message
>>>>>> is a link back to FLINK-2656. I'm running roughly 0.10-release/HEAD.
>>>>>>
>>>>>> Thanks a lot,
>>>>>> Nick
>>>>>>
>>>>>> java.lang.Exception: Found invalid offsets more than once in
>>>>>> partitions [FetchPartition {partition=X, offset=Y}] Exceptions:
>>>>>>         at org.apache.flink.streaming.connectors.kafka.internals.LegacyFetcher.run(LegacyFetcher.java:242)
>>>>>>         at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer.run(FlinkKafkaConsumer.java:399)
>>>>>>         at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:58)
>>>>>>         at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:55)
>>>>>>         at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:218)
>>>>>>         at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
>>>>>>         at java.lang.Thread.run(Thread.java:745)
>>>>>> Caused by: java.lang.RuntimeException: Found invalid offsets more
>>>>>> than once in partitions [FetchPartition {partition=X, offset=Y}]
>>>>>> Exceptions:
>>>>>>         at org.apache.flink.streaming.connectors.kafka.internals.LegacyFetcher$SimpleConsumerThread.run(LegacyFetcher.java:412)
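As promised above, here is a purely illustrative sketch of the "dropping
rate" idea from the quoted thread: a deserialization step that is told how
far the consumer lags behind the head of the partition and then discards a
fraction of records to catch up. The callback and all type names are made up
for this mail; nothing like them exists in Flink's DeserializationSchema
today.

/**
 * Illustrative only: none of these types exist in Flink.
 */
public class DroppingRateSketch {

    /** Hypothetical callback the consumer would hand to the schema. */
    public interface LagProvider {
        /** Latest offset in the partition minus the offset just fetched. */
        long currentLag();
    }

    /** Drop most records once the lag exceeds a threshold. */
    public static final class DroppingDecision {

        private final LagProvider lagProvider;
        private final long lagThreshold;
        private long counter;

        public DroppingDecision(LagProvider lagProvider, long lagThreshold) {
            this.lagProvider = lagProvider;
            this.lagThreshold = lagThreshold;
        }

        /** @return true if the current record should be dropped. */
        public boolean shouldDrop() {
            if (lagProvider.currentLag() <= lagThreshold) {
                return false; // keeping up: deserialize everything
            }
            // Behind: keep only every 10th record until the lag shrinks.
            return (counter++ % 10) != 0;
        }
    }
}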
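The "skip ahead when the consumer falls more than X offsets behind" policy
from the quoted discussion could take a shape like the following. The
interface and names are invented for illustration and are not what the JIRA
specifies; they only show one possible form of such a policy, applied per
partition before each fetch.

/**
 * Illustrative only: a policy the Kafka consumer could consult per partition
 * before issuing the next fetch request. Not part of Flink's API.
 */
public interface OffsetSkipPolicy {

    /**
     * @param nextOffset   offset the fetcher would request next
     * @param latestOffset latest offset currently available in the partition
     * @return the offset to actually request
     */
    long decideFetchOffset(long nextOffset, long latestOffset);

    /** Skip to the latest offset once the lag exceeds a fixed threshold. */
    class SkipWhenBehind implements OffsetSkipPolicy {

        private final long maxLag;

        public SkipWhenBehind(long maxLag) {
            this.maxLag = maxLag;
        }

        @Override
        public long decideFetchOffset(long nextOffset, long latestOffset) {
            long lag = latestOffset - nextOffset;
            // More than maxLag offsets behind -> jump to the latest offset
            // and knowingly drop the records in between.
            return (lag > maxLag) ? latestOffset : nextOffset;
        }
    }
}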