flink-user mailing list archives

From: Nick Dimiduk <ndimi...@gmail.com>
Subject: Re: Frequent exceptions killing streaming job
Date: Fri, 26 Feb 2016 16:56:55 GMT
Sorry I wasn't clear. No, the lock contention is not in Flink.

On Friday, February 26, 2016, Stephan Ewen <sewen@apache.org> wrote:

> Was the contended lock part of Flink's runtime, or the application code?
> If it was part of the Flink Runtime, can you share what you found?
>
> On Thu, Feb 25, 2016 at 6:03 PM, Nick Dimiduk <ndimiduk@gmail.com> wrote:
>
>> For what it's worth, I dug into the TM logs and found that this exception
>> was not the root cause, merely a symptom of other backpressure building in
>> the flow (actually, lock contention in another part of the stack). While
>> Flink was helpful in finding and bubbling up this stack to the UI, it was
>> ultimately misleading and caused me to overlook a proper evaluation of the
>> failure.
>>
>> On Wed, Jan 20, 2016 at 2:59 AM, Robert Metzger <rmetzger@apache.org> wrote:
>>
>>> Hey Nick,
>>>
>>> I had a discussion with Stephan Ewen on how we could resolve the issue.
>>> I filed a JIRA with our suggested approach:
>>> https://issues.apache.org/jira/browse/FLINK-3264
>>>
>>> By handling this directly in the KafkaConsumer, we would avoid fetching
>>> data we cannot handle anyway (discarding in the deserialization schema
>>> would be less efficient).
>>>
>>> Let us know what you think about our suggested approach.
>>>
>>> Sadly, it seems that the Kafka 0.9 consumer API does not yet support
>>> requesting the latest offset of a TopicPartition. I'll ask about this on
>>> their ML.
>>>
>>>
>>>
>>>
>>> On Sun, Jan 17, 2016 at 8:28 PM, Nick Dimiduk <ndimiduk@gmail.com> wrote:
>>>
>>>> On Sunday, January 17, 2016, Stephan Ewen <sewen@apache.org> wrote:
>>>>
>>>>> I agree, real time streams should never go down.
>>>>>
>>>>
>>>>  Glad to hear that :)
>>>>
>>>>
>>>>> [snip] Both should be supported.
>>>>>
>>>>
>>>> Agreed.
>>>>
>>>>
>>>>> Since we interpret streaming very broadly (also including analysis of
>>>>> historic streams or timely data), the "backpressure/catch-up" mode seemed
>>>>> natural as the first one to implement.
>>>>>
>>>>
>>>> Indeed, this is what my job is doing. I have configured it to start from
>>>> the beginning when it lacks a valid offset. I have to presume that in my
>>>> case the stream data is expiring faster than my consumers can keep up.
>>>> However, I haven't investigated proper monitoring yet.
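>>>>
>>>> Concretely, "start from the beginning" means a consumer property along
>>>> these lines. This is only a sketch: the topic, group id, and addresses are
>>>> placeholders, and it assumes the 0.8-style consumer, where "smallest"
>>>> selects the earliest available offset:
>>>>
>>>>     import java.util.Properties;
>>>>     import org.apache.flink.streaming.api.datastream.DataStream;
>>>>     import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>>>>     import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer082;
>>>>     import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
>>>>
>>>>     public class FromBeginningJob {
>>>>         public static void main(String[] args) throws Exception {
>>>>             StreamExecutionEnvironment env =
>>>>                 StreamExecutionEnvironment.getExecutionEnvironment();
>>>>
>>>>             Properties props = new Properties();
>>>>             props.setProperty("zookeeper.connect", "zk-host:2181");      // placeholder
>>>>             props.setProperty("bootstrap.servers", "broker-host:9092");  // placeholder
>>>>             props.setProperty("group.id", "my-streaming-job");           // placeholder
>>>>             // Lacking a valid committed offset, start from the earliest available one.
>>>>             props.setProperty("auto.offset.reset", "smallest");
>>>>
>>>>             DataStream<String> stream = env.addSource(
>>>>                 new FlinkKafkaConsumer082<>("my-topic", new SimpleStringSchema(), props));
>>>>
>>>>             stream.print();
>>>>             env.execute("from-beginning example");
>>>>         }
>>>>     }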
>>>>
>>>>
>>>>> The "load shedding" variant can probably even be realized in the Kafka
>>>>> consumer, without complex modifications to the core Flink runtime itself.
>>>>>
>>>>
>>>> I agree here as well. Indeed, this exception is being thrown from the
>>>> consumer, not the runtime.
>>>>
>>>>
>>>>
>>>>> On Sun, Jan 17, 2016 at 12:42 AM, Nick Dimiduk <ndimiduk@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> This goes back to the idea that streaming applications should never
>>>>>> go down. I'd much rather consume at max capacity and knowingly drop some
>>>>>> portion of the incoming pipe than have the streaming job crash. Of course,
>>>>>> once the job itself is robust, I still need the runtime to be robust --
>>>>>> YARN vs (potential) Mesos vs standalone cluster will be my next
>>>>>> consideration.
>>>>>>
>>>>>> I can share some details about my setup, but not at this time; in part
>>>>>> because I don't have my metrics available at the moment and in part
>>>>>> because this is a public, archived list.
>>>>>>
>>>>>> On Sat, Jan 16, 2016 at 8:23 AM, Stephan Ewen <sewen@apache.org>
>>>>>> wrote:
>>>>>>
>>>>>>> @Robert: Is it possible to add a "fallback" strategy to the
>>>>>>> consumer? Something like "if offsets cannot be found, use latest"?
>>>>>>>
>>>>>>> I would make this an optional feature to activate. I would think it
>>>>>>> is quite surprising to users if records start being skipped in certain
>>>>>>> situations. But I can see that this would be desirable sometimes.
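>>>>>>>
>>>>>>> Just to illustrate the opt-in shape -- nothing like this property exists
>>>>>>> today, and the name is made up:
>>>>>>>
>>>>>>>     // purely hypothetical, not an existing Flink or Kafka option
>>>>>>>     props.setProperty("flink.kafka.invalid-offset-fallback", "latest");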
>>>>>>>
>>>>>>> More control over skipping the records could be something to
>>>>>>> implement in an extended version of the Kafka Consumer. A user could define
>>>>>>> a policy that, in case the consumer falls behind the producer by more than X
>>>>>>> (offsets), starts requesting the latest offsets (rather than the
>>>>>>> following ones), thereby skipping a bunch of records.
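>>>>>>>
>>>>>>> As a sketch of what such a policy hook could look like -- entirely
>>>>>>> hypothetical, no such extension point exists in Flink today:
>>>>>>>
>>>>>>>     import java.io.Serializable;
>>>>>>>
>>>>>>>     /** Hypothetical extension point; not an existing Flink interface. */
>>>>>>>     interface OffsetSkipPolicy extends Serializable {
>>>>>>>         /** Given the next offset to fetch and the partition's latest
>>>>>>>          *  offset, return the offset the fetcher should actually use. */
>>>>>>>         long nextFetchOffset(long currentOffset, long latestOffset);
>>>>>>>     }
>>>>>>>
>>>>>>>     /** Jump to the latest offset once the lag exceeds maxLag offsets. */
>>>>>>>     public class SkipWhenBehind implements OffsetSkipPolicy {
>>>>>>>         private final long maxLag;
>>>>>>>
>>>>>>>         public SkipWhenBehind(long maxLag) { this.maxLag = maxLag; }
>>>>>>>
>>>>>>>         @Override
>>>>>>>         public long nextFetchOffset(long currentOffset, long latestOffset) {
>>>>>>>             return (latestOffset - currentOffset > maxLag) ? latestOffset : currentOffset;
>>>>>>>         }
>>>>>>>     }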
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> On Sat, Jan 16, 2016 at 3:14 PM, Robert Metzger <rmetzger@apache.org> wrote:
>>>>>>>
>>>>>>>> Hi Nick,
>>>>>>>>
>>>>>>>> I'm sorry you ran into the issue. Is it possible that Flink's Kafka
>>>>>>>> consumer falls back in the topic so far that the offsets it's requesting
>>>>>>>> are invalid?
>>>>>>>>
>>>>>>>> For that, the retention time of Kafka has to be pretty short.
>>>>>>>>
>>>>>>>> Skipping records under load is something currently not supported by
>>>>>>>> Flink itself. The only idea I had for handling this would be to give the
>>>>>>>> DeserializationSchema a callback to request the latest offset from Kafka
>>>>>>>> to determine the lag. With that, the schema could determine a "dropping
>>>>>>>> rate" to catch up.
>>>>>>>> What would you, as an application developer, expect in order to handle
>>>>>>>> the situation?
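>>>>>>>>
>>>>>>>> To make the idea concrete, a rough sketch -- the LatestOffsetCallback
>>>>>>>> below is made up (DeserializationSchema has no such hook today); only
>>>>>>>> deserialize/isEndOfStream/getProducedType are the real schema methods:
>>>>>>>>
>>>>>>>>     import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
>>>>>>>>     import org.apache.flink.api.common.typeinfo.TypeInformation;
>>>>>>>>     import org.apache.flink.streaming.util.serialization.DeserializationSchema;
>>>>>>>>
>>>>>>>>     /** Hypothetical callback the consumer would hand to the schema. */
>>>>>>>>     interface LatestOffsetCallback {
>>>>>>>>         long latestOffset();   // tail offset of the partition being read
>>>>>>>>         long currentOffset();  // offset of the record being deserialized
>>>>>>>>     }
>>>>>>>>
>>>>>>>>     public class LagAwareSchema implements DeserializationSchema<String> {
>>>>>>>>         private final LatestOffsetCallback offsets;  // would be injected
>>>>>>>>         private final long maxLag;
>>>>>>>>
>>>>>>>>         public LagAwareSchema(LatestOffsetCallback offsets, long maxLag) {
>>>>>>>>             this.offsets = offsets;
>>>>>>>>             this.maxLag = maxLag;
>>>>>>>>         }
>>>>>>>>
>>>>>>>>         @Override
>>>>>>>>         public String deserialize(byte[] message) {
>>>>>>>>             long lag = offsets.latestOffset() - offsets.currentOffset();
>>>>>>>>             // Crude "dropping rate": skip every other record while far behind;
>>>>>>>>             // assumes nulls are discarded by the consumer or a downstream filter.
>>>>>>>>             if (lag > maxLag && offsets.currentOffset() % 2 == 0) {
>>>>>>>>                 return null;
>>>>>>>>             }
>>>>>>>>             return new String(message);
>>>>>>>>         }
>>>>>>>>
>>>>>>>>         @Override
>>>>>>>>         public boolean isEndOfStream(String nextElement) { return false; }
>>>>>>>>
>>>>>>>>         @Override
>>>>>>>>         public TypeInformation<String> getProducedType() {
>>>>>>>>             return BasicTypeInfo.STRING_TYPE_INFO;
>>>>>>>>         }
>>>>>>>>     }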
>>>>>>>>
>>>>>>>>
>>>>>>>> Just out of curiosity: What's the throughput you have on the Kafka
>>>>>>>> topic?
>>>>>>>>
>>>>>>>>
>>>>>>>> On Fri, Jan 15, 2016 at 10:13 PM, Nick Dimiduk <ndimiduk@gmail.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Hi folks,
>>>>>>>>>
>>>>>>>>> I have a streaming job that consumes from a Kafka topic. The
>>>>>>>>> topic is pretty active, so the local-mode single worker is obviously not
>>>>>>>>> able to keep up with the fire-hose. I expect the job to skip records and
>>>>>>>>> continue on. However, I'm getting an exception from the LegacyFetcher which
>>>>>>>>> kills the job. This is very much *not* what I want. Any thoughts? The only
>>>>>>>>> thing I find when I search for this error message is a link back to
>>>>>>>>> FLINK-2656. I'm running roughly 0.10-release/HEAD.
>>>>>>>>>
>>>>>>>>> Thanks a lot,
>>>>>>>>> Nick
>>>>>>>>>
>>>>>>>>> java.lang.Exception: Found invalid offsets more than once in
>>>>>>>>> partitions [FetchPartition {partition=X, offset=Y}] Exceptions:
>>>>>>>>>         at org.apache.flink.streaming.connectors.kafka.internals.LegacyFetcher.run(LegacyFetcher.java:242)
>>>>>>>>>         at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer.run(FlinkKafkaConsumer.java:399)
>>>>>>>>>         at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:58)
>>>>>>>>>         at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:55)
>>>>>>>>>         at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:218)
>>>>>>>>>         at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
>>>>>>>>>         at java.lang.Thread.run(Thread.java:745)
>>>>>>>>> Caused by: java.lang.RuntimeException: Found invalid offsets more
>>>>>>>>> than once in partitions [FetchPartition {partition=X, offset=Y}] Exceptions:
>>>>>>>>>         at org.apache.flink.streaming.connectors.kafka.internals.LegacyFetcher$SimpleConsumerThread.run(LegacyFetcher.java:412)
>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>
>>
>
