From user-return-31751-archive-asf-public=cust-asf.ponee.io@flink.apache.org Sun Jan 5 02:28:37 2020 Return-Path: X-Original-To: archive-asf-public@cust-asf.ponee.io Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [207.244.88.153]) by mx-eu-01.ponee.io (Postfix) with SMTP id 472C418065E for ; Sun, 5 Jan 2020 03:28:37 +0100 (CET) Received: (qmail 96325 invoked by uid 500); 5 Jan 2020 02:28:35 -0000 Mailing-List: contact user-help@flink.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Delivered-To: mailing list user@flink.apache.org Received: (qmail 96315 invoked by uid 99); 5 Jan 2020 02:28:34 -0000 Received: from pnap-us-west-generic-nat.apache.org (HELO spamd1-us-west.apache.org) (209.188.14.142) by apache.org (qpsmtpd/0.29) with ESMTP; Sun, 05 Jan 2020 02:28:34 +0000 Received: from localhost (localhost [127.0.0.1]) by spamd1-us-west.apache.org (ASF Mail Server at spamd1-us-west.apache.org) with ESMTP id 8BC00C01CF for ; Sun, 5 Jan 2020 02:28:33 +0000 (UTC) X-Virus-Scanned: Debian amavisd-new at spamd1-us-west.apache.org X-Spam-Flag: NO X-Spam-Score: 0 X-Spam-Level: X-Spam-Status: No, score=0 tagged_above=-999 required=6.31 tests=[DKIM_SIGNED=0.1, DKIM_VALID=-0.1, DKIM_VALID_AU=-0.1, DKIM_VALID_EF=-0.1, HTML_MESSAGE=0.2, RCVD_IN_DNSWL_NONE=-0.0001, SPF_HELO_NONE=0.001, SPF_PASS=-0.001] autolearn=disabled Authentication-Results: spamd1-us-west.apache.org (amavisd-new); dkim=pass (2048-bit key) header.d=gmail.com Received: from mx1-ec2-va.apache.org ([10.40.0.8]) by localhost (spamd1-us-west.apache.org [10.40.0.7]) (amavisd-new, port 10024) with ESMTP id sL4Xze1QoIff for ; Sun, 5 Jan 2020 02:28:30 +0000 (UTC) Received-SPF: Pass (mailfrom) identity=mailfrom; client-ip=209.85.167.50; helo=mail-lf1-f50.google.com; envelope-from=java.dev.mtl@gmail.com; receiver= Received: from mail-lf1-f50.google.com (mail-lf1-f50.google.com [209.85.167.50]) by mx1-ec2-va.apache.org (ASF Mail Server at mx1-ec2-va.apache.org) with ESMTPS id C429ABC7EB for ; Sun, 5 Jan 2020 02:28:29 +0000 (UTC) Received: by mail-lf1-f50.google.com with SMTP id y19so34208034lfl.9 for ; Sat, 04 Jan 2020 18:28:29 -0800 (PST) DKIM-Signature: v=1; a=rsa-sha256; c=relaxed/relaxed; d=gmail.com; s=20161025; h=mime-version:references:in-reply-to:from:date:message-id:subject:to :cc; bh=9yGZlzo9rK2j2rgb88bKStgTcNpKFGgc+kGiexout6k=; b=B+DXSc0XvEhWJoDdM1lVRABIrJWAeknhr3IkYNFhGMfFnpw4lg67NxmsPVfEhtrc4k gjggRcT0GHEY7XGSNegjYO9jru2ptVx7YPtS7ogdB4yzOS/MyKUiCHI5bdKAXr9AcRa5 neUdvD7WJ++ITDXdcsCgaux+QYUY7oVpRXxasnY2e6eT6YTq8MXCh3AOxROCfR/bEw4t nGj/BOIb89SYdIavHxWLWphu9uzCLLmcdnBkVZTcwj2/1pFI1x5cXPnvLHhPexFKh2A+ bqdVULta+EPTh94YlGlNAaREh9mwQ2nqaK+Rrf0/u+nA4G5NOQ6blaZjE1PItxrZXz35 /MuQ== X-Google-DKIM-Signature: v=1; a=rsa-sha256; c=relaxed/relaxed; d=1e100.net; s=20161025; h=x-gm-message-state:mime-version:references:in-reply-to:from:date :message-id:subject:to:cc; bh=9yGZlzo9rK2j2rgb88bKStgTcNpKFGgc+kGiexout6k=; b=XxMT9AVu91ML6Ohs9lzVSoVJEIx2f9pr9BkyQR+W2gO9+21GYJoYPDrwuS7Z7RS6jh 7zRBxkhvjYq3Ug94Vk84DxKGew3ofqY0jWIBkEGYDsaDAuyApDy+KZ6120rKwZz1gKh7 +hB7U9JGoMqAyeBAjVeLtLktXbUi48F6d6hxYLpcKodBhmlxNySkWshxJbgkjcAiw6nH 4SPiULokyoQQwnG+VgfgnplfASjW7gyQmAGjlqSMs03dEwJifeQfSFxsvQPC5ei7IuH6 QkIrNyq4NYuYnN07nS6Esd0oQsb7I+Fozcj4H1EwWSrqMEoZIvhY4SLKim/NocRJrHzg cN3Q== X-Gm-Message-State: APjAAAXGuIJDTPA0nI9CkvjeiQxYMLwibo+kn+Huwv+q0zEUPwlRG78K 91lYaF2t9x41sx/KIXyPhkCNgkvqTjgAXIzjK0A= X-Google-Smtp-Source: APXvYqwliVTnGhWyd4WJ9YEW5qzYr8UJAzUK6CqVCEhX4Zv8T0X8c7D0mGMZbS+YjZ9GwPQeSKeBET9yFfBYO3iyiGg= X-Received: by 2002:a05:6512:284:: with SMTP id j4mr51355155lfp.109.1578191308518; Sat, 04 Jan 2020 18:28:28 -0800 (PST) MIME-Version: 1.0 References: <99eaf2a6-41b4-4a82-adff-d943bfbb778a.wangzhijiang999@aliyun.com> <2746c51d-a1ff-bb05-31e5-ccad0d5eb801@apache.org> In-Reply-To: From: John Smith Date: Sat, 4 Jan 2020 21:27:52 -0500 Message-ID: Subject: Re: Flink task node shut it self off. To: Chesnay Schepler Cc: Zhijiang , user Content-Type: multipart/alternative; boundary="00000000000054fbc5059b5b4c9c" --00000000000054fbc5059b5b4c9c Content-Type: text/plain; charset="UTF-8" It seems to have happened again... Here is a screen shot of the system metrics for that day on that particular node.... https://www.dropbox.com/s/iudn7z2fvvy7vb8/flink-node.png?dl=0 On Fri, 3 Jan 2020 at 12:19, John Smith wrote: > Well there was this huge IO wait like over 140% spike. IO wait rose slowly > for couple hours then at some time it spiked at 140% and then after IO wait > dropped back to "normal" the CPU 1min 5min 15min spiked to like 3 times the > number of cores for a bit. > > We where at "peek" operation. I.e we where running a batch job when this > hapenned. On average operation the "business" requests per second from our > services is about 15 RPS when we do batches we can hit 600 RPS for a few > hours and then back down. Each business request underneath does a few round > trips back and forth between Kafka, cache systems Flink, DBs etc... So > Flink jobs are a subset of some parts of that 600 RPS. > > On Flink side we 3 task managers of 4 cores 8GB which are configured as 8 > slots, 5.4GB JVM, 3.77GB flink managed mem per task manager. We have 8 jobs > and 9 slots free. So the cluster isn't full yet. But we do see one node is > full. > > We use disk FS state (backed by GlusterFS) not rocks DB. We had enabled 5 > second checkpointing for 6 of the jobs... So just wondering if that was > possibly the reason for the IO wait... But regardless of the RPS mentioned > above the jobs will always checkpoint every 5 seconds... I had the chance > to increase checkpointing for a few of the jobs before the holidays. I am > back on Monday... > > On Fri., Jan. 3, 2020, 11:16 a.m. Chesnay Schepler, > wrote: > >> The logs show 2 interesting pieces of information: >> >> >> ... >> 2019-12-19 18:33:23,278 INFO >> org.apache.kafka.clients.FetchSessionHandler - [Consumer >> clientId=consumer-4, groupId=ccccccdb-prod-import] Error sending fetch >> request (sessionId=INVALID, epoch=INITIAL) to node 0: >> org.apache.kafka.common.errors.DisconnectException. >> ... >> 2019-12-19 19:37:06,732 INFO >> org.apache.flink.runtime.taskexecutor.TaskExecutor - Could not >> resolve ResourceManager address akka.tcp://flink@xxxxxx-job-0002:36835/user/resourcemanager, >> retrying in 10000 ms: Ask timed out on >> [ActorSelection[Anchor(akka.tcp://flink@xxxxxx-job-0002:36835/), >> Path(/user/resourcemanager)]] after [10000 ms]. Sender[null] sent message >> of type "akka.actor.Identify".. >> >> This reads like the machine lost network connectivity for some reason. >> The tasks start failing because kafka cannot be reached, and the TM then >> shuts down because it can neither reach the ResourceManager. >> >> On 25/12/2019 04:34, Zhijiang wrote: >> >> If you use rocksDB state backend, it might consume extra native memory. >> Some resource framework cluster like yarn would kill the container if the >> memory usage exceeds some threshold. You can also double check whether it >> exists in your case. >> >> ------------------------------------------------------------------ >> From:John Smith >> Send Time:2019 Dec. 25 (Wed.) 03:40 >> To:Zhijiang >> Cc:user >> Subject:Re: Flink task node shut it self off. >> >> The shutdown happened after the massive IO wait. I don't use any state >> Checkpoints are disk based... >> >> On Mon., Dec. 23, 2019, 1:42 a.m. Zhijiang, >> wrote: >> Hi John, >> >> Thanks for the positive comments of Flink usage. No matter at least-once >> or exactly-once you used for checkpoint, it would never lose one message >> during failure recovery. >> >> Unfortunatelly I can not visit the logs you posted. Generally speaking the >> longer internal checkpoint would mean replaying more source data after >> failure recovery. >> In my experience the 5 seconds interval for checkpoint is too frequently >> in my experience, and you might increase it to 1 minute or so. You can also >> monitor how long will the checkpoint finish in your application, then you >> can adjust the interval accordingly. >> >> Concerning of the node shutdown you mentioned, I am not quite sure >> whether it is relevant to your short checkpoint interval. Do you config to >> use heap state backend? The hs_err file really indicated that you job >> had encountered the memory issue, then it is better to somehow increase >> your task manager memory. But if you can analyze the dump hs_err file via >> some profiler tool for checking the memory usage, it might be more helpful >> to find the root cause. >> >> Best, >> Zhijiang >> >> ------------------------------------------------------------------ >> From:John Smith >> Send Time:2019 Dec. 21 (Sat.) 05:26 >> To:user >> Subject:Flink task node shut it self off. >> >> Hi, using Flink 1.8.0 >> >> 1st off I must say Flink resiliency is very impressive, we lost a node >> and never lost one message by using checkpoints and Kafka. Thanks! >> >> The cluster is a self hosted cluster and we use our own zookeeper >> cluster. We have... >> 3 zookeepers: 4 cpu, 8GB (each) >> 3 job nodes: 4 cpu, 8GB (each) >> 3 task nodes: 4 cpu, 8GB (each) >> The nodes also share GlusterFS for storing savepoints and checkpoints, >> GlusterFS is running on the same machines. >> >> Yesterday a node shut itself off we the following log messages... >> - Stopping TaskExecutor >> akka.tcp://flink@xxx.xxx.xxx.73:34697/user/taskmanager_0. >> - Stop job leader service. >> - Stopping ZooKeeperLeaderRetrievalService /leader/resource_manager_lock. >> - Shutting down TaskExecutorLocalStateStoresManager. >> - Shutting down BLOB cache >> - Shutting down BLOB cache >> - removed file cache directory >> /tmp/flink-dist-cache-4b60d79b-1cef-4ffb-8837-3a9c9a205000 >> - I/O manager removed spill file directory >> /tmp/flink-io-c9d01b92-2809-4a55-8ab3-6920487da0ed >> - Shutting down the network environment and its components. >> >> Prior to the node shutting off we noticed massive IOWAIT of 140% and CPU >> load 1minute of 15. And we also got an hs_err file which sais we should >> increase the memory. >> >> I'm attaching the logs here: >> https://www.dropbox.com/sh/vp1ytpguimiayw7/AADviCPED47QEy_4rHsGI1Nya?dl=0 >> >> I wonder if my 5 second checkpointing is too much for gluster. >> >> Any thoughts? >> >> >> >> >> >> >> >> --00000000000054fbc5059b5b4c9c Content-Type: text/html; charset="UTF-8" Content-Transfer-Encoding: quoted-printable
It seems to have happened again... Here is a screen shot o= f the system metrics for that day on that particular node....

