Subject: Re: Yarn terminating TM for pmem limit cascades causing all jobs to fail
From: Stephan Ewen
Date: Wed, 19 Apr 2017 15:25:11 +0200
To: Shannon Carey
Cc: user@flink.apache.org

Hi Shannon!

Increasing the number of retries is definitely a good idea - a sketch of the relevant flink-conf.yaml and yarn-site.xml keys is appended after the quoted message at the bottom.

The fact that you see increasing pmem use after failures / retries - let's dig into that. There are various possible leaks depending on what you use:

  (1) There may be a leak in class-loading (or specifically class unloading). 1.1.x dynamically loads code when tasks are (re)started. This requires that code can be unloaded, which means that tasks (after being cancelled) must have no more references to the classes. Class leaks typically come when you spawn threads (or use libraries that spawn threads) but do not shut them down when tasks are cancelled (see the sketch just after this list).

      You can check this in the Flink UI by looking at the non-heap memory consumption of the TaskManagers. In case of that type of leak, that number should continuously grow.

      1.2.x does not re-load code on each task restart in the Yarn per-job mode.

  (2) There may be a leak in the native memory allocation of some library you use, such as Netty or so.

  (3) As for a RocksDB leak - I am not directly aware of a known leak in 1.1.x, but the RocksDB code has been improved quite a bit from 1.1.x to 1.2.x. It may be worth checking out 1.2.x to see if that fixes the issue.
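To illustrate the thread point in (1): a minimal, hypothetical sketch (the function name and the executor are made up for illustration, not taken from your job) of tearing user-spawned threads down in close(), which Flink also calls when a task is cancelled, so that the user-code classloader can actually be unloaded:

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;

// Hypothetical example: any function that spawns threads should tear them
// down in close(), which also runs on task cancellation.
public class EnrichingMapper extends RichMapFunction<String, String> {

    private transient ExecutorService executor;

    @Override
    public void open(Configuration parameters) {
        // Threads created here hold a reference to the user-code classloader.
        executor = Executors.newFixedThreadPool(2);
    }

    @Override
    public String map(String value) {
        return value; // real lookup / enrichment work would go here
    }

    @Override
    public void close() throws Exception {
        // Without this, the pool's threads outlive the task and pin the
        // classloader, which shows up as ever-growing non-heap memory.
        if (executor != null) {
            executor.shutdownNow();
            executor.awaitTermination(10, TimeUnit.SECONDS);
        }
    }
}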
The "Association with remote system =E2=80=A6 has failed, address is now ga= ted for [5000] ms. Reason is: [Disassociated]." is what akka logs if a remote system is lost - hence a normal artifact of taskmanager failures. Greetings, Stephan On Wed, Apr 19, 2017 at 12:26 AM, Shannon Carey wrote: > I'm on Flink 1.1.4. We had yet another occurrence of Yarn killing a TM du= e > to exceeding pmem limits and all jobs failing as a result. I thought I ha= d > successfully disabled that check, but apparently the property doesn't wor= k > as expected in EMR. > > From what I can tell in the logs, it looks like after the first TM was > killed by Yarn, the jobs failed and were retried. However, when they are > retried they cause increased pmem load on yet another TM, which results i= n > Yarn killing another TM. That caused the jobs to fail again. This happene= d > 5 times until our job retry policy gave up and allowed the jobs to fail > permanently. Obviously, this situation is very problematic because it > results in the loss of all job state, plus it requires manual interventio= n > to start the jobs again. > > The job retries eventually fail due to, "Could not restart the job ... Th= e > slot in which the task was executed has been released. Probably loss of > TaskManager" or due to "Could not restart the job =E2=80=A6 Connection un= expectedly > closed by remote task manager =E2=80=A6 This might indicate that the remo= te task > manager was lost." Those are only the final failure causes: Flink does no= t > appear to log the cause of intermediate restart failures. > > I assume that the messages logged from the JobManager about "Association > with remote system =E2=80=A6 has failed, address is now gated for [5000] = ms. Reason > is: [Disassociated]." is due to the TM failing, and is expected/harmless? > > It seems like disabling the pmem check will fix this problem, but I am > wondering if this is related: https://flink.apache.org/faq.html#the-slot- > allocated-for-my-task-manager-has-been-released-what-should-i-do ? I > don't see any log messages about quarantined TMs=E2=80=A6 > > Do you think that increasing the # of job retries so that the jobs don't > fail until all TMs are replaced with fresh ones fix this issue? The > "memory.percent-free" metric from Collectd did go down to 2-3% on the TMs > before they failed, and shot back up to 30-40% on TM restart (though I'm > not sure how much of that had to do with the loss of state). So, memory > usage may be a real problem, but we don't get an OOM exception so I'm not > sure we can control this from the JVM perspective. Are there other memory > adjustments we should make which would allow our TMs to run for long > periods of time without having this problem? Is there perhaps a memory le= ak > in RocksDB? > > Thanks for any help you can provide, > Shannon > --f403045d15b63f4f88054d84f452 Content-Type: text/html; charset=UTF-8 Content-Transfer-Encoding: quoted-printable