flink-user mailing list archives

From Till Rohrmann <trohrm...@apache.org>
Subject Re: How to handle Flink Job with 400MB+ Uberjar with 800+ containers ?
Date Fri, 30 Aug 2019 07:33:12 GMT
For point 2, there is already a JIRA issue [1] and a PR. I hope that we
can merge it during this release cycle.

[1] https://issues.apache.org/jira/browse/FLINK-13184

Cheers,
Till

On Fri, Aug 30, 2019 at 4:06 AM SHI Xiaogang <shixiaogangg@gmail.com> wrote:

> Hi Dadashov,
>
> We faced similar problems in our production clusters.
>
> Currently, both launching and stopping of containers are performed in the
> main thread of YarnResourceManager. As containers are launched and stopped
> one after another, it usually takes a long time to bootstrap large jobs.
> Things get worse when some node managers are lost: Yarn will retry many
> times to communicate with them, leading to heartbeat timeouts of
> TaskManagers.
>
> The following are some measures we took to help Flink deal with large jobs.
>
> 1. We provision some common jars on all cluster nodes and ask our users
> not to include these jars in their uberjar. When containers bootstrap,
> these jars are added to the classpath via JVM options. That way, we can
> significantly reduce the size of uberjars.
>
> 2. We use asynchronous threads to launch and stop containers in
> YarnResourceManager. The bootstrap time can be greatly reduced when
> launching a large number of containers. We'd like to contribute this to
> the community very soon.
>
> 3. We set a timeout timer for each launching container. If a task
> manager does not register in time after its container has been launched, a
> new container will be allocated and launched. That will lead to some
> waste of resources, but can reduce the impact of slow or problematic
> nodes.
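
The timeout described in point 3 could be sketched roughly as follows. This
is only an illustration under assumptions, not Flink's actual implementation:
the class ContainerLaunchWatchdog, its method names, and the onTimeout
callback (where a replacement container would be requested) are hypothetical.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

/** Hypothetical helper (not a Flink class): one registration timeout per launched container. */
final class ContainerLaunchWatchdog {
    private final ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();
    // Containers that were launched but whose TaskManager has not registered yet.
    private final Set<String> pending = ConcurrentHashMap.newKeySet();
    private final long timeoutMillis;
    private final Consumer<String> onTimeout;

    ContainerLaunchWatchdog(long timeoutMillis, Consumer<String> onTimeout) {
        this.timeoutMillis = timeoutMillis;
        this.onTimeout = onTimeout;
    }

    /** Call right after a container has been launched. */
    void containerLaunched(String containerId) {
        // Mark as pending before scheduling so the timer task cannot miss it.
        pending.add(containerId);
        timer.schedule(() -> {
            // Fire only if the TaskManager did not register in the meantime.
            if (pending.remove(containerId)) {
                onTimeout.accept(containerId);
            }
        }, timeoutMillis, TimeUnit.MILLISECONDS);
    }

    /** Call when the TaskManager running in this container registers. */
    void taskManagerRegistered(String containerId) {
        pending.remove(containerId);
    }

    void shutdown() {
        timer.shutdownNow();
    }
}
```

A caller would invoke containerLaunched after starting a container and
taskManagerRegistered once the TaskManager reports in. The trade-off is the
one described in point 3: a slow node may cost an extra container.
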
>
> The community is now considering refactoring the ResourceManager. I
> think that will be a good time to improve its efficiency.
>
> Regards,
> Xiaogang
>
> Elkhan Dadashov <elkhan.dadashov@gmail.com> wrote on Fri, Aug 30, 2019 at 7:10 AM:
>
>> Dear Flink developers,
>>
>> I am having difficulty getting a Flink job started.
>>
>> The job's uberjar/fat jar is around 400MB, and I need to kick off 800+
>> containers.
>>
>> The default HDFS replication is 3.
>>
>> *The Yarn queue is empty, and 800 containers are allocated
>> almost immediately by the Yarn RM.*
>>
>> It takes a very long time until all 800 nodes (node managers) have
>> downloaded the uberjar from HDFS to their local machines.
>>
>> *Q1:*
>>
>> a) Do all those 800 nodes download in batches of 3 at a time? (batch
>> size = HDFS replication factor)
>>
>> b) Or can Flink TMs replicate from each other? Or do already-started
>> TMs replicate to not-yet-started nodes?
>>
>> Most probably the answer is (a), but I want to confirm.
>>
>> *Q2:*
>>
>> What is the recommended way of handling a 400MB+ uberjar with 800+
>> containers?
>>
>> Any specific params to tune?
>>
>> Thanks.
>>
>> Because downloading the uberjar takes a really long time, around 15
>> minutes after the job was kicked off I am facing this exception:
>>
>> org.apache.hadoop.yarn.exceptions.YarnException: Unauthorized request to start container.
>> This token is expired. current time is 1567116179193 found 1567116001610
>> Note: System times on machines may be out of sync. Check system time and time zones.
>> 	at sun.reflect.GeneratedConstructorAccessor35.newInstance(Unknown Source)
>> 	at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
>> 	at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
>> 	at org.apache.hadoop.yarn.api.records.impl.pb.SerializedExceptionPBImpl.instantiateException(SerializedExceptionPBImpl.java:168)
>> 	at org.apache.hadoop.yarn.api.records.impl.pb.SerializedExceptionPBImpl.deSerialize(SerializedExceptionPBImpl.java:106)
>> 	at org.apache.hadoop.yarn.client.api.impl.NMClientImpl.startContainer(NMClientImpl.java:205)
>> 	at org.apache.flink.yarn.YarnResourceManager.lambda$onContainersAllocated$1(YarnResourceManager.java:400)
>> 	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:332)
>> 	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:158)
>> 	at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:70)
>> 	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.onReceive(AkkaRpcActor.java:142)
>> 	at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.onReceive(FencedAkkaRpcActor.java:40)
>> 	at akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:165)
>> 	at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
>> 	at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)
>> 	at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
>> 	at akka.actor.ActorCell.invoke(ActorCell.scala:495)
>> 	at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
>> 	at akka.dispatch.Mailbox.run(Mailbox.scala:224)
>> 	at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
>> 	at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>> 	at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>> 	at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>> 	at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>>
>>
>>
>>
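
The two timestamps in the exception message give a feel for how late the
start request was. Below is a small sketch of that arithmetic, assuming the
first value is the NodeManager's current time and the second is the container
token's expiry time, both in epoch milliseconds. The class and method names
are made up for illustration.

```java
import java.time.Duration;
import java.time.Instant;

/** Hypothetical helper for reading the "current time is X found Y" message. */
final class TokenExpiryGap {
    /** Returns how long after the token's expiry the start request arrived. */
    static Duration lateBy(long currentTimeMillis, long expiryTimeMillis) {
        return Duration.between(
                Instant.ofEpochMilli(expiryTimeMillis),
                Instant.ofEpochMilli(currentTimeMillis));
    }
}
```

For the values in the trace, lateBy(1567116179193L, 1567116001610L).toMillis()
is 177583, so the container start was attempted roughly three minutes after
the token had expired. That fits a launch queue that is being worked through
one container at a time.
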