https= ://www.dropbox.com/s/iudn7z2fvvy7vb8/flink-node.png?dl=3D0


On Fr= i, 3 Jan 2020 at 12:19, John Smith <java.dev.mtl@gmail.com> wrote:
Well there was this huge IO w= ait like over 140% spike. IO wait rose slowly for couple hours then at some= time it spiked at 140% and then after IO wait dropped back to "normal= " the CPU 1min 5min 15min spiked to like 3 times the number of cores f= or a bit.

We where at "pe= ek" operation. I.e we where running a batch job when this hapenned. On= average operation the "business" requests per second from our se= rvices is about 15 RPS when we do batches we can hit 600 RPS for a few hour= s and then back down. Each business request underneath does a few round tri= ps back and forth between Kafka, cache systems Flink, DBs etc... So Flink j= obs are a subset of some parts of that 600 RPS.

=
On Flink side we 3 task managers of 4 cores 8GB whi= ch are configured as 8 slots, 5.4GB JVM, 3.77GB flink managed mem per task = manager. We have 8 jobs and 9 slots free. So the cluster isn't full yet= . But we do see one node is full.

We use disk FS state (backed by GlusterFS) not rocks DB. We had e= nabled 5 second checkpointing for 6 of the jobs... So just wondering if tha= t was possibly the reason for the IO wait... But regardless of the RPS ment= ioned above the jobs will always checkpoint every 5 seconds... I had the ch= ance to increase checkpointing for a few of the jobs before the holidays. I= am back on Monday...

