flink-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Padarn Wilson <pad...@gmail.com>
Subject Re: Setting source vs sink vs window parallelism with data increase
Date Sat, 23 Mar 2019 11:49:44 GMT
Well.. it turned out I was registering millions of timers by accident,
which was why garbage collection was blowing up. Oops. Thanks for your help
again.

On Wed, Mar 6, 2019 at 9:44 PM Padarn Wilson <padarn@gmail.com> wrote:

> Thanks a lot for your suggestion. I’ll dig into it and update for the
> mailing list if I find anything useful.
>
> Padarn
>
> On Wed, 6 Mar 2019 at 6:03 PM, Piotr Nowojski <piotr@ververica.com> wrote:
>
>> Re-adding user mailing list.
>>
>>
>> Hi,
>>
>> If it is a GC issue, only GC logs or some JVM memory profilers (like
>> Oracle’s Mission Control) can lead you to the solution. Once you confirm
>> that it’s a GC issue, there are numerous resources online how to analyse
>> the cause of the problem. For that, it is difficult to use CPU
>> profiling/Flink Metrics, since GC issues caused by one thread, can cause
>> performance bottlenecks in other unrelated places.
>>
>> If that’s not a GC issue, you can use Flink metrics (like number of
>> buffered input/output data) to find Task that’s causing a bottleneck. Then
>> you can use CPU profiler to analyse why is that happening.
>>
>> Piotrek
>>
>> On 6 Mar 2019, at 02:52, Padarn Wilson <padarn@gmail.com> wrote:
>>
>> Hi Piotr,
>>
>> Thanks for your feedback. Makes sense about the checkpoint barriers -
>> this definitely could be the cause of a problem.
>>
>> I would advice profiling your job to find out what’s going on.
>>
>>
>> Agreed. Outside of inspecting the Flink metrics, do you have suggestions
>> for tools with which to do this?
>>
>> The main thing I'm trying to pin down is:
>> 1) Is it the downstream processing from the expansion of records that
>> causes a problem, or
>> 2) Is is the shuffle of all the records after the expansion which is
>> taking a large time - if so, is there anything I can do to mitigate this
>> other than trying to ensure less shuffle.
>>
>> Thanks,
>> Padarn
>>
>>
>> On Tue, Mar 5, 2019 at 7:08 PM Piotr Nowojski <piotr@ververica.com>
>> wrote:
>>
>>> Hi,
>>>
>>> Do you mind elaborating on this? What technology would you propose as an
>>> alternative, and why would this increase checkpointing time?
>>>
>>>
>>> The problem is that when Flink starts checkpoint and inject checkpoint
>>> barriers, those checkpoint barriers travel through the Job Graph. The
>>> quicker they can do that the better. How fast does it take depends on the
>>> amount of buffered data before checkpoint barriers (currently all of such
>>> records must be processed before checkpoint barrier is passed down stream).
>>> The more buffered records and the more time it takes to process those
>>> records, the longer the checkpoint take time. Obviously if one stage in the
>>> job is multiplying the amount of records, it can in a way multiply the
>>> amount of “buffered work” that needs to be processed before checkpoint
>>> barriers pass through.
>>>
>>> However it might not be the case for you. To analyse what’s going on you
>>> would need to look at various Flink metrics, like checkpoint times, back
>>> pressured tasks, state of the output/input buffers of the tasks, etc.
>>> However #2, those are secondary issues. First of all you should try to pin
>>> point the cause of long GC pauses. If it comes from your code, you should
>>> fix this first. If that either isn’t the issue or doesn’t solve it,
>>> generally speaking I would advice profiling your job to find out what’s
>>> going on.
>>>
>>> Piotrek
>>>
>>> On 5 Mar 2019, at 02:00, Padarn Wilson <padarn@gmail.com> wrote:
>>>
>>> Hi Piotr,
>>>
>>> Thanks for your response. I am using Flink 1.7.1 (intend to upgrade to
>>> 1.7.2 shortly - there is some S3 fix I'd like to take advantage of).
>>>
>>> Generally speaking Flink might not the best if you have records fan out,
>>>> this may significantly increase checkpointing time.
>>>
>>>
>>> Do you mind elaborating on this? What technology would you propose as an
>>> alternative, and why would this increase checkpointing time?
>>>
>>> However you might want to first identify what’s causing long GC times.
>>>>
>>>
>>> My current plan is to try and enable GC logs and see if I can get
>>> something meaningful from them.
>>>
>>> Thanks a lot,
>>> Padarn
>>>
>>>
>>> On Mon, Mar 4, 2019 at 7:16 PM Piotr Nowojski <piotr@ververica.com>
>>> wrote:
>>>
>>>> Hi,
>>>>
>>>> What Flink version are you using?
>>>>
>>>> Generally speaking Flink might not the best if you have records fan
>>>> out, this may significantly increase checkpointing time.
>>>>
>>>> However you might want to first identify what’s causing long GC times.
>>>> If there are long GC pause, this should be the first thing to fix.
>>>>
>>>> Piotrek
>>>>
>>>> On 2 Mar 2019, at 08:19, Padarn Wilson <padarn@gmail.com> wrote:
>>>>
>>>> Hi all again - following up on this I think I've identified my problem
>>>> as being something else, but would appreciate if anyone can offer advice.
>>>>
>>>> After running my stream from sometime, I see that my garbage collector
>>>> for old generation starts to take a very long time:
>>>> <Screen Shot 2019-03-02 at 3.01.57 PM.png>
>>>> here the* purple line is young generation time*, this is ever
>>>> increasing, but grows slowly, while the *blue is old generation*.
>>>> This in itself is not a problem, but as soon as the next checkpoint is
>>>> triggered after this happens you see the following:
>>>> <Screen Shot 2019-03-02 at 3.02.48 PM.png>
>>>> It looks like the checkpoint hits a cap, but this is only because the
>>>> checkpoints start to timeout and fail (these are the alignment time per
>>>> operator)
>>>>
>>>> I do notice that my state is growing quite larger over time, but I
>>>> don't have a good understanding of what would cause this to happen with the
>>>> JVM old generation metric, which appears to be the leading metric before
a
>>>> problem is noticed. Other metrics such as network buffers also show that
at
>>>> the checkpoint time things start to go haywire and the situation never
>>>> recovers.
>>>>
>>>> Thanks
>>>>
>>>> On Thu, Feb 28, 2019 at 5:50 PM Padarn Wilson <padarn@gmail.com> wrote:
>>>>
>>>>> Hi all,
>>>>>
>>>>> I'm trying to process many records, and I have an expensive operation
>>>>> I'm trying to optimize. Simplified it is something like:
>>>>>
>>>>> Data: (key1, count, time)
>>>>>
>>>>> Source -> Map(x -> (x, newKeyList(x.key1))
>>>>>             -> Flatmap(x -> x._2.map(y => (y, x._1.count, x._1.time))
>>>>>             -> Keyby(_.key1).TublingWindow().apply..
>>>>>             -> Sink
>>>>>
>>>>> In the Map -> Flatmap, what is happening is that each key is mapping
>>>>> to a set of keys, and then this is set as the new key. This effectively
>>>>> increase the size of the stream by 16x
>>>>>
>>>>> What I am trying to figure out is how to set the parallelism of my
>>>>> operators. I see in some comments that people suggest your source, sink
and
>>>>> aggregation should have different parallelism, but I'm not clear on exactly
>>>>> why, or what this means for CPU utilization.
>>>>> (see for example
>>>>> https://stackoverflow.com/questions/48317438/unable-to-achieve-high-cpu-utilization-with-flink-and-gelly
>>>>> )
>>>>>
>>>>> Also, it isn't clear to me the best way to handle this increase in
>>>>> data within the stream itself.
>>>>>
>>>>> Thanks
>>>>>
>>>>
>>>>
>>>
>>

Mime
View raw message