From: Xintong Song
Date: Fri, 12 Jun 2020 16:49:32 +0800
Subject: Re: Insufficient number of network buffers- what does Total mean on the Flink Dashboard
To: Vijay Balakrishnan
Cc: user

Flink should have calculated the heap size and set -Xmx and -Xms according to the equations I mentioned. So if you haven't set a customized -Xmx that overrides this, it should not use the default 1/4 of physical memory.
  • Standalone: jvmHeap = total * (1 - networkFraction) = 102 GB * (1 - 0.48) = 53 GB
  • On Yarn: jvmHeap = (total - Max(cutoff-min, total * cutoff-ratio)) * (1 - networkFraction) = (102GB - Max(600MB, 102GB * 0.25)) * (1 - 0.48) = 40.6GB

Are you running Flink on Mesos? I think Flink does not automatically set -Xmx on Mesos.


BTW, from your screenshot the physical memory is 123GB, so 1/4 of that is much closer to 29GB if we allow for some rounding errors and accuracy loss.


Thank you~

Xintong Song



On Fri, Jun 12, 2020 at 4:33 PM Vijay Balakrishnan <bvijaykr@gmail.com> wrote:
Thx, Xintong for a great answer. Much appreciated.

https://ci.apache.org/projects/flink/flink-docs-release-1.9/ops/mem_setup.html#jvm-heap

Max heap: if -Xmx is set then it is its value, else ¼ of physical machine memory estimated by the JVM
No -Xmx is set. So, 1/4 of 102GB = 25.5GB, but I'm not sure about the 29GB figure.

On Thu, Jun 11, 2020 at 9:14 PM Xintong Song <tonysong820@gmail.com> wrote:
Hi Vijay,

The memory configurations in Flink 1.9 and previous versions are indeed complicated and confusing. That is why we made significant changes to them in Flink 1.10. If possible, I would suggest upgrading to Flink 1.10, or the upcoming Flink 1.11, which is very likely to be released this month.

Regarding your questions,
  • "Physical Memory" displayed on the web UI stands for the total memory on your machine. This information is retrieved from your OS. It is not related to the network memory calculation. It is displayed mainly for historical reasons.
  • The error message means that you have about 26.8 GB of network memory (877118 * 32768 bytes), and your job is trying to use more.
  • The "total memory" referred to in the network memory calculation is:
    • jvm-heap + network, if managed memory is configured on-heap (default)
      • According to your screenshot, the managed memory on-heap/off-heap configuration is not touched, so this should be your case.
    • jvm-heap + managed + network, if managed memory is configured off-heap
  • The network memory size is actually derived in reverse. Flink reads the max heap size from the JVM (and the managed memory size from the configuration if it is configured off-heap), and derives the network memory size with the following equation.
    • networkMem = Min(networkMax, Max(networkMin, jvmMaxHeap / (1-networkFraction) * networkFraction))
    • In your case, networkMem = Min(50GB, Max(500MB, 29GB / (1-0.48) * 0.48)) = 26.8GB
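
As a rough illustration of the reverse derivation above, here is a minimal Python sketch. The function and variable names are made up for this example and are not Flink APIs; it only covers the on-heap managed memory case, and it treats "GB" and "MB" as binary units, which is an assumption.

```python
GIB = 1024 ** 3
MIB = 1024 ** 2


def network_memory_bytes(jvm_max_heap, fraction, net_min, net_max):
    """Hypothetical helper: derive network memory from the JVM max heap.

    Assumes managed memory is on-heap, so total = heap + network and
    network = fraction * total, which gives
    network = heap / (1 - fraction) * fraction.
    """
    derived = jvm_max_heap / (1.0 - fraction) * fraction
    # Clamp the derived value into [net_min, net_max].
    return min(net_max, max(net_min, derived))


# Values taken from the thread: 29 GB heap, fraction 0.48, min 500 MB, max 50 GB.
net = network_memory_bytes(29 * GIB, 0.48, 500 * MIB, 50 * GIB)
print(f"network memory: about {net / GIB:.1f} GiB")
print(f"buffers of 32768 bytes: about {int(net // 32768)}")
```

The buffer count printed here is only approximately the 877118 from the error message, because the 29 GB heap figure is itself rounded.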
One thing I don't understand is why you only have a 29GB heap when "taskmanager.heap.size" is configured to be "1044221m" (about 102 GB). The JVM heap size ("-Xmx" & "-Xms") is calculated as follows. I'll use "total" to represent "taskmanager.heap.size" for short. I have also omitted the calculations for when managed memory is configured off-heap.
  • Standalone: jvmHeap = total * (1 - networkFraction) = 102 GB * (1 - 0.48) = 53 GB
  • On Yarn: jvmHeap = (total - Max(cutoff-min, total * cutoff-ratio)) * (1 - networkFraction) = (102GB - Max(600MB, 102GB * 0.25)) * (1 - 0.48) = 40.6GB
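
The two heap formulas can be sketched in Python as follows. The function names are hypothetical (not Flink APIs), all sizes are in GB, and 600MB is approximated as 0.6 GB. With these rounded inputs the Yarn expression evaluates to about 39.8 GB rather than 40.6 GB, so the quoted figure may come from unrounded values.

```python
def jvm_heap_standalone(total, network_fraction):
    """Hypothetical helper: heap = total minus the network share."""
    return total * (1.0 - network_fraction)


def jvm_heap_yarn(total, network_fraction, cutoff_min, cutoff_ratio):
    """Hypothetical helper: subtract the container cutoff first, then the network share."""
    cutoff = max(cutoff_min, total * cutoff_ratio)
    return (total - cutoff) * (1.0 - network_fraction)


total_gb = 102.0  # "taskmanager.heap.size", rounded as in the thread
standalone_gb = jvm_heap_standalone(total_gb, 0.48)
yarn_gb = jvm_heap_yarn(total_gb, 0.48, cutoff_min=0.6, cutoff_ratio=0.25)

print(f"standalone heap: {standalone_gb:.1f} GB")
print(f"yarn heap:       {yarn_gb:.1f} GB")
```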
Have you specified a custom "-Xmx" parameter?

Thank you~

Xintong Song



On Fri, Jun 12, 2020 at 7:50 AM Vijay Balakrishnan wrote:
Hi,
I get this error:
java.io.IOException: Insufficient number of network buffers: required 2, but only 0 available. The total number of network buffers is currently set to 877118 of 32768 bytes each. You can increase this number by setting the configuration keys 'taskmanager.network.memory.fraction', 'taskmanager.network.memory.min', and 'taskmanager.network.memory.max'.
akka.pattern.AskTimeoutException: Ask timed out on [Actor[akka://flink/user/dispatcher#-1420732632]] after [10000 ms]. Message of type [org.apache.flink.runtime.rpc.messages.LocalFencedMessage]. A typical reason for `AskTimeoutException` is that the recipient actor didn't send a reply.


Followed docs here:
https://ci.apache.org/projects/flink/flink-docs-release-1.9/ops/mem_setup.html

network = Min(max, Max(min, fraction x total))  // What does "total" mean? The docs say: "The max JVM heap is used to derive the total memory for the calculation of network buffers." Can I see it in the Flink Dashboard? Is it 117GB here?
= Min(50G, Max(500mb, 0.48 * 117G)) = Min(50G, 56.16G) = 50G
877118 of 32768 bytes each comes to 28.75GB. So, why is it failing?
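
A quick unit check on the buffer figure, as a small Python sketch: the 28.75GB above and the 26.8 GB mentioned in the reply look like the same byte count expressed in decimal and binary units, respectively. The variable names are only for illustration.

```python
buffers = 877118
buffer_size = 32768  # bytes per network buffer, from the error message

total_bytes = buffers * buffer_size
decimal_gb = total_bytes / 10 ** 9  # GB as powers of ten
binary_gib = total_bytes / 2 ** 30  # GiB as powers of two

print(f"{total_bytes} bytes = {decimal_gb:.2f} GB = {binary_gib:.2f} GiB")
```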
Used this in flink-conf.yaml:
    taskmanager.numberOfTaskSlots: 10
    rest.server.max-content-length: 314572800
    taskmanager.network.memory.fraction: 0.45
    taskmanager.network.memory.max: 50gb
    taskmanager.network.memory.min: 500mb
    akka.ask.timeout: 240s
    cluster.evenly-spread-out-slots: true
    akka.tcp.timeout: 240s
    taskmanager.network.request-backoff.initial: 5000
    taskmanager.network.request-backoff.max: 30000
    web.timeout:1000000
    web.refresh-interval:6000

Saw some old calc about buffers
(slots/TM * slots/TM) * #TMs * 4
= 10 * 10 * 47 * 4 = 18,800 buffers.
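
For comparison, the old rule of thumb above can be written out as a short Python sketch. The function name is hypothetical, and the 32768-byte buffer size is taken from the error message; the point is only to show how small this estimate is next to the configured pool.

```python
def rule_of_thumb_buffers(slots_per_tm, num_tms, factor=4):
    """Hypothetical helper for the heuristic: slots/TM squared, times #TMs, times 4."""
    return slots_per_tm * slots_per_tm * num_tms * factor


estimate = rule_of_thumb_buffers(10, 47)
estimate_mib = estimate * 32768 / 2 ** 20  # size of that many 32768-byte buffers

print(f"{estimate} buffers, about {estimate_mib:.1f} MiB")
```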

What am I missing in the network buffer calc?

TIA,