On Fri., Jan. 3, 2020, 11:16 a.m. Chesnay Schepl= er, <chesnay@apa= che.org> wrote:
=20 =20 =20
The logs show 2 interesting pieces of information:

<tasks are submitted>
...
2019-12-19 18:33:23,278 INFO=C2=A0 org.apache.kafka.clients.FetchSessionHandler=C2=A0=C2=A0=C2=A0=C2=A0= =C2=A0=C2=A0=C2=A0=C2=A0=C2=A0=C2=A0=C2=A0=C2=A0=C2=A0=C2=A0=C2=A0=C2=A0=C2= =A0 - [Consumer clientId=3Dconsumer-4, groupId=3Dccccccdb-prod-import] Erro= r sending fetch request (sessionId=3DINVALID, epoch=3DINITIAL) to node 0: org.apache.kafka.common.errors.DisconnectException.
...
2019-12-19 19:37:06,732 INFO=C2=A0 org.apache.flink.runtime.taskexecutor.TaskExecutor=C2=A0=C2=A0=C2=A0= =C2=A0=C2=A0=C2=A0=C2=A0=C2=A0=C2=A0=C2=A0=C2=A0 - Could not resolve ResourceManager address akka.tcp://flink@xxxxxx-job-0002:36835/user/resourcemanager, retrying in 10000 ms: Ask timed out on [ActorSelection[Anchor(akka.tcp://flink@xxxxxx-job-0002:36835/), Path(/user/resourcemanager)]] after [10000 ms]. Sender[null] sent message of type "akka.actor.Identify"..

