From: Gábor Gévay
Date: Thu, 20 Apr 2017 18:22:02 +0200
Subject: Re: Flink memory usage
To: "Newport, Billy"
Cc: user@flink.apache.org

Hello,

You could also try using a profiler that shows which objects are using
what amount of memory, e.g., JProfiler or Java Flight Recorder [1].

Best,
Gábor

[1] https://docs.oracle.com/javase/8/docs/technotes/guides/troubleshoot/memleaks001.html

On Thu, Apr 20, 2017 at 6:00 PM, Newport, Billy wrote:
> Ok
>
> The consensus seems to be that it's us, not Flink :-) So we'll look
> harder at what we're doing in case there is anything silly. We are using
> 16K network buffers BTW, which is around 0.5GB with the defaults.
>
> From: Till Rohrmann [mailto:trohrmann@apache.org]
> Sent: Thursday, April 20, 2017 11:52 AM
> To: Stefano Bortoli
> Cc: Newport, Billy [Tech]; Fabian Hueske; user@flink.apache.org
> Subject: Re: Flink memory usage
>
> Hi Billy,
>
> if you didn't split the different data sets up into different slot
> sharing groups, then your maximum parallelism is 40. Thus, it should be
> enough to assign 40^2 * 20 * 4 = 128000 network buffers. If that is not
> enough because you have more than 4 shuffling steps running in parallel,
> then you have to increase the last term.
>
> OOM exceptions should actually only occur due to user code objects.
> Given that you have reserved a massive amount of memory for the network
> buffers, the remaining heap for the user code is probably very small.
> Try whether you can decrease the number of network buffers.
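[Editor's note: Till's rule of thumb can be sanity-checked with a few lines
of arithmetic. This sketch assumes the 32 KB default network buffer size
(`taskmanager.memory.segment-size`), which also matches Billy's
observation that 16K buffers come to roughly 0.5 GB:]

```java
// Sanity check of the numberOfBuffers rule of thumb from this thread:
// buffers = p^2 * t * 4, where p = max parallelism, t = task managers.
public class BufferMath {
    public static void main(String[] args) {
        int p = 40;   // maximum parallelism of the job
        int t = 20;   // number of task managers
        int buffers = p * p * t * 4;
        System.out.println(buffers); // 128000

        // Each network buffer is one memory segment, 32 KB by default
        // (taskmanager.memory.segment-size = 32768 bytes).
        long bytes = 16_384L * 32_768L; // Billy's 16K buffers
        System.out.println(bytes / (1024.0 * 1024 * 1024) + " GB"); // 0.5 GB
    }
}
```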
> Moreover, check whether your user code keeps references to objects
> somewhere that could cause the OOM.
>
> Cheers,
> Till
>
> On Thu, Apr 20, 2017 at 5:42 PM, Stefano Bortoli wrote:
>
> I think that if you have a lot of memory available, the GC gets kind of
> lazy. In our case, the issue was just the latency caused by the GC,
> because we were loading more data than could fit in memory. Hence
> optimizing the code gave us a lot of improvements. FlatMaps are also
> dangerous, as objects can multiply beyond what you expect, making a
> co-group extremely costly. :-) A well-placed distinct() saves a lot of
> time and memory.
>
> My point is that having worked with scarce resources, I learned that
> almost all the time the issue was my code, not the framework.
>
> Good luck.
>
> Stefano
>
> From: Newport, Billy [mailto:Billy.Newport@gs.com]
> Sent: Thursday, April 20, 2017 4:46 PM
> To: Stefano Bortoli; 'Fabian Hueske'
> Cc: 'user@flink.apache.org'
> Subject: RE: Flink memory usage
>
> Your reuse idea kind of implies that it's a GC generation rate issue,
> i.e. it's not collecting fast enough so it's running out of memory,
> versus heap that's actually anchored, right?
>
> From: Stefano Bortoli [mailto:stefano.bortoli@huawei.com]
> Sent: Thursday, April 20, 2017 10:33 AM
> To: Newport, Billy [Tech]; 'Fabian Hueske'
> Cc: 'user@flink.apache.org'
> Subject: RE: Flink memory usage
>
> Hi Billy,
>
> The only suggestion I can give is to check your code very carefully for
> useless variable allocations, and foster reuse as much as possible.
> Don't create a new collection in every map execution; rather, clear and
> reuse the collected output of the flatMap, and so on. In the past we ran
> long processes over lots of data with small memory without problems, and
> many more complex co-groups, joins and so on without any issue.
>
> My 2c. Hope it helps.
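[Editor's note: Stefano's reuse advice can be sketched roughly as below.
This is a hypothetical tokenizer, not code from the thread; the
`Collector` interface here is a minimal stand-in for Flink's
`org.apache.flink.util.Collector` so the snippet is self-contained. In a
real job you would implement
`org.apache.flink.api.common.functions.FlatMapFunction` instead:]

```java
import java.util.ArrayList;
import java.util.List;

// Minimal stand-in for Flink's Collector so the sketch compiles alone.
interface Collector<T> {
    void collect(T record);
}

// Allocate the working buffer once and clear() it per call, instead of
// creating a new list (and new output objects) for every input record.
class ReusingTokenizer {
    private final List<String> buffer = new ArrayList<>(); // reused across calls

    public void flatMap(String line, Collector<String> out) {
        buffer.clear();                      // reuse, don't reallocate
        for (String token : line.split(" ")) {
            if (!token.isEmpty()) {
                buffer.add(token);
            }
        }
        for (String token : buffer) {
            out.collect(token);
        }
    }
}

public class ReuseDemo {
    public static void main(String[] args) {
        ReusingTokenizer fn = new ReusingTokenizer();
        List<String> collected = new ArrayList<>();
        fn.flatMap("flink memory usage", collected::add);
        fn.flatMap("flink", collected::add);
        System.out.println(collected); // [flink, memory, usage, flink]
    }
}
```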
>
> Stefano
>
> From: Newport, Billy [mailto:Billy.Newport@gs.com]
> Sent: Thursday, April 20, 2017 1:31 PM
> To: 'Fabian Hueske'
> Cc: 'user@flink.apache.org'
> Subject: RE: Flink memory usage
>
> I don't think our functions are memory heavy; they are typically
> co-groups that merge the records on the left with the records on the
> right.
>
> We're currently requiring 720GB of heap to do our processing, which
> frankly appears ridiculous to us. Could too much parallelism be causing
> the problem? Looking at:
>
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Optimal-Configuration-for-Cluster-td5024.html
>
> If we are processing 17 "datasets" in a single job and each has an
> individual parallelism of 40, is that a total (potential) parallelism of
> 17*40? And given your network buffers calculation of parallelism
> squared, would that do it, or only if we explicitly configure it that
> way:
>
> taskmanager.network.numberOfBuffers: p ^ 2 * t * 4
>
> where p is the maximum parallelism of the job and t is the number of
> task managers.
>
> You can process more than one parallel task per TM if you configure more
> than one processing slot per machine (taskmanager.numberOfTaskSlots).
> The TM will divide its memory among all its slots. So it would be
> possible to start one TM for each machine with 100GB+ memory and 48
> slots each.
>
> Our pipeline for each dataset looks like this:
>
> Read avro file -> FlatMap -> Validate each record with a flatmap ->
> Read Parquet -> FlatMap -> Filter Live Rows -> CoGroup with the
> validated avro file above ----------> }
> Read Parquet -> FlatMap -> Filter Dead Rows ---------------------> }
> Union cogroup with dead rows and write result to parquet file.
>
> I don't understand why this logic couldn't run with a single task
> manager and just take longer.
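[Editor's note: taking the thread's formula at face value, Billy's
question has a dramatic answer: if the effective maximum parallelism
really were 17 * 40 rather than 40 (i.e., without slot sharing), the
buffer requirement would explode. A back-of-the-envelope sketch, assuming
the 32 KB default segment size:]

```java
// What the numberOfBuffers formula implies under the two readings of
// "maximum parallelism" discussed in this thread.
public class ParallelismMath {
    public static void main(String[] args) {
        int datasets = 17, perDataset = 40, taskManagers = 20;

        // If slot sharing caps the job's max parallelism at 40:
        System.out.println(40 * 40 * taskManagers * 4);   // 128000

        // If each dataset's parallelism counted separately (17 * 40 = 680):
        long p = (long) datasets * perDataset;
        long buffers = p * p * taskManagers * 4;
        System.out.println(buffers);                      // 36992000
        // At 32 KB per buffer that would be over a terabyte:
        System.out.printf("%.1f TB%n", buffers * 32_768.0 / (1L << 40));
    }
}
```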
We're having a lot of trouble trying to change the tuning to reduce the
> memory burn. We run the above pipeline with parallelism 40 for all 17
> datasets in a single job.
>
> We're running this config now, which is not really justifiable for what
> we're doing:
>
> 20 nodes, 2 slots, parallelism 40, 36GB mem = 720GB of heap...
>
> Thanks
>
> From: Fabian Hueske [mailto:fhueske@gmail.com]
> Sent: Wednesday, April 19, 2017 10:52 AM
> To: Newport, Billy [Tech]
> Cc: user@flink.apache.org
> Subject: Re: Flink memory usage
>
> Hi Billy,
>
> Flink's internal operators are implemented so that they do not allocate
> heap space proportional to the size of the input data. Whenever Flink
> needs to hold data in memory (e.g., for sorting or building a hash
> table), the data is serialized into managed memory. If all memory is in
> use, Flink starts spilling to disk. This blog post discusses how Flink
> uses its managed memory [1] (still up to date, even though it's almost 2
> years old).
>
> The runtime code should actually be quite stable. Most of the code has
> been there for several years (even before Flink was donated to the ASF),
> and we haven't seen many bugs reported for the DataSet runtime. Of
> course this does not mean that the code doesn't contain bugs.
>
> However, Flink does not take care of the user code. For example, a
> GroupReduceFunction that collects a lot of data, e.g., in a List on the
> heap, can still kill a program. I would check whether you have user
> functions that require lots of heap memory. Also, reducing the size of
> the managed memory to have more heap space available might help.
>
> If that doesn't solve the problem, it would be good if you could share
> some details about your job (which operators, which local strategies,
> how many operators) that might help to identify the misbehaving
> operator.
>
> Thanks, Fabian
>
> [1] https://flink.apache.org/news/2015/05/11/Juggling-with-Bits-and-Bytes.html
>
> 2017-04-19 16:09 GMT+02:00 Newport, Billy:
>
> How does Flink use memory? We're seeing cases when running a job on
> larger datasets where it throws OOM exceptions during the job. We're
> using the DataSet API. Shouldn't Flink be streaming from disk to disk?
> We work around this by using fewer slots, but it seems unintuitive that
> I need to change these settings, given Flink != Spark. Why isn't Flink's
> memory usage constant? Why couldn't I run a job with a single task and a
> single slot for any size job successfully, other than that it takes much
> longer to run?
>
> Thanks
>
> Billy
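[Editor's note: the tuning knobs discussed in this thread live in
flink-conf.yaml. Below is a sketch of the setup Billy describes, using
Flink 1.2-era key names; the exact values are assumptions reconstructed
from the thread, not a recommended configuration:]

```yaml
# One TaskManager per node, 20 nodes (started via the cluster scripts).
taskmanager.heap.mb: 36864                  # 36 GB heap per TM -> 720 GB total
taskmanager.numberOfTaskSlots: 2            # 2 slots per TM -> 40 slots overall
taskmanager.network.numberOfBuffers: 16384  # 16K buffers, ~0.5 GB at 32 KB each
# Fabian's suggestion: shrink managed memory to leave more heap for user
# code (the default fraction is 0.7).
taskmanager.memory.fraction: 0.5
```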