From: Till Rohrmann
To: user@flink.apache.org
Date: Mon, 27 Mar 2017 15:09:42 +0200
Subject: Re: accessing flink HA cluster with scala shell/zeppelin notebook

Hi Maciek and Alexis,

as far as I can tell, it is currently not possible to use Zeppelin with a Flink cluster running in HA mode. To make it work, it would be necessary either to specify a Flink configuration for the Flink interpreter (this is probably the most general solution) or to enable HA mode in Zeppelin. Enabling HA mode would mean that we set high-availability: zookeeper in the configuration and then set all the remaining high-availability configuration options [1] to the same values with which the Flink cluster was started. This would have to be contributed to the Zeppelin project.

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.3/setup/config.html#high-availability-ha
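
To make the "same values" requirement concrete, here is a minimal Java sketch (not part of Flink or Zeppelin) that compares the high-availability settings of the cluster with those a client such as the Zeppelin interpreter would use, and reports every key that is missing or different. The class name HaConfigCheck, the helper mismatches, the host names and the /flink root path are assumptions made up for illustration. The key names follow the configuration page [1] and may differ in other Flink versions.

```java
import java.util.Map;
import java.util.Objects;
import java.util.TreeMap;

class HaConfigCheck {

    // Returns every high-availability key whose client-side value is
    // missing or differs from the value the cluster was started with.
    static Map<String, String> mismatches(Map<String, String> cluster,
                                          Map<String, String> client) {
        Map<String, String> diff = new TreeMap<>();
        for (Map.Entry<String, String> entry : cluster.entrySet()) {
            String key = entry.getKey();
            if (!key.startsWith("high-availability")) {
                continue; // only the HA settings have to match
            }
            String clientValue = client.get(key);
            if (!Objects.equals(entry.getValue(), clientValue)) {
                diff.put(key, "cluster=" + entry.getValue() + ", client=" + clientValue);
            }
        }
        return diff;
    }

    public static void main(String[] args) {
        // Hypothetical values; use the ones from the cluster's flink-conf.yaml.
        Map<String, String> cluster = new TreeMap<>();
        cluster.put("high-availability", "zookeeper");
        cluster.put("high-availability.zookeeper.quorum", "zk1:2181,zk2:2181,zk3:2181");
        cluster.put("high-availability.zookeeper.path.root", "/flink");

        // The client side is missing the root path on purpose.
        Map<String, String> client = new TreeMap<>();
        client.put("high-availability", "zookeeper");
        client.put("high-availability.zookeeper.quorum", "zk1:2181,zk2:2181,zk3:2181");

        mismatches(cluster, client)
                .forEach((key, detail) -> System.out.println(key + ": " + detail));
    }
}
```

An empty result would mean the client-side HA settings mirror the cluster's. Whether Zeppelin can actually pass such settings to its Flink interpreter is exactly the open point described above.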

Cheers,
Till

On Thu, Mar 23, 2017 at 11:41 AM, Robert Metzger <rmetzger@apache.org> wrote:
Hi Alexis,

did you set the Zookeeper configuration for Flink in Zeppelin?

On Mon, Mar 20, 2017 at 11:37 AM, Alexis Gendronneau <a.gendronneau@gmail.com> wrote:
Hello users,

Like Maciek, I'm currently trying to get Apache Zeppelin 0.7 working with Flink. I have two versions of Flink available (1.1.2 and 1.2.0), each running in high-availability mode.

When running jobs from Zeppelin in Flink local mode, everything works fine. But when I try to submit a job to a remote host (regardless of which version is involved), the job is stuck in the submitting phase until it reaches akka.client.timeout.

I tried increasing the timeout (as suggested by the error raised in Zeppelin), but that only increases the time before the error is finally raised (tested with 600s).

On the Flink side, nothing appears except:

    2017-03-20 11:19:31,675 WARN  org.apache.flink.runtime.jobmanager.JobManager - Discard message LeaderSessionMessage(null,SubmitJob(JobGraph(jobId: 8af3a91762a171b04c4be0efe540f3d4) ,EXECUTION_RESULT_AND_STATE_CHANGES))
    because the expected leader session ID Some(f955760c-d80d-4992-a148-5968026ca6e4) did not equal the received leader session ID None.


On the Zeppelin interpreter side, we get the following stack trace:

    bestCarrier: org.apache.flink.api.scala.DataSet[CarrierFlightsCount] = org.apache.flink.api.scala.DataSet@669fc812
    org.apache.flink.client.program.ProgramInvocationException: The program execution failed: Communication with JobManager failed: Job submission to the JobManager timed out. You may increase 'akka.client.timeout' in case the JobManager needs more time to configure and confirm the job submission.
      at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:409)
      at org.apache.flink.client.program.StandaloneClusterClient.submitJob(StandaloneClusterClient.java:95)
      at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:382)
      at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:369)
      at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:344)
      at org.apache.flink.client.RemoteExecutor.executePlanWithJars(RemoteExecutor.java:211)
      at org.apache.flink.client.RemoteExecutor.executePlan(RemoteExecutor.java:188)
      at org.apache.flink.api.java.RemoteEnvironment.execute(RemoteEnvironment.java:172)
      at org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:896)
      at org.apache.flink.api.scala.ExecutionEnvironment.execute(ExecutionEnvironment.scala:637)
      at org.apache.flink.api.scala.DataSet.collect(DataSet.scala:547)
      ... 36 elided
    Caused by: org.apache.flink.runtime.client.JobExecutionException: Communication with JobManager failed: Job submission to the JobManager timed out. You may increase 'akka.client.timeout' in case the JobManager needs more time to configure and confirm the job submission.
      at org.apache.flink.runtime.client.JobClient.submitJobAndWait(JobClient.java:137)
      at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:405)
      ... 46 more
    Caused by: org.apache.flink.runtime.client.JobClientActorSubmissionTimeoutException: Job submission to the JobManager timed out. You may increase 'akka.client.timeout' in case the JobManager needs more time to configure and confirm the job submission.
      at org.apache.flink.runtime.client.JobClientActor.handleMessage(JobClientActor.java:264)
      at org.apache.flink.runtime.akka.FlinkUntypedActor.handleLeaderSessionID(FlinkUntypedActor.java:90)
      at org.apache.flink.runtime.akka.FlinkUntypedActor.onReceive(FlinkUntypedActor.java:70)
      at akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:167)
      at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
      at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:97)
      at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
      at akka.actor.ActorCell.invoke(ActorCell.scala:487)
      at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254)
      at akka.dispatch.Mailbox.run(Mailbox.scala:221)
      at akka.dispatch.Mailbox.exec(Mailbox.scala:231)
      at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
      at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.pollAndExecAll(ForkJoinPool.java:1253)
      at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1346)
      at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
      at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

It looks like we have to add parameters on the Zeppelin side, but I can't see what's missing here. Any clue is appreciated.

Regards,

2017-01-24 17:13 GMT+01:00 Aljoscha Krettek <aljoscha@apache.org>:
+Till Rohrmann, do you know what can be used to access an HA cluster from that setting?

Adding Till since he probably knows the HA stuff best.

On Sun, 22 Jan 2017 at 15:58 Maciek Próchniak <mpr@touk.pl> wrote:
Hi,

I have a standalone Flink cluster configured with the HA setting (i.e. with
zookeeper recovery). How should I access it remotely, e.g. with a Zeppelin notebook or the Scala shell?

There are settings for host/port, but with the HA setting they are not fixed. If I check which host and port belong to the *current leader* and set those, I get an
exception on the job manager:

20:36:38.237 [flink-akka.actor.default-dispatcher-22704] WARN
o.a.f.runtime.jobmanager.JobManager - Discard message
LeaderSessionMessage(null,SubmitJob(JobGraph(jobId:
02a3a43464b2a750e04855d4c0b6fecb),EXECUTION_RESULT_AND_STATE_CHANGES))
because the expected leader session ID
Some(1a4e9d39-2c59-45bb-b81c-d867bec1958d) did not equal the received
leader session ID None.

I guess that's reasonable behaviour, since I should use an appropriate
LeaderRetrievalService and so on. But apparently there's no such
possibility in scala flink shell?
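
The warning above can be read as a plain equality check on the leader session ID. The following Java sketch models that check in isolation. It is a simplified illustration under assumptions, not Flink's actual code, and the names LeaderSessionFilter, Tagged and accepts are hypothetical. A client that connects to a fixed host/port without looking up the leader presumably never learns the session ID, so its messages carry none and are discarded, which would match the "received leader session ID None" in the log.

```java
import java.util.UUID;

class LeaderSessionFilter {

    // Hypothetical stand-in for a message tagged with a leader session ID.
    static final class Tagged {
        final UUID leaderSessionId; // null if the sender never learned the ID
        final String payload;

        Tagged(UUID leaderSessionId, String payload) {
            this.leaderSessionId = leaderSessionId;
            this.payload = payload;
        }
    }

    // A message is accepted only if it carries exactly the expected session ID.
    static boolean accepts(UUID expected, Tagged message) {
        return expected.equals(message.leaderSessionId);
    }

    public static void main(String[] args) {
        // Session ID taken from the log excerpt above.
        UUID leader = UUID.fromString("1a4e9d39-2c59-45bb-b81c-d867bec1958d");

        Tagged withoutId = new Tagged(null, "SubmitJob");
        Tagged withId = new Tagged(leader, "SubmitJob");

        System.out.println(accepts(leader, withoutId)); // false: discarded
        System.out.println(accepts(leader, withId));    // true: processed
    }
}
```

In this model the job submission is only processed once the client has obtained the current session ID, which is the role a leader retrieval step would play.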

Is this a missing feature? I can prepare a patch, but I'm not sure how I would
hook the behaviour of ClusterClient into FlinkILoop.

thanks,

maciek



