flink-user mailing list archives

From Tony Wei <tony19920...@gmail.com>
Subject Re: Stream Task seems to be blocked after checkpoint timeout
Date Wed, 27 Sep 2017 09:11:07 GMT
Hi Stefan,

Here is the summary of my streaming job's checkpoints after restarting it
last night.

[image: inline image 1]

This is the distribution of alignment buffered over the last 12 hours.

[image: inline image 3]

And here is the output buffer pool usage during chk #1140 ~ #1142. For chk
#1245 and #1246, you can check the picture I sent before.

[image: inline image 2]

AFAIK, the back pressure status is usually LOW, sometimes goes up
to HIGH, and is always OK during the night.

Best Regards,
Tony Wei


2017-09-27 16:54 GMT+08:00 Stefan Richter <s.richter@data-artisans.com>:

> Hi Tony,
>
> are your checkpoints typically close to the timeout boundary? From what I
> see, writing the checkpoint is relatively fast but the time from the
> checkpoint trigger to execution seems very long. This is typically the case
> if your job has a lot of backpressure and therefore the checkpoint barriers
> take a long time to travel to the operators, because a lot of events are
> piling up in the buffers. Do you also experience large alignments for your
> checkpoints?
>
> Best,
> Stefan
>
> On 27.09.2017 at 10:43, Tony Wei <tony19920430@gmail.com> wrote:
>
> Hi Stefan,
>
> It seems I found something strange in the JM's log.
>
> This had happened more than once before, but all subtasks eventually finished
> their checkpoint attempts.
>
> 2017-09-26 01:23:28,690 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator
> - Triggering checkpoint 1140 @ 1506389008690
> 2017-09-26 01:28:28,690 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator
> - Triggering checkpoint 1141 @ 1506389308690
> 2017-09-26 01:33:28,690 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator
> - Triggering checkpoint 1142 @ 1506389608690
> 2017-09-26 01:33:28,691 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator
> - Checkpoint 1140 expired before completing.
> 2017-09-26 01:38:28,691 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator
> - Checkpoint 1141 expired before completing.
> 2017-09-26 01:40:38,044 WARN org.apache.flink.runtime.checkpoint.CheckpointCoordinator
> - Received late message for now expired checkpoint attempt 1140 from
> c63825d15de0fef55a1d148adcf4467e of job 7c039572b...
> 2017-09-26 01:40:53,743 WARN org.apache.flink.runtime.checkpoint.CheckpointCoordinator
> - Received late message for now expired checkpoint attempt 1141 from
> c63825d15de0fef55a1d148adcf4467e of job 7c039572b...
> 2017-09-26 01:41:19,332 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator
> - Completed checkpoint 1142 (136733704 bytes in 457413 ms).
>
> For chk #1245 and #1246, there was no late message from the TM. You can refer
> to the TM log. A fully completed checkpoint attempt generally produces 12
> "... asynchronous part" log entries, but #1245 and #1246 only got 10.
>
> 2017-09-26 10:08:28,690 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator
> - Triggering checkpoint 1245 @ 1506420508690
> 2017-09-26 10:13:28,690 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator
> - Triggering checkpoint 1246 @ 1506420808690
> 2017-09-26 10:18:28,691 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator
> - Checkpoint 1245 expired before completing.
> 2017-09-26 10:23:28,691 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator
> - Checkpoint 1246 expired before completing.
>
> Moreover, I listed the checkpoint directory on S3 and saw that two states had
> not been discarded successfully. In general, there are 16 parts
> for a completed checkpoint state.
>
> 2017-09-26 18:08:33 36919 tony-dev/flink-checkpoints/
> 7c039572b13346f1b17dcc0ace2b72c2/chk-1245/eedd7ca5-ee34-
> 45a5-bf0b-11cc1fc67ab8
> 2017-09-26 18:13:34 37419 tony-dev/flink-checkpoints/
> 7c039572b13346f1b17dcc0ace2b72c2/chk-1246/9aa5c6c4-8c74-
> 465d-8509-5fea4ed25af6
>
> Hope this information is helpful. Thank you.
>
> Best Regards,
> Tony Wei
>
> 2017-09-27 16:14 GMT+08:00 Stefan Richter <s.richter@data-artisans.com>:
>
>> Hi,
>>
>> thanks for the information. Unfortunately, I have no immediate idea what
>> the reason is from the given information. I think a thread dump would be most
>> helpful, but also metrics on the operator level to figure out
>> which part of the pipeline is the culprit.
>>
>> Best,
>> Stefan
>>
>> On 26.09.2017 at 17:55, Tony Wei <tony19920430@gmail.com> wrote:
>>
>> Hi Stefan,
>>
>> There is no unknown exception in my full log. The Flink version is 1.3.2.
>> My job is roughly like this.
>>
>> env.addSource(Kafka)
>>   .map(ParseKeyFromRecord)
>>   .keyBy()
>>   .process(CountAndTimeoutWindow)
>>   .asyncIO(UploadToS3)
>>   .addSink(UpdateDatabase)
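>>
>> (The asyncIO step above is shorthand; the actual wiring goes through Flink's
>> AsyncDataStream, roughly like below. The output types and the timeout /
>> capacity values here are placeholders, not the job's real settings.)
>>
>>   // AsyncDataStream and AsyncFunction come from the flink-streaming-java API
>>   DataStream<UploadResult> uploaded = AsyncDataStream.unorderedWait(
>>       windowed,              // output of the CountAndTimeoutWindow step
>>       new UploadToS3(),      // implements AsyncFunction<WindowResult, UploadResult>
>>       60, TimeUnit.SECONDS,  // per-request timeout (assumed value)
>>       100);                  // max in-flight requests (assumed value)
>>   uploaded.addSink(new UpdateDatabase());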
>>
>> It seemed that all tasks stopped, as shown in the picture I sent in the last email.
>>
>> I will keep an eye on it and take a thread dump from that JVM if this happens
>> again.
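>>
>> For the dump itself, jstack against the TM process should be enough. A rough
>> in-JVM equivalent, using only the standard JMX thread bean (nothing
>> Flink-specific), would be:
>>
>>   import java.lang.management.ManagementFactory;
>>   import java.lang.management.ThreadInfo;
>>
>>   // dumpAllThreads(true, true) also reports locked monitors and
>>   // synchronizers, which helps when looking for a blocked task thread
>>   for (ThreadInfo info :
>>           ManagementFactory.getThreadMXBean().dumpAllThreads(true, true)) {
>>       System.out.println(info.getThreadName() + " (" + info.getThreadState() + ")");
>>       for (StackTraceElement frame : info.getStackTrace()) {
>>           System.out.println("    at " + frame);
>>       }
>>   }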
>>
>> Best Regards,
>> Tony Wei
>>
>> 2017-09-26 23:46 GMT+08:00 Stefan Richter <s.richter@data-artisans.com>:
>>
>>> Hi,
>>>
>>> that is very strange indeed. I had a look at the logs and there is no
>>> error or exception reported. I assume there is also no exception in your
>>> full logs? Which version of Flink are you using and what operators were
>>> running in the task that stopped? If this happens again, would it be
>>> possible to take a thread dump from that JVM?
>>>
>>> Best,
>>> Stefan
>>>
>>> > On 26.09.2017 at 17:08, Tony Wei <tony19920430@gmail.com> wrote:
>>> >
>>> > Hi,
>>> >
>>> > Something weird happened on my streaming job.
>>> >
>>> > I found that my streaming job seemed to be blocked for a long time, in
>>> the situation shown in the picture below. (chk #1245 and #1246 each finished
>>> 7/8 tasks and were then marked as timed out by the JM. Later checkpoints,
>>> like #1247, failed in the same state until I restarted the TM.)
>>> >
>>> > <snapshot.png>
>>> >
>>> > I'm not sure what happened, but the consumer stopped fetching records,
>>> buffer usage was 100%, and the downstream task did not seem to fetch data
>>> anymore. It was as if the whole TM had stopped.
>>> >
>>> > However, after I restarted the TM and forced the job to restart from the
>>> latest completed checkpoint, everything worked again. And I don't know how
>>> to reproduce it.
>>> >
>>> > The attachment is my TM log. Because it contains many user logs and
>>> sensitive information, I only kept the log entries from `org.apache.flink...`.
>>> >
>>> > My cluster setup is one JM and one TM with 4 available slots.
>>> >
>>> > The streaming job uses all slots; the checkpoint interval is 5 mins and the
>>> max concurrent checkpoint number is 3.
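>>> >
>>> > In code those checkpoint settings look roughly like this (the rest of the
>>> > environment setup is omitted):
>>> >
>>> >   StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>>> >   env.enableCheckpointing(300000L);                          // checkpoint every 5 minutes
>>> >   env.getCheckpointConfig().setMaxConcurrentCheckpoints(3);  // at most 3 concurrent checkpoints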
>>> >
>>> > Please let me know if you need more information to find out what
>>> happened to my streaming job. Thanks for your help.
>>> >
>>> > Best Regards,
>>> > Tony Wei
>>> > <flink-root-taskmanager-0-partial.log>
>>>
>>>
>>
>>
>
>
