flink-user mailing list archives

From Tony Wei <tony19920...@gmail.com>
Subject Re: Stream Task seems to be blocked after checkpoint timeout
Date Tue, 03 Oct 2017 05:26:06 GMT
Hi Stefan,

It seems that a similar situation, in which the job is blocked after a checkpoint
timeout, has happened to my job. BTW, this is another job, for which I raised the
parallelism and the input throughput.

After chk #8 started, the whole operator seemed to be blocked.

I recorded some JM / TM logs, snapshots and thread dump logs; they are in the
attachment. I hope these will help to find the root cause. Thank you.

Best Regards,
Tony Wei

==========================================================================================================================================================

JM log:

2017-10-03 03:46:49,371 WARN
org.apache.flink.runtime.checkpoint.CheckpointCoordinator
    - Received late message for now expired checkpoint attempt 7 from
b52ef54ad4feb0c6b85a8b8453bff419 of job ecfa5968e831e547ed70d1359a615f72.
2017-10-03 03:47:00,977 WARN
org.apache.flink.runtime.checkpoint.CheckpointCoordinator
    - Received late message for now expired checkpoint attempt 8 from
b52ef54ad4feb0c6b85a8b8453bff419 of job ecfa5968e831e547ed70d1359a615f72.

TM log:

2017-10-03 03:46:46,962 INFO
org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend  -
Asynchronous RocksDB snapshot (File Stream Factory @
s3://tony-dev/flink-checkpoints/ecfa5968e831e547ed70d1359a615f72,
asynchronous part) in thread Thread[pool-55-thread-7,5,Flink Task Threads]
took 1211517 ms.

Snapshots:

[image: inline image 1]
[image: inline image 3]
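
For context, here is a minimal sketch (not taken from the actual job code) of how the
checkpoint settings mentioned later in this thread, a 5-minute interval with at most 3
concurrent checkpoints, would be configured with the Flink DataStream API; the 10-minute
timeout value is only an assumption for illustration.

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointSettingsSketch {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(5 * 60 * 1000L);                          // 5-minute interval, as in this job
        env.getCheckpointConfig().setMaxConcurrentCheckpoints(3);         // "max concurrent number is 3"
        env.getCheckpointConfig().setCheckpointTimeout(10 * 60 * 1000L);  // assumed timeout, not from the job
        // ... the job topology and env.execute(...) would follow here
    }
}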

2017-09-28 20:29 GMT+08:00 Tony Wei <tony19920430@gmail.com>:

> Hi Stefan,
>
> That reason makes sense to me. Thanks for pointing that out.
>
> About my job: the database is currently not used at all (I disabled it for
> some reasons), but the output to S3 is implemented with async I/O.
>
> I used a ForkJoinPool with a capacity of 50.
> I have tried adding a rebalance after the count window to monitor the back pressure
> on the upload operator.
> The result is always OK status.
> I think the reason is that the count window buffers lots of records,
> so the input rate into the upload operator is not too high.
>
> But I am not sure whether my choice of ForkJoinPool capacity could impact the
> asynchronous checkpointing process, in terms of both the machine's resources and the
> S3 connections.
>
> BTW, the same S3 serves both the operator and checkpointing, and I used the AWS Java
> API to access S3 in the upload operator in order to control where the files go.
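>
> For illustration only (this is not the actual job code), here is a minimal sketch of
> the bounded ForkJoinPool setup described above; the uploadToS3 helper is a hypothetical
> placeholder for the AWS Java S3 client call.
>
> import java.util.concurrent.CompletableFuture;
> import java.util.concurrent.ForkJoinPool;
>
> public class BoundedUploadPoolSketch {
>     // One shared pool with parallelism 50, matching the capacity mentioned above.
>     private static final ForkJoinPool UPLOAD_POOL = new ForkJoinPool(50);
>
>     // Hypothetical upload helper; in the real job this wraps the AWS Java S3 client.
>     static String uploadToS3(String record) {
>         return "uploaded:" + record.hashCode();
>     }
>
>     public static void main(String[] args) {
>         // Each record is handed to the bounded pool; inside a Flink AsyncFunction the
>         // callback would hand the result to the async collector instead of printing it.
>         CompletableFuture<String> result =
>                 CompletableFuture.supplyAsync(() -> uploadToS3("example-record"), UPLOAD_POOL);
>         result.thenAccept(System.out::println).join();
>     }
> }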
>
> Best Regards,
> Tony Wei
>
> On Thu, Sep 28, 2017 at 7:43 PM, Stefan Richter <s.richter@data-artisans.com> wrote:
>
>> Hi,
>>
>> the gap between the sync and the async part does not mean too much. What
>> happens per task is that all operators go through their sync part, and then
>> one thread executes all the async parts, one after the other. So if an
>> async part starts late, this is just because it started only after another
>> async part finished.
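>>
>> As a minimal illustration of this sequencing (an editorial sketch, not Flink's actual
>> snapshot code): with a single-threaded executor, a task submitted early can still start
>> late, simply because the tasks ahead of it have to finish first.
>>
>> import java.util.concurrent.ExecutorService;
>> import java.util.concurrent.Executors;
>> import java.util.concurrent.TimeUnit;
>>
>> public class SequentialAsyncPartsSketch {
>>     public static void main(String[] args) throws Exception {
>>         // One thread, like the async snapshot parts of one task: strictly one after the other.
>>         ExecutorService asyncParts = Executors.newSingleThreadExecutor();
>>         long created = System.currentTimeMillis();
>>         for (int i = 1; i <= 3; i++) {
>>             final int part = i;
>>             asyncParts.submit(() -> {
>>                 long waited = System.currentTimeMillis() - created;
>>                 System.out.println("async part " + part + " started after waiting " + waited + " ms");
>>                 Thread.sleep(1000);  // pretend this part takes 1 second to write its snapshot
>>                 return null;
>>             });
>>         }
>>         asyncParts.shutdown();
>>         asyncParts.awaitTermination(1, TimeUnit.MINUTES);
>>     }
>> }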
>>
>> I have one more question about your job, because it involves communication
>> with external systems, like S3 and a database. Are you sure that they
>> cannot sometimes become a bottleneck, block, and bring down your job? In
>> particular: is the same S3 used to serve both the operator and checkpointing, and
>> what are your sustained read/write rate there and the maximum number of
>> connections? You can try to use the backpressure metric and try to identify
>> the first operator (counting from the sink) that indicates backpressure.
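>>
>> As a small editorial sketch of checking that from the outside, assuming the JobManager's
>> web monitor is reachable and that your Flink version exposes the back pressure sampling
>> endpoint under /jobs/<jobId>/vertices/<vertexId>/backpressure (host, port and the ids
>> below are placeholders):
>>
>> import java.io.BufferedReader;
>> import java.io.InputStreamReader;
>> import java.net.HttpURLConnection;
>> import java.net.URL;
>>
>> public class BackPressureCheckSketch {
>>     public static void main(String[] args) throws Exception {
>>         String jobId = "<jobId>";        // placeholder
>>         String vertexId = "<vertexId>";  // placeholder
>>         // The response typically reports a back pressure level (ok / low / high) per
>>         // subtask once sampling has finished; repeat the request until stats are ready.
>>         URL url = new URL("http://localhost:8081/jobs/" + jobId + "/vertices/" + vertexId + "/backpressure");
>>         HttpURLConnection conn = (HttpURLConnection) url.openConnection();
>>         try (BufferedReader reader =
>>                      new BufferedReader(new InputStreamReader(conn.getInputStream()))) {
>>             reader.lines().forEach(System.out::println);
>>         }
>>     }
>> }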
>>
>> Best,
>> Stefan
>>
>> On 28.09.2017 at 12:59, Tony Wei <tony19920430@gmail.com> wrote:
>>
>> Hi,
>>
>> Sorry. This is the correct one.
>>
>> Best Regards,
>> Tony Wei
>>
>> 2017-09-28 18:55 GMT+08:00 Tony Wei <tony19920430@gmail.com>:
>>
>>> Hi Stefan,
>>>
>>> Sorry for providing partial information. The attachment contains the full logs
>>> for checkpoint #1577.
>>>
>>> The reason I would say the asynchronous part seems not to have been executed
>>> immediately is that all the synchronous parts had already finished at 2017-09-27
>>> 13:49.
>>> Does that mean the checkpoint barrier event had already arrived at the
>>> operator and the checkpoint started as soon as the JM triggered it?
>>>
>>> Best Regards,
>>> Tony Wei
>>>
>>> 2017-09-28 18:22 GMT+08:00 Stefan Richter <s.richter@data-artisans.com>:
>>>
>>>> Hi,
>>>>
>>>> I agree that the memory consumption looks good. If there is only one
>>>> TM, it will run inside one JVM. As for the 7 minutes, you mean the reported
>>>> end-to-end time? This time measurement starts when the checkpoint is
>>>> triggered on the job manager, the first contributor is then the time that
>>>> it takes for the checkpoint barrier event to travel with the stream to the
>>>> operators. If there is back pressure and a lot of events are buffered, this
>>>> can introduce delay to this first part, because barriers must not overtake
>>>> data for correctness. After the barrier arrives at the operator, next comes
>>>> the synchronous part of the checkpoint, which is typically short running
>>>> and takes a snapshot of the state (think of creating an immutable version,
>>>> e.g. through copy on write). In the asynchronous part, this snapshot is
>>>> persisted to DFS. After that the timing stops and is reported together with
>>>> the acknowledgement to the job manager.
>>>>
>>>> So, I would assume that if reporting took 7 minutes end-to-end, and the
>>>> async part took 4 minutes, it is likely that it took around 3 minutes for
>>>> the barrier event to travel with the stream. About the debugging, I think
>>>> it is hard to figure out what is going on with the DFS if you don’t have
>>>> metrics on that. Maybe you could attach a sampler to the TM’s JVM and
>>>> monitor where time is spent for the snapshotting?
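>>>>
>>>> (Editorial sketch: one very crude way to sample this from inside the same JVM, without
>>>> attaching an external profiler, is to dump the stacks of the snapshot threads
>>>> periodically; the thread-name filter below is an assumption based on the pool names
>>>> seen in the TM logs.)
>>>>
>>>> import java.util.Map;
>>>>
>>>> public class SnapshotThreadSamplerSketch {
>>>>     public static void main(String[] args) throws Exception {
>>>>         // Print the stack of every thread whose name looks like an async snapshot
>>>>         // worker ("pool-..." in the TM logs above), once per second.
>>>>         while (true) {
>>>>             for (Map.Entry<Thread, StackTraceElement[]> e : Thread.getAllStackTraces().entrySet()) {
>>>>                 if (e.getKey().getName().startsWith("pool-")) {
>>>>                     System.out.println("=== " + e.getKey().getName());
>>>>                     for (StackTraceElement frame : e.getValue()) {
>>>>                         System.out.println("    at " + frame);
>>>>                     }
>>>>                 }
>>>>             }
>>>>             Thread.sleep(1000);
>>>>         }
>>>>     }
>>>> }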
>>>>
>>>> I am also looping in Stephan, he might have more suggestions.
>>>>
>>>> Best,
>>>> Stefan
>>>>
>>>> On 28.09.2017 at 11:25, Tony Wei <tony19920430@gmail.com> wrote:
>>>>
>>>> Hi Stefan,
>>>>
>>>> Here is some telemetry information, but I don't have historical
>>>> information about GC.
>>>>
>>>> <Screen Shot 2017-09-28 4.51.26 PM.png>
>>>> <Screen Shot 2017-09-28 4.51.11 PM.png>
>>>>
>>>> 1) Yes, my state is not large.
>>>> 2) My DFS is S3, but my cluster is outside AWS. That might be a problem.
>>>> Since this is a POC, we might move to AWS in the future or use HDFS in the
>>>> same cluster. However, how can I tell whether this is the problem?
>>>> 3) It seems memory usage is bounded. I'm not sure whether the status shown
>>>> above is fine.
>>>>
>>>> There is only one TM in my cluster for now, so all tasks are running on
>>>> that machine. I think that means they are in the same JVM, right?
>>>> Besides the asynchronous part taking so long, another question is that the
>>>> late message showed that this task was delayed for almost 7
>>>> minutes, but the log showed it only took 4 minutes.
>>>> It seems that it was somehow waiting to be executed. Are there some
>>>> pointers to find out what happened?
>>>>
>>>> Regarding the log information, what I mean is that it is hard to recognize which
>>>> checkpoint id the asynchronous parts belong to if a checkpoint takes
>>>> more time and more concurrent checkpoints are taking place.
>>>> Also, it seems that the asynchronous part might not be executed right away if
>>>> there are no free threads in the thread pool. It would be better to measure the time
>>>> between creation time and processing time, and log it together with the checkpoint id
>>>> in the original log line that shows how long the asynchronous part took.
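>>>>
>>>> (To make that suggestion concrete, an editorial sketch of the kind of wrapper meant
>>>> here; the checkpointId field and the log format are illustrative, not Flink's actual
>>>> code.)
>>>>
>>>> import java.util.concurrent.Callable;
>>>>
>>>> public class TimedAsyncPartSketch<V> implements Callable<V> {
>>>>     private final long checkpointId;
>>>>     private final Callable<V> asyncPart;
>>>>     private final long createdAt = System.currentTimeMillis();
>>>>
>>>>     public TimedAsyncPartSketch(long checkpointId, Callable<V> asyncPart) {
>>>>         this.checkpointId = checkpointId;
>>>>         this.asyncPart = asyncPart;
>>>>     }
>>>>
>>>>     @Override
>>>>     public V call() throws Exception {
>>>>         long waited = System.currentTimeMillis() - createdAt;   // time spent waiting in the pool
>>>>         long start = System.currentTimeMillis();
>>>>         try {
>>>>             return asyncPart.call();
>>>>         } finally {
>>>>             System.out.println("Asynchronous part of checkpoint " + checkpointId
>>>>                     + " waited " + waited + " ms and took "
>>>>                     + (System.currentTimeMillis() - start) + " ms.");
>>>>         }
>>>>     }
>>>> }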
>>>>
>>>> Best Regards,
>>>> Tony Wei
>>>>
>>>> 2017-09-28 16:25 GMT+08:00 Stefan Richter <s.richter@data-artisans.com>
>>>> :
>>>>
>>>>> Hi,
>>>>>
>>>>> when the async part takes that long I would have 3 things to look at:
>>>>>
>>>>> 1) Is your state so large? I don’t think this applies in your case,
>>>>> right?
>>>>> 2) Is something wrong with writing to DFS (network, disks, etc)?
>>>>> 3) Are we running low on memory on that task manager?
>>>>>
>>>>> Do you have telemetry information about used heap and GC pressure on
>>>>> the problematic task? However, what speaks against the memory problem
>>>>> hypothesis is that future checkpoints seem to go through again. What I find
>>>>> very strange is that within the reported 4 minutes of the async part the
>>>>> only thing that happens is: open the DFS output stream, iterate the in-memory
>>>>> state and write serialized state data to the DFS stream, then close the stream.
>>>>> No locks or waits in that section, so I would assume that for one of the
>>>>> three reasons I gave, writing the state is terribly slow.
>>>>>
>>>>> Those snapshots should be able to run concurrently, for example so
>>>>> that users can also take savepoints even when a checkpoint was triggered
>>>>> and is still running, so there is no way to guarantee that the previous
>>>>> parts have finished; this is expected behaviour. Which waiting times are
>>>>> you missing in the log? I think the information about when a checkpoint is
>>>>> triggered, received by the TM, performing the sync and async parts, and
>>>>> the acknowledgement time should all be there.
>>>>>
>>>>> Best,
>>>>> Stefan
>>>>>
>>>>>
>>>>>
>>>>> On 28.09.2017 at 08:18, Tony Wei <tony19920430@gmail.com> wrote:
>>>>>
>>>>> Hi Stefan,
>>>>>
>>>>> The checkpoint on my job has been subsumed again. There are some
>>>>> questions that I don't understand.
>>>>>
>>>>> Log in JM :
>>>>> 2017-09-27 13:45:15,686 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator
>>>>> - Completed checkpoint 1576 (174693180 bytes in 21597 ms).
>>>>> 2017-09-27 13:49:42,795 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator
>>>>> - Triggering checkpoint 1577 @ 1506520182795
>>>>> 2017-09-27 13:54:42,795 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator
>>>>> - Triggering checkpoint 1578 @ 1506520482795
>>>>> 2017-09-27 13:55:13,105 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator
>>>>> - Completed checkpoint 1578 (152621410 bytes in 19109 ms).
>>>>> 2017-09-27 13:56:37,103 WARN org.apache.flink.runtime.checkpoint.CheckpointCoordinator
>>>>> - Received late message for now expired checkpoint attempt 1577 from
>>>>> 2273da50f29b9dee731f7bd749e91c80 of job 7c039572b....
>>>>> 2017-09-27 13:59:42,795 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator
>>>>> - Triggering checkpoint 1579 @ 1506520782795
>>>>>
>>>>> Log in TM:
>>>>> 2017-09-27 13:56:37,105 INFO org.apache.flink.runtime.state.DefaultOperatorStateBackend
>>>>> - DefaultOperatorStateBackend snapshot (File Stream Factory @
>>>>> s3://tony-dev/flink-checkpoints/7c039572b13346f1b17dcc0ace2b72c2,
>>>>> asynchronous part) in thread Thread[pool-7-thread-322,5,Flink Task
>>>>> Threads] took 240248 ms.
>>>>>
>>>>> I think the log in the TM might correspond to the late message for #1577 in the JM,
>>>>> because #1576 and #1578 had been finished and #1579 hadn't been started at
>>>>> 13:56:37.
>>>>> If I am not mistaken, I am wondering why the time it
>>>>> took was 240248 ms (4 min). It seems that it started later than the asynchronous
>>>>> tasks of #1578.
>>>>> Is there any way to guarantee that the previous asynchronous parts of
>>>>> checkpoints will be executed before the following ones?
>>>>>
>>>>> Moreover, I think it would be better to have more information in the INFO
>>>>> log, such as the waiting time and the checkpoint id, in order to trace the progress
>>>>> of a checkpoint conveniently.
>>>>>
>>>>> What do you think? Do you have any suggestions for how to deal with
>>>>> these problems? Thank you.
>>>>>
>>>>> Best Regards,
>>>>> Tony Wei
>>>>>
>>>>> 2017-09-27 17:11 GMT+08:00 Tony Wei <tony19920430@gmail.com>:
>>>>>
>>>>>> Hi Stefan,
>>>>>>
>>>>>> Here is the summary of my streaming job's checkpoints after
>>>>>> restarting it last night.
>>>>>>
>>>>>> <Screen Shot 2017-09-27 4.56.30 PM.png>
>>>>>>
>>>>>> This is the distribution of the alignment buffered metric over the last 12 hours.
>>>>>>
>>>>>> <Screen Shot 2017-09-27 5.05.11 PM.png>
>>>>>>
>>>>>> And here is the output buffer pool usage during chk #1140 ~ #1142. For
>>>>>> chk #1245 and #1246, you can check the picture I sent before.
>>>>>>
>>>>>> <Screen Shot 2017-09-27 5.01.24 PM.png>
>>>>>>
>>>>>> AFAIK, the back pressure status is usually LOW, sometimes
>>>>>> goes up to HIGH, and is always OK during the night.
>>>>>>
>>>>>> Best Regards,
>>>>>> Tony Wei
>>>>>>
>>>>>>
>>>>>> 2017-09-27 16:54 GMT+08:00 Stefan Richter <
>>>>>> s.richter@data-artisans.com>:
>>>>>>
>>>>>>> Hi Tony,
>>>>>>>
>>>>>>> are your checkpoints typically close to the timeout boundary? From
>>>>>>> what I see, writing the checkpoint is relatively fast but the time from the
>>>>>>> checkpoint trigger to execution seems very long. This is typically the case
>>>>>>> if your job has a lot of backpressure and therefore the checkpoint barriers
>>>>>>> take a long time to travel to the operators, because a lot of events are
>>>>>>> piling up in the buffers. Do you also experience large alignments for your
>>>>>>> checkpoints?
>>>>>>>
>>>>>>> Best,
>>>>>>> Stefan
>>>>>>>
>>>>>>> On 27.09.2017 at 10:43, Tony Wei <tony19920430@gmail.com> wrote:
>>>>>>>
>>>>>>> Hi Stefan,
>>>>>>>
>>>>>>> It seems that I found something strange in the JM's log.
>>>>>>>
>>>>>>> It had happened more than once before, but all subtasks would finish
>>>>>>> their checkpoint attempts in the end.
>>>>>>>
>>>>>>> 2017-09-26 01:23:28,690 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator
>>>>>>> - Triggering checkpoint 1140 @ 1506389008690
>>>>>>> 2017-09-26 01:28:28,690 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator
>>>>>>> - Triggering checkpoint 1141 @ 1506389308690
>>>>>>> 2017-09-26 01:33:28,690 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator
>>>>>>> - Triggering checkpoint 1142 @ 1506389608690
>>>>>>> 2017-09-26 01:33:28,691 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator
>>>>>>> - Checkpoint 1140 expired before completing.
>>>>>>> 2017-09-26 01:38:28,691 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator
>>>>>>> - Checkpoint 1141 expired before completing.
>>>>>>> 2017-09-26 01:40:38,044 WARN org.apache.flink.runtime.checkpoint.CheckpointCoordinator
>>>>>>> - Received late message for now expired checkpoint attempt 1140 from
>>>>>>> c63825d15de0fef55a1d148adcf4467e of job 7c039572b...
>>>>>>> 2017-09-26 01:40:53,743 WARN org.apache.flink.runtime.checkpoint.CheckpointCoordinator
>>>>>>> - Received late message for now expired checkpoint attempt 1141 from
>>>>>>> c63825d15de0fef55a1d148adcf4467e of job 7c039572b...
>>>>>>> 2017-09-26 01:41:19,332 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator
>>>>>>> - Completed checkpoint 1142 (136733704 bytes in 457413 ms).
>>>>>>>
>>>>>>> For chk #1245 and #1246, there was no late message from the TM. You can
>>>>>>> refer to the TM log. A fully completed checkpoint attempt will have 12
>>>>>>> (... asynchronous part) logs in general, but #1245 and #1246 only got 10
>>>>>>> logs.
>>>>>>>
>>>>>>> 2017-09-26 10:08:28,690 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator
>>>>>>> - Triggering checkpoint 1245 @ 1506420508690
>>>>>>> 2017-09-26 10:13:28,690 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator
>>>>>>> - Triggering checkpoint 1246 @ 1506420808690
>>>>>>> 2017-09-26 10:18:28,691 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator
>>>>>>> - Checkpoint 1245 expired before completing.
>>>>>>> 2017-09-26 10:23:28,691 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator
>>>>>>> - Checkpoint 1246 expired before completing.
>>>>>>>
>>>>>>> Moreover, I listed the directory for checkpoints on S3 and saw there
>>>>>>> were two states not discarded successfully. In general, there will be 16
>>>>>>> parts for a completed checkpoint state.
>>>>>>>
>>>>>>> 2017-09-26 18:08:33 36919 tony-dev/flink-checkpoints/7c039572b13346f1b17dcc0ace2b72c2/chk-1245/eedd7ca5-ee34-45a5-bf0b-11cc1fc67ab8
>>>>>>> 2017-09-26 18:13:34 37419 tony-dev/flink-checkpoints/7c039572b13346f1b17dcc0ace2b72c2/chk-1246/9aa5c6c4-8c74-465d-8509-5fea4ed25af6
>>>>>>>
>>>>>>> Hope this information is helpful. Thank you.
>>>>>>>
>>>>>>> Best Regards,
>>>>>>> Tony Wei
>>>>>>>
>>>>>>> 2017-09-27 16:14 GMT+08:00 Stefan Richter <
>>>>>>> s.richter@data-artisans.com>:
>>>>>>>
>>>>>>>> Hi,
>>>>>>>>
>>>>>>>> thanks for the information. Unfortunately, I have no immediate idea
>>>>>>>> what the reason is from the given information. I think most helpful could
>>>>>>>> be a thread dump, but also metrics on the operator level to figure
>>>>>>>> out which part of the pipeline is the culprit.
>>>>>>>>
>>>>>>>> Best,
>>>>>>>> Stefan
>>>>>>>>
>>>>>>>> On 26.09.2017 at 17:55, Tony Wei <tony19920430@gmail.com> wrote:
>>>>>>>>
>>>>>>>> Hi Stefan,
>>>>>>>>
>>>>>>>> There is no unknown exception in my full log. The Flink version is
>>>>>>>> 1.3.2.
>>>>>>>> My job is roughly like this.
>>>>>>>>
>>>>>>>> env.addSource(Kafka)
>>>>>>>>   .map(ParseKeyFromRecord)
>>>>>>>>   .keyBy()
>>>>>>>>   .process(CountAndTimeoutWindow)
>>>>>>>>   .asyncIO(UploadToS3)
>>>>>>>>   .addSink(UpdateDatabase)
>>>>>>>>
>>>>>>>> It seemed all tasks stopped like the picture I sent in the last
>>>>>>>> email.
>>>>>>>>
>>>>>>>> I will remember to take a thread dump from that JVM if this
>>>>>>>> happens again.
>>>>>>>>
>>>>>>>> Best Regards,
>>>>>>>> Tony Wei
>>>>>>>>
>>>>>>>> 2017-09-26 23:46 GMT+08:00 Stefan Richter <
>>>>>>>> s.richter@data-artisans.com>:
>>>>>>>>
>>>>>>>>> Hi,
>>>>>>>>>
>>>>>>>>> that is very strange indeed. I had a look at the logs and there is
>>>>>>>>> no error or exception reported. I assume there is also no exception in your
>>>>>>>>> full logs? Which version of Flink are you using and what operators were
>>>>>>>>> running in the task that stopped? If this happens again, would it be
>>>>>>>>> possible to take a thread dump from that JVM?
>>>>>>>>>
>>>>>>>>> Best,
>>>>>>>>> Stefan
>>>>>>>>>
>>>>>>>>> > On 26.09.2017 at 17:08, Tony Wei <tony19920430@gmail.com> wrote:
>>>>>>>>> >
>>>>>>>>> > Hi,
>>>>>>>>> >
>>>>>>>>> > Something weird happened on my streaming job.
>>>>>>>>> >
>>>>>>>>> > I found my streaming job seems to be blocked for a long time, and
>>>>>>>>> > I saw the situation like the picture below. (chk #1245 and #1246 were all
>>>>>>>>> > finishing 7/8 tasks and then were marked as timed out by the JM. Other
>>>>>>>>> > checkpoints failed in the same state, like #1247, until I restarted the TM.)
>>>>>>>>> >
>>>>>>>>> > <snapshot.png>
>>>>>>>>> >
>>>>>>>>> > I'm not sure what happened, but the consumer stopped fetching
>>>>>>>>> > records, the buffer usage was 100%, and the following task did not seem to fetch
>>>>>>>>> > data anymore. It was just like the whole TM was stopped.
>>>>>>>>> >
>>>>>>>>> > However, after I restarted the TM and forced the job to restart from
>>>>>>>>> > the latest completed checkpoint, everything worked again. And I don't know
>>>>>>>>> > how to reproduce it.
>>>>>>>>> >
>>>>>>>>> > The attachment is my TM log. Because there are many user logs
>>>>>>>>> > and much sensitive information, I only kept the logs from
>>>>>>>>> > `org.apache.flink...`.
>>>>>>>>> >
>>>>>>>>> > My cluster setup is one JM and one TM with 4 available
>>>>>>>>> > slots.
>>>>>>>>> >
>>>>>>>>> > The streaming job uses all slots, the checkpoint interval is 5 mins, and
>>>>>>>>> > the max concurrent number is 3.
>>>>>>>>> >
>>>>>>>>> > Please let me know if you need more information to find out what
>>>>>>>>> > happened to my streaming job. Thanks for your help.
>>>>>>>>> >
>>>>>>>>> > Best Regards,
>>>>>>>>> > Tony Wei
>>>>>>>>> > <flink-root-taskmanager-0-partial.log>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>>
>>>>
>>>>
>>>
>> <chk_ 1577.log>
>>
>>
>>