This reads like the machine lost network connectivity for some reason. The tasks start failing because kafka cannot be reached, and the TM then shuts down because it can neither reach the ResourceManager.

On 25/12/2019 04:34, Zhijiang wrote:
=20
If you use rocksDB=C2=A0state backend, = it might consume extra native memory.=C2=A0
Some resource framework cluster like yarn would kill the container if the memory usage exceeds some threshold.=C2=A0You can also double check whether it exist= s in your case.

-------------------------------------= -----------------------------
From:John Smith <java.dev.mtl@gmail.com>
Send Time:2019 Dec. 25 (Wed.) 03:40
Subject:Re: Flink task node shut it self off.

The shutdown happened after the massive IO wait. I don't use any state Checkpoints are disk based...

On Mon., Dec. 23, 2019, 1:42 a.m. Zhijiang, <wangzhijiang999@aliyun.com> wrote:
Hi John,

Thanks for the positive comments of Flink usage. No matter at=C2=A0least-once or exactly-once you used for checkpoint, it would never lose one message during failure recovery.

Unfortunatelly I can not visit the logs you posted. Generally speaking the longer internal checkpoint would mean=C2=A0replaying more source data after failure recovery.
In my experience=C2=A0the 5 seconds interval for checkpoint is too frequently in my experience, and you might increase it to 1 minute or so. You can also monitor how long will the checkpoint finish in your application, then you can adjust the interval=C2=A0accordingly.

