flink-user mailing list archives

From Steven Wu <stevenz...@gmail.com>
Subject Re: Task Manager detached under load
Date Fri, 25 May 2018 14:17:45 GMT
Till, thanks for the follow-up. Looking forward to 1.5 :)

On Fri, May 25, 2018 at 2:11 AM, Till Rohrmann <trohrmann@apache.org> wrote:

> Hi Steven,
>
> we don't have `jobmanager.exit-on-fatal-akka-error` because then the JM
> would also be killed if a single TM gets quarantined. This is also not a
> desired behaviour.
>
> With Flink 1.5 the problem with quarantining should be gone since we no longer
> rely on Akka's death watch and instead use our own heartbeats.
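A minimal flink-conf.yaml sketch of the Flink 1.5 heartbeat settings that replace Akka's death watch, assuming the `heartbeat.interval` and `heartbeat.timeout` keys (both in milliseconds). The values shown are, to my knowledge, the 1.5 defaults; check the configuration reference for your exact version before relying on them:

```yaml
# Flink 1.5+: Flink's own JM/TM heartbeats instead of Akka death watch.
# How often heartbeat requests are sent (milliseconds).
heartbeat.interval: 10000
# How long a missing heartbeat is tolerated before the peer is
# considered lost (milliseconds). Raising it tolerates longer GC pauses.
heartbeat.timeout: 50000
```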
>
> Cheers,
> Till
>
> On Mon, May 14, 2018 at 1:07 AM, Steven Wu <stevenz3wu@gmail.com> wrote:
>
>> Till,
>>
>> thanks for the clarification. Yes, that situation is undesirable as well.
>>
>> In our case, restarting the jobmanager could also recover the job from the Akka
>> association lock-out. It was actually the issue (long GC pause) on the
>> jobmanager side that caused the Akka failure.
>>
>> Do we have something like "jobmanager.exit-on-fatal-akka-error: true"? Does it
>> make sense to terminate the jobmanager in this case?
>>
>> Thanks,
>> Steven
>>
>> On Sun, May 13, 2018 at 1:12 PM, Till Rohrmann <trohrmann@apache.org>
>> wrote:
>>
>>> Hi Steven,
>>>
>>> the reason why we did not turn on this feature by default was that in
>>> case of a true JM failure, all of the TMs will think that they got
>>> quarantined, which triggers their shutdown. Depending on how many container
>>> restarts you have left on Yarn, for example, this can lead to a situation
>>> where Flink is not able to recover the job even though it only needed to
>>> restart the JM container.
>>>
>>> Cheers,
>>> Till
>>>
>>> On Wed, Apr 25, 2018 at 10:39 PM, Steven Wu <stevenz3wu@gmail.com>
>>> wrote:
>>>
>>>> Till,
>>>>
>>>> We ran into the same issue. It started with a long GC pause that caused
>>>> the jobmanager to lose its ZooKeeper connection and leadership, and caused
>>>> the jobmanager to quarantine the taskmanager in Akka. Once quarantined, the
>>>> Akka association between jobmanager and taskmanager is locked forever.
>>>>
>>>> Your suggestion of "taskmanager.exit-on-fatal-akka-error: true"
>>>> worked: the taskmanager exited and a replacement taskmanager joined the
>>>> cluster afterwards. I am wondering why this does not default to "true". Any
>>>> downside?
>>>>
>>>> Thanks,
>>>> Steven
>>>>
>>>> On Sat, Feb 24, 2018 at 7:02 AM, ashish pok <ashishpok@yahoo.com>
>>>> wrote:
>>>>
>>>>> @Jelmer, this is Till's last response on the issue.
>>>>>
>>>>> -- Ashish
>>>>>
>>>>> On Mon, Feb 5, 2018 at 5:56 AM, Till Rohrmann
>>>>> <trohrmann@apache.org> wrote:
>>>>> Hi,
>>>>>
>>>>> this sounds like a serious regression with respect to Flink 1.3.2 and we
>>>>> should definitely find out what's causing this problem. Judging from what I
>>>>> see in the logs, the following happens:
>>>>>
>>>>> For some time the JobManager seems to no longer receive heartbeats
>>>>> from the TaskManager. This could be due to, for example, long GC pauses or
>>>>> heavy load that starves the ActorSystem's threads responsible for
>>>>> sending the heartbeats. As a result, the TM's ActorSystem is
>>>>> quarantined, which effectively renders it useless because the JM will
>>>>> henceforth ignore all messages from that system. The only way to resolve
>>>>> this problem is to restart the ActorSystem. By
>>>>> setting taskmanager.exit-on-fatal-akka-error to true in
>>>>> flink-conf.yaml, a quarantined TM will shut down. If you run the Flink
>>>>> cluster on Yarn, then a new substitute TM will be started if you still
>>>>> have some container restarts left. That way, the system should be able to
>>>>> recover.
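A minimal flink-conf.yaml fragment for the workaround described above. Only the key name and value come from this thread; the comments are my reading of the behaviour described here:

```yaml
# Let a TaskManager process exit when its ActorSystem has been quarantined,
# so that the cluster manager (e.g. YARN) can start a replacement TM.
# Each replacement consumes one of the remaining container restarts.
taskmanager.exit-on-fatal-akka-error: true
```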
>>>>>
>>>>> Additionally, you could try to play around
>>>>> with akka.watch.heartbeat.interval and akka.watch.heartbeat.pause,
>>>>> which control the heartbeat interval and the acceptable pause. By
>>>>> increasing the latter, the system should tolerate longer GC pauses and
>>>>> periods of high load.
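A sketch of the two death-watch settings mentioned above, assuming Flink's duration syntax (`10 s`). The 120 s pause is an illustrative value, not a recommendation from this thread; it should be sized above the longest GC pause you actually observe:

```yaml
# Interval at which Akka death-watch heartbeats are sent.
akka.watch.heartbeat.interval: 10 s
# Acceptable heartbeat pause before the remote ActorSystem is considered
# unreachable. Illustrative value, to my knowledge above the 1.4 default.
akka.watch.heartbeat.pause: 120 s
```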
>>>>>
>>>>> However, this only addresses the symptoms of the problem and I'd like
>>>>> to find out what's causing it. In order to further debug the
>>>>> problem, it would be really helpful to obtain the logs of the JobManager
>>>>> and the TaskManagers at DEBUG log level and with
>>>>> taskmanager.debug.memory.startLogThread set to true. Additionally, it
>>>>> would be interesting to see what's happening on the TaskManagers when you
>>>>> observe high load, so obtaining a profiler dump via VisualVM would be
>>>>> great. And last but not least, it also helps to learn more about the job
>>>>> you're running. What kind of connectors is it using? Are you using
>>>>> Flink's metric system? How is the Flink cluster deployed? Which other
>>>>> libraries are you using in your job?
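The memory-logging switch Till mentions can be enabled as below. The DEBUG log level itself is set in the log4j configuration (for example `conf/log4j.properties`), not in flink-conf.yaml:

```yaml
# Start a background thread in each TaskManager that periodically
# logs memory usage statistics, useful for correlating with GC pauses.
taskmanager.debug.memory.startLogThread: true
```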
>>>>>
>>>>> Thanks a lot for your help!
>>>>>
>>>>> Cheers,
>>>>> Till
>>>>>
>>>>> On Tue, Jan 30, 2018 at 8:59 PM, Cliff Resnick <cresny@gmail.com>
>>>>> wrote:
>>>>>
>>>>> I've seen a similar issue while running successive Flink SQL batches
>>>>> on 1.4. In my case, the Job Manager would fail with the log output about
>>>>> unreachability (with an additional statement about something going
>>>>> "horribly wrong"). Under workload pressure, I reverted to 1.3.2, where
>>>>> everything works perfectly, but we will try again soon on 1.4. When we
>>>>> do, I will post the actual log output.
>>>>>
>>>>> This was on YARN in AWS, with akka.ask.timeout = 60s.
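For reference, the setting Cliff mentions would look like this in flink-conf.yaml, assuming Flink's duration syntax. Note that `akka.ask.timeout` governs ask futures and blocking calls between components; it does not change the death-watch heartbeat settings (`akka.watch.*`) discussed elsewhere in this thread:

```yaml
# Timeout for Akka ask calls (futures) between Flink components.
akka.ask.timeout: 60 s
```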
>>>>>
>>>>> On Wed, Jan 24, 2018 at 9:57 PM, Ashish Pokharel <ashishpok@yahoo.com>
>>>>> wrote:
>>>>>
>>>>> I haven’t gotten much further with this. It doesn’t look GC
>>>>> related; at least the GC counters were not that atrocious. However, my
>>>>> main concern is: once the load subsides, why aren’t the TM and JM
>>>>> connecting again? That doesn’t look normal. I could definitely tell the
>>>>> JM was listening on the port, and from the logs it does appear the TM is
>>>>> trying to tell the JM that it is still alive.
>>>>>
>>>>> Thanks, Ashish
>>>>>
>>>>> On Jan 23, 2018, at 12:31 PM, Lasse Nedergaard <lassenedergaard@gmail.com> wrote:
>>>>>
>>>>> Hi.
>>>>>
>>>>> Did you find a reason for the detaching?
>>>>> I sometimes see the same on our system running Flink 1.4 on DC/OS. I
>>>>> have enabled taskmanager.debug.memory.startLogThread for debugging.
>>>>>
>>>>> Best regards
>>>>> Lasse Nedergaard
>>>>>
>>>>>
>>>>> On 20 Jan 2018, at 12:57, Kien Truong <duckientruong@gmail.com> wrote:
>>>>>
>>>>> Hi,
>>>>>
>>>>> You should enable and check your garbage collection log.
>>>>>
>>>>> We've encountered cases where a Task Manager disassociated due to a long
>>>>> GC pause.
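A minimal sketch of enabling GC logging for both processes, assuming a Java 8 JVM (as in this thread) and the `env.java.opts.jobmanager` / `env.java.opts.taskmanager` keys. The log file paths are hypothetical and must be writable on every host; on YARN a container-local directory is usually a better choice:

```yaml
# Java 8 GC logging flags; the -Xloggc paths are hypothetical placeholders.
# PrintGCApplicationStoppedTime records total stop-the-world pause times.
env.java.opts.jobmanager: -Xloggc:/var/log/flink/jobmanager-gc.log -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+PrintGCApplicationStoppedTime
env.java.opts.taskmanager: -Xloggc:/var/log/flink/taskmanager-gc.log -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+PrintGCApplicationStoppedTime
```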
>>>>>
>>>>>
>>>>> Regards,
>>>>>
>>>>> Kien
>>>>> On 1/20/2018 1:27 AM, ashish pok wrote:
>>>>>
>>>>> Hi All,
>>>>>
>>>>> We have hit some load-related issues and were wondering if anyone has
>>>>> suggestions. We are noticing task managers and job managers being
>>>>> detached from each other under load and never really syncing up again. As
>>>>> a result, the Flink session shows 0 slots available for processing. Even
>>>>> though the apps are configured to restart, that isn't really helping, as
>>>>> there are no slots available to run them.
>>>>>
>>>>>
>>>>> Here are excerpts from the logs that seemed relevant. (I am trimming out
>>>>> the rest of the logs for brevity.)
>>>>>
>>>>> *Job Manager:*
>>>>> 2018-01-19 12:38:00,423 INFO  org.apache.flink.runtime.jobmanager.JobManager                -  Starting JobManager (Version: 1.4.0, Rev:3a9d9f2, Date:06.12.2017 @ 11:08:40 UTC)
>>>>>
>>>>> 2018-01-19 12:38:00,792 INFO  org.apache.flink.runtime.jobmanager.JobManager                -  Maximum heap size: 16384 MiBytes
>>>>> 2018-01-19 12:38:00,794 INFO  org.apache.flink.runtime.jobmanager.JobManager                -  Hadoop version: 2.6.5
>>>>> 2018-01-19 12:38:00,794 INFO  org.apache.flink.runtime.jobmanager.JobManager                -  JVM Options:
>>>>> 2018-01-19 12:38:00,794 INFO  org.apache.flink.runtime.jobmanager.JobManager                -     -Xms16384m
>>>>> 2018-01-19 12:38:00,794 INFO  org.apache.flink.runtime.jobmanager.JobManager                -     -Xmx16384m
>>>>> 2018-01-19 12:38:00,795 INFO  org.apache.flink.runtime.jobmanager.JobManager                -     -XX:+UseG1GC
>>>>>
>>>>> 2018-01-19 12:38:00,908 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: jobmanager.rpc.port, 6123
>>>>> 2018-01-19 12:38:00,908 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: jobmanager.heap.mb, 16384
>>>>>
>>>>>
>>>>> 2018-01-19 12:53:34,671 WARN  akka.remote.RemoteWatcher                                     - Detected unreachable: [akka.tcp://flink@<jm-host>:37840]
>>>>> 2018-01-19 12:53:34,676 INFO  org.apache.flink.runtime.jobmanager.JobManager                - Task manager akka.tcp://flink@<jm-host>:37840/user/taskmanager terminated.
>>>>>
>>>>> -- So once the Flink session boots up, we hit it with pretty heavy
>>>>> load, which typically results in the WARN above.
>>>>>
>>>>> *Task Manager:*
>>>>> 2018-01-19 12:38:01,002 INFO  org.apache.flink.runtime.taskmanager.TaskManager              -  Starting TaskManager (Version: 1.4.0, Rev:3a9d9f2, Date:06.12.2017 @ 11:08:40 UTC)
>>>>> 2018-01-19 12:38:01,367 INFO  org.apache.flink.runtime.taskmanager.TaskManager              -  Hadoop version: 2.6.5
>>>>> 2018-01-19 12:38:01,367 INFO  org.apache.flink.runtime.taskmanager.TaskManager              -  JVM Options:
>>>>> 2018-01-19 12:38:01,367 INFO  org.apache.flink.runtime.taskmanager.TaskManager              -     -Xms16384M
>>>>> 2018-01-19 12:38:01,367 INFO  org.apache.flink.runtime.taskmanager.TaskManager              -     -Xmx16384M
>>>>> 2018-01-19 12:38:01,367 INFO  org.apache.flink.runtime.taskmanager.TaskManager              -     -XX:MaxDirectMemorySize=8388607T
>>>>> 2018-01-19 12:38:01,367 INFO  org.apache.flink.runtime.taskmanager.TaskManager              -     -XX:+UseG1GC
>>>>>
>>>>> 2018-01-19 12:38:01,392 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: jobmanager.rpc.port, 6123
>>>>> 2018-01-19 12:38:01,392 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: jobmanager.heap.mb, 16384
>>>>>
>>>>>
>>>>> 2018-01-19 12:54:48,626 WARN  akka.remote.RemoteWatcher                                     - Detected unreachable: [akka.tcp://flink@<jm-host>:6123]
>>>>> 2018-01-19 12:54:48,690 INFO  akka.remote.Remoting                                          - Quarantined address [akka.tcp://flink@<jm-host>:6123] is still unreachable or has not been restarted. Keeping it quarantined.
>>>>> 2018-01-19 12:54:48,774 WARN  akka.remote.Remoting                                          - Tried to associate with unreachable remote address [akka.tcp://flink@<tm-host>:6123]. Address is now gated for 5000 ms, all messages to this address will be delivered to dead letters. Reason: [The remote system has a UID that has been quarantined. Association aborted.]
>>>>> 2018-01-19 12:54:48,833 WARN  akka.remote.Remoting                                          - Tried to associate with unreachable remote address [akka.tcp://flink@<tm-host>:6123]. Address is now gated for 5000 ms, all messages to this address will be delivered to dead letters. Reason: [The remote system has quarantined this system. No further associations to the remote system are possible until this system is restarted.]
>>>>> <bunch of ERRORs on operations not shutdown properly - assuming
>>>>> because JM is unreachable>
>>>>>
>>>>> 2018-01-19 12:56:51,244 INFO  org.apache.flink.runtime.taskmanager.TaskManager              - Trying to register at JobManager akka.tcp://flink@<jm-host>:6123/user/jobmanager (attempt 10, timeout: 30000 milliseconds)
>>>>> 2018-01-19 12:56:51,253 WARN  akka.remote.Remoting                                          - Tried to associate with unreachable remote address [akka.tcp://flink@<jm-host>:6123]. Address is now gated for 5000 ms, all messages to this address will be delivered to dead letters. Reason: [The remote system has quarantined this system. No further associations to the remote system are possible until this system is restarted.]
>>>>>
>>>>> So the bottom line is that the JM and TM couldn't communicate under load,
>>>>> which is obviously not good. I tried to bump up akka.tcp.timeout as well,
>>>>> but it didn't help either. So my question is: after all processing has
>>>>> halted and no new data is being picked up, shouldn't this environment
>>>>> self-heal? Are there other things I could look at besides extending
>>>>> timeouts?
>>>>>
>>>>> Thanks,
>>>>>
>>>>> Ashish
>>>>>
>>>>
>>>
>>
>
