flink-user mailing list archives

From Tony Wei <tony19920...@gmail.com>
Subject Re: Stream Task seems to be blocked after checkpoint timeout
Date Thu, 28 Sep 2017 10:59:19 GMT
Hi,

Sorry. This is the correct one.

Best Regards,
Tony Wei

2017-09-28 18:55 GMT+08:00 Tony Wei <tony19920430@gmail.com>:

> Hi Stefan,
>
> Sorry for providing partial information. The attachment is the full logs
> for checkpoint #1577.
>
> The reason I say the asynchronous part seems not to have been executed
> immediately is that all synchronous parts had finished at 2017-09-27
> 13:49.
> Does that mean the checkpoint barrier event had already arrived at the
> operator, and the checkpoint started as soon as the JM triggered it?
>
> Best Regards,
> Tony Wei
>
> 2017-09-28 18:22 GMT+08:00 Stefan Richter <s.richter@data-artisans.com>:
>
>> Hi,
>>
>> I agree that the memory consumption looks good. If there is only one TM,
>> it will run inside one JVM. As for the 7 minutes, you mean the reported
>> end-to-end time? This time measurement starts when the checkpoint is
>> triggered on the job manager; the first contributor is then the time that
>> it takes for the checkpoint barrier event to travel with the stream to the
>> operators. If there is back pressure and a lot of events are buffered, this
>> can introduce delay to this first part, because barriers must not overtake
>> data for correctness. After the barrier arrives at the operator, next comes
>> the synchronous part of the checkpoint, which is typically short running
>> and takes a snapshot of the state (think of creating an immutable version,
>> e.g. through copy on write). In the asynchronous part, this snapshot is
>> persisted to DFS. After that the timing stops and is reported together with
>> the acknowledgement to the job manager.
>>
>> So, I would assume if reporting took 7 minutes end-to-end, and the async
>> part took 4 minutes, it is likely that it took around 3 minutes for the
>> barrier event to travel with the stream. About the debugging, I think it is
>> hard to figure out what is going on with the DFS if you don’t have metrics
>> on that. Maybe you could attach a sampler to the TM’s JVM and monitor where
>> time is spent for the snapshotting?
>>
>> I am also looping in Stephan, he might have more suggestions.
>>
>> Best,
>> Stefan
>>
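The estimate above can be reproduced from the log lines quoted further down in this thread. The following is only a sketch of the arithmetic, not anything Flink provides: the class and method names are made up for illustration, the two timestamps are the JM "Triggering checkpoint 1577" and "Received late message" entries, and 240248 ms is the async duration reported by the TM. The remainder covers barrier travel plus the (normally short) synchronous part.

```java
import java.time.Duration;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

// Hypothetical helper; not part of Flink.
class CheckpointTimeline {
    // Timestamp layout used by the JM/TM log lines quoted in this thread.
    private static final DateTimeFormatter LOG_TIME =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss,SSS");

    /** Milliseconds elapsed between two log timestamps. */
    static long millisBetween(String from, String to) {
        return Duration.between(
                LocalDateTime.parse(from, LOG_TIME),
                LocalDateTime.parse(to, LOG_TIME)).toMillis();
    }

    /** End-to-end time minus the async part: barrier travel plus the sync part. */
    static long beforeAsyncMillis(long endToEndMillis, long asyncMillis) {
        return endToEndMillis - asyncMillis;
    }

    public static void main(String[] args) {
        // Trigger of #1577 on the JM, and the late acknowledgement for it.
        long endToEnd = millisBetween("2017-09-27 13:49:42,795", "2017-09-27 13:56:37,103");
        long async = 240_248L; // "took 240248 ms" from the TM log
        System.out.println("end-to-end ms: " + endToEnd);
        System.out.println("before async ms: " + beforeAsyncMillis(endToEnd, async));
    }
}
```

With these inputs the end-to-end time is 414308 ms (just under 7 minutes) and about 174060 ms (roughly 2.9 minutes) are unaccounted for before the async part started.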
>> On 28.09.2017 at 11:25, Tony Wei <tony19920430@gmail.com> wrote:
>>
>> Hi Stefan,
>>
>> Here is some telemetry information, but I don't have historical
>> information about GC.
>>
>> <image attachment: 2017-09-28 4.51.26 PM.png>
>> <image attachment: 2017-09-28 4.51.11 PM.png>
>>
>> 1) Yes, my state is not large.
>> 2) My DFS is S3, but my cluster is outside of AWS. That might be a problem.
>> Since this is a POC, we might move to AWS in the future or use HDFS in the
>> same cluster. However, how can I tell whether this is the problem?
>> 3) It seems memory usage is bounded. I'm not sure if the status shown
>> above is fine.
>>
>> There is only one TM in my cluster for now, so all tasks are running on
>> that machine. I think that means they are in the same JVM, right?
>> Besides the asynchronous part taking so long, another question is that
>> the late message showed this task was delayed for almost 7 minutes,
>> but the log showed it only took 4 minutes.
>> It seems that it was somehow waiting to be executed. Are there any
>> places I can look to find out what happened?
>>
>> As for the log information, what I mean is that it is hard to tell which
>> checkpoint id an asynchronous part belongs to when checkpoints take
>> longer and several concurrent checkpoints are in progress.
>> Also, it seems that the asynchronous part might not be executed right
>> away if no thread is available in the thread pool. It would be better to
>> measure the time between creation and processing, and log it together
>> with the checkpoint id in the existing log line that shows how long the
>> asynchronous part took.
>>
>> Best Regards,
>> Tony Wei
>>
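The measurement proposed in the message above could look roughly like the sketch below. It is not Flink's implementation: `TimedAsyncSnapshot`, `Timing`, `submit` and `await` are hypothetical names, and a single-thread executor stands in for a saturated thread pool so that the second task visibly waits before it runs.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch; not Flink code.
class TimedAsyncSnapshot {
    /** How long one async part waited for a thread and how long it then ran. */
    static final class Timing {
        final long checkpointId;
        final long waitMillis;
        final long runMillis;

        Timing(long checkpointId, long waitMillis, long runMillis) {
            this.checkpointId = checkpointId;
            this.waitMillis = waitMillis;
            this.runMillis = runMillis;
        }

        @Override
        public String toString() {
            return "checkpoint " + checkpointId + ": waited " + waitMillis
                    + " ms, ran " + runMillis + " ms";
        }
    }

    /** Records the creation time at submission and the start time when a thread picks the task up. */
    static Future<Timing> submit(ExecutorService pool, long checkpointId, Runnable asyncPart) {
        final long createdNanos = System.nanoTime();
        return pool.submit(() -> {
            long startedNanos = System.nanoTime();
            asyncPart.run();
            long finishedNanos = System.nanoTime();
            return new Timing(checkpointId,
                    TimeUnit.NANOSECONDS.toMillis(startedNanos - createdNanos),
                    TimeUnit.NANOSECONDS.toMillis(finishedNanos - startedNanos));
        });
    }

    /** Waits for a result, wrapping checked exceptions. */
    static Timing await(Future<Timing> future) {
        try {
            return future.get();
        } catch (Exception e) {
            throw new IllegalStateException(e);
        }
    }

    static void sleepQuietly(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        Future<Timing> first = submit(pool, 1577L, () -> sleepQuietly(200));
        Future<Timing> second = submit(pool, 1578L, () -> sleepQuietly(50));
        System.out.println(await(first));
        System.out.println(await(second)); // waits roughly as long as the first task ran
        pool.shutdown();
    }
}
```

Logging the wait time next to the run time and the checkpoint id would make it possible to tell a slow write apart from a task that sat in the queue.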
>> 2017-09-28 16:25 GMT+08:00 Stefan Richter <s.richter@data-artisans.com>:
>>
>>> Hi,
>>>
>>> when the async part takes that long I would have 3 things to look at:
>>>
>>> 1) Is your state so large? I don’t think this applies in your case,
>>> right?
>>> 2) Is something wrong with writing to DFS (network, disks, etc)?
>>> 3) Are we running low on memory on that task manager?
>>>
>>> Do you have telemetry information about used heap and gc pressure on the
>>> problematic task? However, what speaks against the memory problem
>>> hypothesis is that future checkpoints seem to go through again. What I find
>>> very strange is that within the reported 4 minutes of the async part the
>>> only thing that happens is: open dfs output stream, iterate the in-memory
>>> state and write serialized state data to dfs stream, then close the stream.
>>> No locks or waits in that section, so I would assume that for one of the
>>> three reasons I gave, writing the state is terribly slow.
>>>
>>> Those snapshots should be able to run concurrently, for example so that
>>> users can also take savepoints even when a checkpoint was triggered and is
>>> still running. So there is no way to guarantee that the previous parts have
>>> finished; this is expected behaviour. Which waiting times are you missing
>>> in the log? I think the information about when a checkpoint is triggered,
>>> received by the TM, performing the sync and async part and acknowledgement
>>> time should all be there?
>>>
>>> Best,
>>> Stefan
>>>
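As a rough illustration of the two phases described in this thread (a short synchronous snapshot, then an asynchronous write that opens a stream, iterates the state, writes it and closes the stream), here is a minimal sketch. It is not the DefaultOperatorStateBackend code: all names are hypothetical, the snapshot is a plain eager copy rather than real copy-on-write, and a `ByteArrayOutputStream` stands in for the DFS output stream.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.util.Collections;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.CompletableFuture;

// Hypothetical sketch; not Flink code.
class TwoPhaseSnapshot {
    /** Synchronous part: take an immutable point-in-time copy of the live state. */
    static Map<String, Long> snapshotSync(Map<String, Long> liveState) {
        return Collections.unmodifiableMap(new TreeMap<>(liveState));
    }

    /** Asynchronous part: iterate the snapshot, write serialized entries, close the stream. */
    static CompletableFuture<Integer> persistAsync(Map<String, Long> snapshot, OutputStream target) {
        return CompletableFuture.supplyAsync(() -> {
            try (DataOutputStream out = new DataOutputStream(target)) {
                out.writeInt(snapshot.size());
                for (Map.Entry<String, Long> entry : snapshot.entrySet()) {
                    out.writeUTF(entry.getKey());
                    out.writeLong(entry.getValue());
                }
                out.flush();
                return out.size(); // number of bytes written
            } catch (IOException ex) {
                throw new UncheckedIOException(ex);
            }
        });
    }

    public static void main(String[] args) {
        Map<String, Long> live = new TreeMap<>();
        live.put("a", 1L);
        Map<String, Long> snapshot = snapshotSync(live);
        ByteArrayOutputStream sink = new ByteArrayOutputStream(); // stand-in for the DFS stream
        CompletableFuture<Integer> written = persistAsync(snapshot, sink);
        live.put("b", 2L); // processing continues; the snapshot is unaffected
        System.out.println(written.join() + " bytes, snapshot keys " + snapshot.keySet());
    }
}
```

Because the write works only on the snapshot and the stream, nothing in it blocks on the running job; in this model a slow async part points at the size of the state, the target stream, or memory pressure.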
>>>
>>>
>>> On 28.09.2017 at 08:18, Tony Wei <tony19920430@gmail.com> wrote:
>>>
>>> Hi Stefan,
>>>
>>> The checkpoint on my job has been subsumed again. There are some
>>> questions that I don't understand.
>>>
>>> Log in JM :
>>> 2017-09-27 13:45:15,686 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator
>>> - Completed checkpoint 1576 (174693180 bytes in 21597 ms).
>>> 2017-09-27 13:49:42,795 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator
>>> - Triggering checkpoint 1577 @ 1506520182795
>>> 2017-09-27 13:54:42,795 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator
>>> - Triggering checkpoint 1578 @ 1506520482795
>>> 2017-09-27 13:55:13,105 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator
>>> - Completed checkpoint 1578 (152621410 bytes in 19109 ms).
>>> 2017-09-27 13:56:37,103 WARN org.apache.flink.runtime.checkpoint.CheckpointCoordinator
>>> - Received late message for now expired checkpoint attempt 1577 from
>>> 2273da50f29b9dee731f7bd749e91c80 of job 7c039572b....
>>> 2017-09-27 13:59:42,795 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator
>>> - Triggering checkpoint 1579 @ 1506520782795
>>>
>>> Log in TM:
>>> 2017-09-27 13:56:37,105 INFO org.apache.flink.runtime.state.DefaultOperatorStateBackend
>>> - DefaultOperatorStateBackend snapshot (File Stream Factory @
>>> s3://tony-dev/flink-checkpoints/7c039572b13346f1b17dcc0ace2b72c2,
>>> asynchronous part) in thread Thread[pool-7-thread-322,5,Flink Task
>>> Threads] took 240248 ms.
>>>
>>> I think the log in TM might be the late message for #1577 in JM, because
>>> #1576, #1578 had been finished and #1579 hadn't been started at 13:56:37.
>>> If I am not mistaken, I am wondering why the time it took was 240248 ms
>>> (4 min). It seems that it started later than the asynchronous tasks in
>>> #1578.
>>> Is there any way to guarantee that the asynchronous parts of earlier
>>> checkpoints will be executed before those of later ones?
>>>
>>> Moreover, I think it would be better to have more information in the INFO
>>> log, such as waiting time and checkpoint id, in order to trace the progress
>>> of a checkpoint conveniently.
>>>
>>> What do you think? Do you have any suggestion for me to deal with these
>>> problems? Thank you.
>>>
>>> Best Regards,
>>> Tony Wei
>>>
>>> 2017-09-27 17:11 GMT+08:00 Tony Wei <tony19920430@gmail.com>:
>>>
>>>> Hi Stefan,
>>>>
>>>> Here is the summary of my streaming job's checkpoints since restarting
>>>> last night.
>>>>
>>>> <image attachment: 2017-09-27 4.56.30 PM.png>
>>>>
>>>> This is the distribution of alignment buffered over the last 12 hours.
>>>>
>>>> <image attachment: 2017-09-27 5.05.11 PM.png>
>>>>
>>>> And here is the buffer out pool usage during chk #1140 ~ #1142. For chk
>>>> #1245 and #1246, you can check the picture I sent before.
>>>>
>>>> <image attachment: 2017-09-27 5.01.24 PM.png>
>>>>
>>>> AFAIK, the back pressure rate is usually in LOW status, sometimes goes
>>>> up to HIGH, and is always OK during the night.
>>>>
>>>> Best Regards,
>>>> Tony Wei
>>>>
>>>>
>>>> 2017-09-27 16:54 GMT+08:00 Stefan Richter <s.richter@data-artisans.com>:
>>>>
>>>>> Hi Tony,
>>>>>
>>>>> are your checkpoints typically close to the timeout boundary? From
>>>>> what I see, writing the checkpoint is relatively fast but the time from
>>>>> the checkpoint trigger to execution seems very long. This is typically
>>>>> the case if your job has a lot of backpressure and therefore the
>>>>> checkpoint barriers take a long time to travel to the operators, because
>>>>> a lot of events are piling up in the buffers. Do you also experience
>>>>> large alignments for your checkpoints?
>>>>>
>>>>> Best,
>>>>> Stefan
>>>>>
>>>>> On 27.09.2017 at 10:43, Tony Wei <tony19920430@gmail.com> wrote:
>>>>>
>>>>> Hi Stefan,
>>>>>
>>>>> It seems that I found something strange from JM's log.
>>>>>
>>>>> It had happened more than once before, but all subtasks would finish
>>>>> their checkpoint attempts in the end.
>>>>>
>>>>> 2017-09-26 01:23:28,690 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator
>>>>> - Triggering checkpoint 1140 @ 1506389008690
>>>>> 2017-09-26 01:28:28,690 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator
>>>>> - Triggering checkpoint 1141 @ 1506389308690
>>>>> 2017-09-26 01:33:28,690 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator
>>>>> - Triggering checkpoint 1142 @ 1506389608690
>>>>> 2017-09-26 01:33:28,691 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator
>>>>> - Checkpoint 1140 expired before completing.
>>>>> 2017-09-26 01:38:28,691 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator
>>>>> - Checkpoint 1141 expired before completing.
>>>>> 2017-09-26 01:40:38,044 WARN org.apache.flink.runtime.checkpoint.CheckpointCoordinator
>>>>> - Received late message for now expired checkpoint attempt 1140 from
>>>>> c63825d15de0fef55a1d148adcf4467e of job 7c039572b...
>>>>> 2017-09-26 01:40:53,743 WARN org.apache.flink.runtime.checkpoint.CheckpointCoordinator
>>>>> - Received late message for now expired checkpoint attempt 1141 from
>>>>> c63825d15de0fef55a1d148adcf4467e of job 7c039572b...
>>>>> 2017-09-26 01:41:19,332 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator
>>>>> - Completed checkpoint 1142 (136733704 bytes in 457413 ms).
>>>>>
>>>>> For chk #1245 and #1246, there was no late message from TM. You can
>>>>> refer to the TM log. A fully completed checkpoint attempt will have 12
>>>>> (... asynchronous part) logs in general, but #1245 and #1246 only got 10
>>>>> logs.
>>>>>
>>>>> 2017-09-26 10:08:28,690 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator
>>>>> - Triggering checkpoint 1245 @ 1506420508690
>>>>> 2017-09-26 10:13:28,690 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator
>>>>> - Triggering checkpoint 1246 @ 1506420808690
>>>>> 2017-09-26 10:18:28,691 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator
>>>>> - Checkpoint 1245 expired before completing.
>>>>> 2017-09-26 10:23:28,691 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator
>>>>> - Checkpoint 1246 expired before completing.
>>>>>
>>>>> Moreover, I listed the directory for checkpoints on S3 and saw there
>>>>> were two states not discarded successfully. In general, there will be 16
>>>>> parts for a completed checkpoint state.
>>>>>
>>>>> 2017-09-26 18:08:33 36919 tony-dev/flink-checkpoints/7c039572b13346f1b17dcc0ace2b72c2/chk-1245/eedd7ca5-ee34-45a5-bf0b-11cc1fc67ab8
>>>>> 2017-09-26 18:13:34 37419 tony-dev/flink-checkpoints/7c039572b13346f1b17dcc0ace2b72c2/chk-1246/9aa5c6c4-8c74-465d-8509-5fea4ed25af6
>>>>>
>>>>> Hope this information is helpful. Thank you.
>>>>>
>>>>> Best Regards,
>>>>> Tony Wei
>>>>>
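When reading JM logs like the ones quoted in the message above, it can help to reduce them to the final status of each checkpoint. The sketch below is only an illustration with hypothetical names (`CheckpointLogScan`, `scan`); the patterns match the CheckpointCoordinator message texts exactly as they appear in this thread and are not guaranteed to fit other Flink versions. "Received late message" lines are deliberately ignored because they do not change a checkpoint's outcome.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical sketch; not a Flink tool.
class CheckpointLogScan {
    enum Status { TRIGGERED, COMPLETED, EXPIRED }

    private static final Pattern TRIGGERED = Pattern.compile("Triggering checkpoint (\\d+)");
    private static final Pattern COMPLETED = Pattern.compile("Completed checkpoint (\\d+)");
    private static final Pattern EXPIRED =
            Pattern.compile("Checkpoint (\\d+) expired before completing");

    /** Maps each checkpoint id to the last status seen for it, in order of first appearance. */
    static Map<Long, Status> scan(Iterable<String> messages) {
        Map<Long, Status> result = new LinkedHashMap<>();
        for (String message : messages) {
            record(result, TRIGGERED.matcher(message), Status.TRIGGERED);
            record(result, COMPLETED.matcher(message), Status.COMPLETED);
            record(result, EXPIRED.matcher(message), Status.EXPIRED);
        }
        return result;
    }

    private static void record(Map<Long, Status> result, Matcher matcher, Status status) {
        if (matcher.find()) {
            result.put(Long.parseLong(matcher.group(1)), status);
        }
    }

    public static void main(String[] args) {
        Map<Long, Status> statuses = scan(Arrays.asList(
                "Triggering checkpoint 1140 @ 1506389008690",
                "Triggering checkpoint 1141 @ 1506389308690",
                "Triggering checkpoint 1142 @ 1506389608690",
                "Checkpoint 1140 expired before completing.",
                "Checkpoint 1141 expired before completing.",
                "Received late message for now expired checkpoint attempt 1140 from ...",
                "Completed checkpoint 1142 (136733704 bytes in 457413 ms)."));
        System.out.println(statuses);
    }
}
```

For the quoted excerpt this yields 1140 and 1141 as EXPIRED and 1142 as COMPLETED.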
>>>>> 2017-09-27 16:14 GMT+08:00 Stefan Richter <s.richter@data-artisans.com>:
>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> thanks for the information. Unfortunately, I have no immediate idea
>>>>>> what the reason is from the given information. I think most helpful
>>>>>> could be a thread dump, but also metrics on the operator level to
>>>>>> figure out which part of the pipeline is the culprit.
>>>>>>
>>>>>> Best,
>>>>>> Stefan
>>>>>>
>>>>>> On 26.09.2017 at 17:55, Tony Wei <tony19920430@gmail.com> wrote:
>>>>>>
>>>>>> Hi Stefan,
>>>>>>
>>>>>> There is no unknown exception in my full log. The Flink version is
>>>>>> 1.3.2.
>>>>>> My job is roughly like this.
>>>>>>
>>>>>> env.addSource(Kafka)
>>>>>>   .map(ParseKeyFromRecord)
>>>>>>   .keyBy()
>>>>>>   .process(CountAndTimeoutWindow)
>>>>>>   .asyncIO(UploadToS3)
>>>>>>   .addSink(UpdateDatabase)
>>>>>>
>>>>>> It seemed all tasks stopped like the picture I sent in the last email.
>>>>>>
>>>>>> I will keep my eye on taking a thread dump from that JVM if this
>>>>>> happens again.
>>>>>>
>>>>>> Best Regards,
>>>>>> Tony Wei
>>>>>>
>>>>>> 2017-09-26 23:46 GMT+08:00 Stefan Richter <s.richter@data-artisans.com>:
>>>>>>
>>>>>>> Hi,
>>>>>>>
>>>>>>> that is very strange indeed. I had a look at the logs and there is
>>>>>>> no error or exception reported. I assume there is also no exception in
>>>>>>> your full logs? Which version of flink are you using and what operators
>>>>>>> were running in the task that stopped? If this happens again, would it
>>>>>>> be possible to take a thread dump from that JVM?
>>>>>>>
>>>>>>> Best,
>>>>>>> Stefan
>>>>>>>
>>>>>>> > On 26.09.2017 at 17:08, Tony Wei <tony19920430@gmail.com> wrote:
>>>>>>> >
>>>>>>> > Hi,
>>>>>>> >
>>>>>>> > Something weird happened on my streaming job.
>>>>>>> >
>>>>>>> > I found my streaming job seemed to be blocked for a long time, and I
>>>>>>> > saw the situation shown in the picture below. (chk #1245 and #1246
>>>>>>> > both finished 7/8 tasks and were then marked as timed out by JM. Other
>>>>>>> > checkpoints, like #1247, failed in the same state until I restarted TM.)
>>>>>>> >
>>>>>>> > <snapshot.png>
>>>>>>> >
>>>>>>> > I'm not sure what happened, but the consumer stopped fetching
>>>>>>> > records, buffer usage was 100% and the following task did not seem to
>>>>>>> > fetch data anymore, as if the whole TM had stopped.
>>>>>>> >
>>>>>>> > However, after I restarted TM and forced the job to restart from the
>>>>>>> > latest completed checkpoint, everything worked again. And I don't know
>>>>>>> > how to reproduce it.
>>>>>>> >
>>>>>>> > The attachment is my TM log. Because there are many user logs and
>>>>>>> > sensitive information, I kept only the logs from `org.apache.flink...`.
>>>>>>> >
>>>>>>> > My cluster setup is one JM and one TM with 4 available slots.
>>>>>>> >
>>>>>>> > The streaming job uses all slots, the checkpoint interval is 5 mins and
>>>>>>> > the max concurrent number is 3.
>>>>>>> >
>>>>>>> > Please let me know if more information is needed to find out what
>>>>>>> > happened on my streaming job. Thanks for your help.
>>>>>>> >
>>>>>>> > Best Regards,
>>>>>>> > Tony Wei
>>>>>>> > <flink-root-taskmanager-0-partial.log>
