Mailing-List: contact user-help@flink.apache.org; run by ezmlm
Reply-To: user@flink.apache.org
MIME-Version: 1.0
In-Reply-To: <2055650045.647466.1478718173676@mail.yahoo.com>
References: <1266388850.821413.1478637213061.ref@mail.yahoo.com>
 <1266388850.821413.1478637213061@mail.yahoo.com>
 <2111005948.11581.1478646649230@mail.yahoo.com>
 <2055650045.647466.1478718173676@mail.yahoo.com>
From: Till Rohrmann
Date: Thu, 10 Nov 2016 11:13:34 +0100
Subject: Re: Why did the Flink Cluster JM crash?
To: amir bahmanyari
Cc: "user@flink.apache.org"
Content-Type: multipart/alternative; boundary=001a1148f89c50dffb0540efa036
archived-at: Thu, 10 Nov 2016 10:13:39 -0000

--001a1148f89c50dffb0540efa036
Content-Type: text/plain; charset=UTF-8

The amount of data should be fine. Try to set the number of slots to the
number of cores you have available.

As long as you have more Kafka partitions than Flink Kafka consumers
(subtasks), you should be fine. But I think you can also decrease the
number of Kafka partitions a little bit. I guess that a very large number
of partitions also comes with a price. But I'm no expert there.

Hope your experiments run well with these settings.

Cheers,
Till

On Wed, Nov 9, 2016 at 8:02 PM, amir bahmanyari wrote:

> Thanks Till.
> I have been trying out many, many configuration combinations to get to the
> peak of what I can get as reasonable performance.
> And yes, when I drop the number of slots, I don't get OOM. However, I don't
> get the response I want either.
> The amount of data I send is kind of huge: about 105 GB sent over a
> stretch of 3.5 hours to a 4-node cluster running my Beam app, receiving
> from a 2-node Kafka cluster.
> From what I understand, you are suggesting that to get the best
> performance, the total number of slots should equal the total number of
> cores across the cluster.
> To make sure we have done that, I will go back and repeat the testing with
> that in mind.
> FYI, there are 4096 Kafka partitions: roughly 1024 per node, each node
> having 16 cores. Is this reasonable?
> Once I know the answer to this question, I will go ahead and readjust my
> config and repeat the test.
> I appreciate your response.
> Amir-
>
> ------------------------------
> *From:* Till Rohrmann
> *To:* amir bahmanyari
> *Cc:* "user@flink.apache.org"
> *Sent:* Wednesday, November 9, 2016 1:27 AM
> *Subject:* Re: Why did the Flink Cluster JM crash?
>
> Hi Amir,
>
> I fear that 900 slots per task manager is a bit too many unless your
> machine has 900 cores. As a rule of thumb, you should allocate as many
> slots as your machines have cores. Maybe you could try to decrease the
> number of slots and see if you still observe an OOM error.
>
> Cheers,
> Till
>
> On Wed, Nov 9, 2016 at 12:10 AM, amir bahmanyari wrote:
>
> Ok. There is an OOM exception... but this used to work fine with the same
> configurations.
> There are four nodes: beam1 through beam4.
> The Kafka partitions are 4096 > 3584 degree of parallelism.
>
> jobmanager.rpc.address: beam1
> jobmanager.rpc.port: 6123
> jobmanager.heap.mb: 1024
> taskmanager.heap.mb: 102400
> taskmanager.numberOfTaskSlots: 896
> taskmanager.memory.preallocate: false
>
> parallelism.default: 3584
>
>
> Thanks for your valuable time Till.
>
> AnonymousParDo -> AnonymousParDo (3584/3584) (ebe8da5bda017ee31ad774c5bc5e5e88) switched from DEPLOYING to RUNNING
> 2016-11-08 22:51:44,471 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Source: Read(UnboundedKafkaSource) -> AnonymousParDo -> AnonymousParDo (3573/3584) (ddf5a8939c1fc4ad1e6d71f17fe5ab0b) switched from RUNNING to FAILED
> 2016-11-08 22:51:44,474 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Source: Read(UnboundedKafkaSource) -> AnonymousParDo -> AnonymousParDo (1/3584) (865c54432153a0230e62bf7610118ff8) switched from RUNNING to CANCELING
> 2016-11-08 22:51:44,474 INFO org.apache.flink.runtime.jobmanager.JobManager - Status of job e61cada683c0f7a709101c26c2c9a17c (benchbeamrunners-abahman-1108225128) changed to FAILING.
> *java.lang.OutOfMemoryError: unable to create new native thread*
>     at java.lang.Thread.start0(Native Method)
>     at java.lang.Thread.start(Thread.java:714)
>     at java.util.concurrent.ThreadPoolExecutor.addWorker(ThreadPoolExecutor.java:950)
>     at java.util.concurrent.ThreadPoolExecutor.ensurePrestart(ThreadPoolExecutor.java:1587)
>     at java.util.concurrent.ScheduledThreadPoolExecutor.delayedExecute(ScheduledThreadPoolExecutor.java:334)
>     at java.util.concurrent.ScheduledThreadPoolExecutor.schedule(ScheduledThreadPoolExecutor.java:533)
>     at java.util.concurrent.Executors$DelegatedScheduledExecutorService.schedule(Executors.java:729)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.registerTimer(StreamTask.java:652)
>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator.registerTimer(AbstractStreamOperator.java:250)
>     at org.apache.flink.streaming.api.operators.StreamingRuntimeContext.registerTimer(StreamingRuntimeContext.java:92)
>     at org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedSourceWrapper.setNextWatermarkTimer(UnboundedSourceWrapper.java:381)
>     at org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedSourceWrapper.run(UnboundedSourceWrapper.java:233)
>     at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:78)
>     at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:56)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:224)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
>     at java.lang.Thread.run(Thread.java:745)
>
>
> ------------------------------
> *From:* Till Rohrmann
> *To:* user@flink.apache.org; amir bahmanyari
> *Sent:* Tuesday, November 8, 2016 2:11 PM
> *Subject:* Re: Why did the Flink Cluster JM crash?
>
> Hi Amir,
>
> what do the JM logs say?
>
> Cheers,
> Till
>
> On Tue, Nov 8, 2016 at 9:33 PM, amir bahmanyari wrote:
>
> Hi colleagues,
> I started the cluster all fine. Started the Beam app running in the Flink
> Cluster fine.
> Dashboard showed all tasks being consumed and open for business.
> I started sending data to the Beam app, and all of a sudden the Flink JM
> crashed.
> Exceptions below.
> Thanks+regards
> Amir
>
> java.lang.RuntimeException: Pipeline execution failed
>     at org.apache.beam.runners.flink.FlinkRunner.run(FlinkRunner.java:113)
>     at org.apache.beam.runners.flink.FlinkRunner.run(FlinkRunner.java:48)
>     at org.apache.beam.sdk.Pipeline.run(Pipeline.java:183)
>     at benchmark.flinkspark.flink.BenchBeamRunners.main(BenchBeamRunners.java:622) //p.run();
>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at java.lang.reflect.Method.invoke(Method.java:498)
>     at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:505)
>     at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:403)
>     at org.apache.flink.client.program.Client.runBlocking(Client.java:248)
>     at org.apache.flink.client.CliFrontend.executeProgramBlocking(CliFrontend.java:866)
>     at org.apache.flink.client.CliFrontend.run(CliFrontend.java:333)
>     at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1189)
>     at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1239)
> Caused by: org.apache.flink.client.program.ProgramInvocationException: The program execution failed: Communication with JobManager failed: Lost connection to the JobManager.
>     at org.apache.flink.client.program.Client.runBlocking(Client.java:381)
>     at org.apache.flink.client.program.Client.runBlocking(Client.java:355)
>     at org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:65)
>     at org.apache.beam.runners.flink.FlinkPipelineExecutionEnvironment.executePipeline(FlinkPipelineExecutionEnvironment.java:118)
>     at org.apache.beam.runners.flink.FlinkRunner.run(FlinkRunner.java:110)
>     ... 14 more
> Caused by: org.apache.flink.runtime.client.JobExecutionException: Communication with JobManager failed: Lost connection to the JobManager.
>     at org.apache.flink.runtime.client.JobClient.submitJobAndWait(JobClient.java:140)
>     at org.apache.flink.client.program.Client.runBlocking(Client.java:379)
>     ... 18 more
> Caused by: org.apache.flink.runtime.client.JobClientActorConnectionTimeoutException: Lost connection to the JobManager.
>     at org.apache.flink.runtime.client.JobClientActor.handleMessage(JobClientActor.java:244)
>     at org.apache.flink.runtime.akka.FlinkUntypedActor.handleLeaderSessionID(FlinkUntypedActor.java:88)
>     at org.apache.flink.runtime.akka.FlinkUntypedActor.onReceive(FlinkUntypedActor.java:68)
>     at akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:167)
>     at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
>     at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:97)
>     at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
>     at akka.actor.ActorCell.invoke(ActorCell.scala:487)
>     at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254)
>     at akka.dispatch.Mailbox.run(Mailbox.scala:221)
>     at akka.dispatch.Mailbox.exec(Mailbox.scala:231)
>     at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>     at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.pollAndExecAll(ForkJoinPool.java:1253)
>     at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1346)
>     at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>     at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

--001a1148f89c50dffb0540efa036
Content-Type: text/html; charset=UTF-8
Content-Transfer-Encoding: quoted-printable
--001a1148f89c50dffb0540efa036--
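
The rule of thumb from this thread (one task slot per core, and enough Kafka partitions that every source subtask has something to read) can be sketched as a small calculation. This is only an illustration under stated assumptions: one TaskManager per node, and 4 nodes with 16 cores each, as Amir's numbers suggest. The function name `size_cluster` and the two derived check fields are hypothetical helpers, not Flink APIs; only the two configuration key names come from the configuration quoted in the thread.

```python
# Hypothetical sizing helper; not part of Flink or Beam.
# Assumes one TaskManager per node and identical nodes.

def size_cluster(nodes: int, cores_per_node: int, kafka_partitions: int) -> dict:
    """Apply the 'one slot per core' rule of thumb to a homogeneous cluster."""
    slots_per_tm = cores_per_node        # value for taskmanager.numberOfTaskSlots
    parallelism = nodes * slots_per_tm   # value for parallelism.default
    return {
        "taskmanager.numberOfTaskSlots": slots_per_tm,
        "parallelism.default": parallelism,
        # Each source subtask needs at least one partition to read from.
        "every_subtask_has_a_partition": kafka_partitions >= parallelism,
        "partitions_per_subtask": kafka_partitions / parallelism,
    }

# Numbers taken from the thread: 4 nodes, 16 cores per node, 4096 partitions.
settings = size_cluster(nodes=4, cores_per_node=16, kafka_partitions=4096)
for key, value in settings.items():
    print(f"{key}: {value}")
```

With these assumed inputs the sketch suggests 16 slots per TaskManager and a default parallelism of 64, far below the 896 slots and 3584 subtasks in the failing configuration.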