From: Till Rohrmann
Date: Fri, 30 Aug 2019 09:33:12 +0200
Subject: Re: How to handle Flink Job with 400MB+ Uberjar with 800+ containers?
To: SHI Xiaogang
Cc: Elkhan Dadashov, dev, user

For point 2, there already exists a JIRA issue [1] and a PR. I hope that we can merge it during this release cycle.

[1] https://issues.apache.org/jira/browse/FLINK-13184

Cheers,
Till

On Fri, Aug 30, 2019 at 4:06 AM SHI Xiaogang <shixiaogangg@gmail.com> wrote:

Hi Dadashov,

We faced similar problems in our production clusters.

Currently, both launching and stopping of containers are performed in the main thread of YarnResourceManager. As containers are launched and stopped one after another, it usually takes a long time to bootstrap large jobs. Things get worse when some node managers get lost: YARN retries many times to communicate with them, which leads to heartbeat timeouts of TaskManagers.

Following are some efforts we made to help Flink deal with large jobs.
1. We provision some common jars on all cluster nodes and ask our users not to include these jars in their uberjars. When containers bootstrap, these jars are added to the classpath via JVM options. That way, we can effectively reduce the size of uberjars.
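
A minimal sketch of what point 1 can look like on the launcher side, assuming a hypothetical helper class and a provisioned directory (neither is Flink's actual code). The same effect can also be achieved at build time by marking the common dependencies as provided so they never enter the uberjar.

    import java.util.HashMap;
    import java.util.Map;

    // Hypothetical sketch, not Flink code: when assembling the environment
    // for a YARN ContainerLaunchContext, append a directory of jars that is
    // pre-installed on every node, so those jars can be left out of the uberjar.
    public class ProvidedClasspath {

        // Assumption: operators provision this directory on all cluster nodes.
        private static final String PROVIDED_LIBS = "/opt/flink/provided-libs/*";

        public static Map<String, String> containerEnv(Map<String, String> base) {
            Map<String, String> env = new HashMap<>(base);
            String cp = env.getOrDefault("CLASSPATH", "");
            env.put("CLASSPATH", cp.isEmpty() ? PROVIDED_LIBS : cp + ":" + PROVIDED_LIBS);
            return env;
        }
    }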

2. We added some asynchronous threads to launch and stop containers in YarnResourceManager. The bootstrap time can be significantly reduced when launching a large number of containers. We'd like to contribute this to the community very soon.
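
A minimal sketch of the asynchronous launching described in point 2, with all names hypothetical (this is not the actual PR code): the blocking startContainer() calls are submitted to a small thread pool instead of running strictly one after another on the resource manager's main thread.

    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;

    import org.apache.hadoop.yarn.api.records.Container;
    import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
    import org.apache.hadoop.yarn.client.api.NMClient;

    // Hypothetical sketch: offload container launches to a dedicated pool.
    public class AsyncContainerLauncher {

        private final NMClient nmClient;
        // Launching is mostly network I/O, so a handful of threads already
        // removes the serial bottleneck; 16 is an arbitrary assumption.
        private final ExecutorService launcherPool = Executors.newFixedThreadPool(16);

        public AsyncContainerLauncher(NMClient nmClient) {
            this.nmClient = nmClient;
        }

        public void launch(Container container, ContainerLaunchContext context) {
            launcherPool.submit(() -> {
                try {
                    nmClient.startContainer(container, context);
                } catch (Exception e) {
                    // A real implementation would hand the failure back to the
                    // main thread, e.g. to request a replacement container.
                    System.err.println("Launching " + container.getId() + " failed: " + e);
                }
            });
        }
    }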

3. We arm a timeout timer for each launching container. If a task manager does not register in time after its container has been launched, a new container is allocated and launched. This wastes some resources, but it reduces the effects caused by slow or problematic nodes.
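
Point 3 could look roughly like the following sketch, with all names hypothetical (this is not Flink's implementation): a timer is armed per launched container, and if the TaskManager has not registered when it fires, a replacement container is requested.

    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.Executors;
    import java.util.concurrent.ScheduledExecutorService;
    import java.util.concurrent.TimeUnit;

    // Hypothetical sketch: per-container registration watchdog.
    public class RegistrationWatchdog {

        private final ScheduledExecutorService timer =
            Executors.newSingleThreadScheduledExecutor();
        private final Map<String, Boolean> registered = new ConcurrentHashMap<>();
        private final long timeoutSeconds;

        public RegistrationWatchdog(long timeoutSeconds) {
            this.timeoutSeconds = timeoutSeconds;
        }

        // Called right after a container has been launched.
        public void watch(String containerId, Runnable requestReplacement) {
            registered.put(containerId, false);
            timer.schedule(() -> {
                // No registration arrived in time: give up on this container
                // and ask for a fresh one, possibly on a healthier node.
                if (Boolean.FALSE.equals(registered.remove(containerId))) {
                    requestReplacement.run();
                }
            }, timeoutSeconds, TimeUnit.SECONDS);
        }

        // Called when the TaskManager registers at the resource manager.
        public void onTaskManagerRegistered(String containerId) {
            registered.put(containerId, true);
        }
    }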

Now the community is considering a refactoring of the ResourceManager. I think that will be the right time to improve its efficiency.

Regards,
Xiaogang

Elkhan Dadashov <elkhan.dadashov@gmail.com> wrote on Fri, Aug 30, 2019 at 7:10 AM:
Dear Flink developers,

I am having difficulty getting a Flink job started.

The job's uberjar/fat jar is around 400MB, and I need to launch 800+ containers.

The default HDFS replication is 3.

The YARN queue is empty, and 800 containers are allocated almost immediately by the YARN RM.

It takes a very long time until all 800 nodes (node managers) have downloaded the uberjar from HDFS to their local machines.

Q1:

a) Do all those 800 nodes download in batches of 3 at a time? (batch size = HDFS replication factor)

b) Or can Flink TMs replicate from each other? Or do already-started TMs replicate to not-yet-started nodes?

The answer is most probably (a), but I want to confirm.

Q2:

What is the recommended way of handling a 400MB+ uberjar with 800+ containers?

Any specific params to tune?

Thanks.

Because downloading the uberjar takes a really long time, around 15 minutes after the job is kicked off we face this exception:

org.apache.hadoop.yarn.exceptions.YarnException: Unauthorized request to start container.
This token is expired. current time is 1567116179193 found 1567116001610
Note: System times on machines may be out of sync. Check system time and time zones.
    at sun.reflect.GeneratedConstructorAccessor35.newInstance(Unknown Source)
    at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
    at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
    at org.apache.hadoop.yarn.api.records.impl.pb.SerializedExceptionPBImpl.instantiateException(SerializedExceptionPBImpl.java:168)
    at org.apache.hadoop.yarn.api.records.impl.pb.SerializedExceptionPBImpl.deSerialize(SerializedExceptionPBImpl.java:106)
    at org.apache.hadoop.yarn.client.api.impl.NMClientImpl.startContainer(NMClientImpl.java:205)
    at org.apache.flink.yarn.YarnResourceManager.lambda$onContainersAllocated$1(YarnResourceManager.java:400)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:332)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:158)
    at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:70)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.onReceive(AkkaRpcActor.java:142)
    at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.onReceive(FencedAkkaRpcActor.java:40)
    at akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:165)
    at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
    at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
    at akka.actor.ActorCell.invoke(ActorCell.scala:495)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
    at akka.dispatch.Mailbox.run(Mailbox.scala:224)
    at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
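
Regarding Q1 and Q2, one note: HDFS clients read independently from any replica, so there is no strict batch-of-3, but with replication 3 the three datanodes holding the jar serve all 800 readers and become the bottleneck. A hedged sketch of one tunable that follows from this, using the standard HDFS FileSystem API (the staging path and the factor of 20 are assumptions to be tuned per cluster):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    // Hedged sketch: raise the replication factor of the staged uberjar so
    // that 800 node managers do not all fetch from only three datanodes.
    public class RaiseJarReplication {
        public static void main(String[] args) throws Exception {
            FileSystem fs = FileSystem.get(new Configuration());
            // Hypothetical staging path for the job's uberjar.
            Path uberjar = new Path("/user/flink/staging/my-job-uberjar.jar");
            fs.setReplication(uberjar, (short) 20);
        }
    }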

