From: Elkhan Dadashov <elkhan.dadashov@gmail.com>
Date: Thu, 29 Aug 2019 16:09:32 -0700
Subject: How to handle Flink Job with 400MB+ Uberjar with 800+ containers?
To: dev@flink.apache.org
Cc: user@flink.apache.org

Dear Flink developers,

I'm having difficulty getting a Flink job started.

The job's uberjar/fat jar is around 400 MB, and I need to launch 800+ containers.

The default HDFS replication factor is 3.

The YARN queue is empty, and 800 containers are allocated almost immediately by the YARN RM.

It takes a very long time until all 800 nodes (NodeManagers) have downloaded the uberjar from HDFS to their local machines.

Q1:

a) Do all those 800 nodes download in batches of 3 at a time? (batch size = HDFS replication factor)

b) Or can Flink TMs replicate the jar from each other, i.e., do already-started TMs replicate it to not-yet-started nodes?

The answer is most probably (a), but I want to confirm.
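
For reference, the replication factor of the staged uberjar can be checked with something like the command below; I am assuming the usual Flink-on-YARN staging path under the submitting user's HDFS home directory, so adjust the path if yours differs:

    hdfs dfs -stat "replication=%r size=%b name=%n" /user/<user>/.flink/<application-id>/*.jar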

Q2:

What is the recommended way of handling a 400 MB+ uberjar with 800+ containers?

Any specific params to tune?
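
Not sure if this is the recommended approach, but one workaround I can think of is to raise the replication factor of the staged files right after submission, so that many more DataNodes can serve the uberjar to the 800 NodeManagers (again assuming the default staging directory; the target replication of 20 is only an illustration):

    hdfs dfs -setrep -w 20 /user/<user>/.flink/<application-id>

Alternatively, a higher dfs.replication on the submitting client's Hadoop config should make the files be written with more replicas in the first place.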

Thanks.

Because downloading the uberjar takes a really long time, around 15 minutes after the job was kicked off I hit this exception:

org.apache.hadoop.yarn.exceptions.YarnException: Unauthorized request to start container.
This token is expired. current time is 1567116179193 found 1567116001610
Note: System times on machines may be out of sync. Check system time and time zones.
    at sun.reflect.GeneratedConstructorAccessor35.newInstance(Unknown Source)
    at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
    at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
    at org.apache.hadoop.yarn.api.records.impl.pb.SerializedExceptionPBImpl.instantiateException(SerializedExceptionPBImpl.java:168)
    at org.apache.hadoop.yarn.api.records.impl.pb.SerializedExceptionPBImpl.deSerialize(SerializedExceptionPBImpl.java:106)
    at org.apache.hadoop.yarn.client.api.impl.NMClientImpl.startContainer(NMClientImpl.java:205)
    at org.apache.flink.yarn.YarnResourceManager.lambda$onContainersAllocated$1(YarnResourceManager.java:400)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:332)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:158)
    at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:70)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.onReceive(AkkaRpcActor.java:142)
    at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.onReceive(FencedAkkaRpcActor.java:40)
    at akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:165)
    at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
    at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
    at akka.actor.ActorCell.invoke(ActorCell.scala:495)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
    at akka.dispatch.Mailbox.run(Mailbox.scala:224)
    at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
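
In case it helps with diagnosis: my understanding (please correct me if I'm wrong) is that this window is governed on the YARN side by the container-allocation/token expiry interval, yarn.resourcemanager.rm.container-allocation.expiry-interval-ms in yarn-site.xml (default 600000 ms), so if localizing the 400 MB jar takes longer than that, the token has already expired by the time the container is started. A sketch of raising it, assuming I am recalling the property name correctly:

    <property>
      <name>yarn.resourcemanager.rm.container-allocation.expiry-interval-ms</name>
      <value>1800000</value>
    </property>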