Concerning of the node shutdown you mentioned,=C2=A0I am not quite sure whether it is relevant to your short checkpoint interval. Do you config to use heap state backend?=C2= =A0 The=C2=A0hs_err file really indicated that you job had encountered the memory issue, then it is better to somehow increase your task manager memory. But if you can analyze the dump hs_err file via some profiler tool for checking the memory usage, it might be more helpful to find the root cause.

Best,
Zhijiang=C2=A0

---------------------------------= ---------------------------------
From:John Smith <java.dev.= mtl@gmail.com>
Send Time:2019 Dec. 21 (Sat.) 05:26
Subject:Flink task node shut it self off.

Hi, using Flink 1.8.0

1st off I must say Flink resiliency is very impressive, we lost a node and never lost one message by using checkpoints and Kafka. Thanks!

The cluster is a self hosted cluster and we use our own zookeeper cluster. We have...
3 zookeepers: 4 cpu, 8GB (each)
3 job nodes: 4 cpu, 8GB (each)
3 task nodes: 4 cpu, 8GB (each)
The nodes also share GlusterFS for storing savepoints and checkpoints, GlusterFS is running on the same machines.

Yesterday a node shut itself off we the following log messages...
- Stopping TaskExecutor akka.tcp://flink@= xxx.xxx.xxx.73:34697/user/taskmanager_0.
- Stop job leader service.
- Stopping ZooKeeperLeaderRetrievalService /leader/resource_manager_lock.
- Shutting down TaskExecutorLocalStateStoresManager. - Shutting down BLOB cache
- Shutting down BLOB cache
- removed file cache directory /tmp/flink-dist-cache-4b60d79b-1cef-4ffb-8837-3a9c9a205= 000
- I/O manager removed spill file directory /tmp/flink-io-c9d01b92-2809-4a55-8ab3-6920487da0ed
- Shutting down the network environment and its components.

Prior to the node shutting off we noticed massive IOWAIT of 140% and CPU load 1minute of 15. And we also got an hs_err file which sais we should increase the memory.

I'm attaching the logs here:=C2=A0https://www.dropbox.com/sh/vp1ytpguimiayw= 7/AADviCPED47QEy_4rHsGI1Nya?dl=3D0

I wonder if my 5 second checkpointing is too much for gluster.

Any thoughts?







--00000000000054fbc5059b5b4c9c--