flink-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Alexis Gendronneau <a.gendronn...@gmail.com>
Subject Re: accessing flink HA cluster with scala shell/zeppelin notebook
Date Mon, 20 Mar 2017 10:37:37 GMT
Hello users,

As Maciek, I'm currently trying to make apache Zeppelin 0.7 working with
Flink. I have two versions of flink available (1.1.2 and 1.2.0). Each one
is running in High-availability mode.

When running jobs from Zeppelin in Flink local mode, everything works fine.
But when trying to submit job to remote host (no matter which version
involved), job is stuck in submitting phase until it reaches
akka.client.timeout.

I tried to increase timeout (like said in error raised in zeppelin), but it
only increase time before error is finally raised (tested with 600s).

On Flink side, nothing appears but :

    2017-03-20 11:19:31,675 WARN
org.apache.flink.runtime.jobmanager.JobManager - Discard message
LeaderSessionMessage(null,SubmitJob(JobGraph(jobId:
8af3a91762a171b04c4be0efe540f3d4) ,EXECUTION_RESULT_AND_STATE_CHANGES))
    because the expected leader session ID
Some(f955760c-d80d-4992-a148-5968026ca6e4) did not equal the received
leader session ID None.


On zepplin interpreter side, we get following stacktrace :

    bestCarrier: org.apache.flink.api.scala.DataSet[CarrierFlightsCount]
=     org.apache.flink.api.scala.DataSet@669fc812
    org.apache.flink.client.program.ProgramInvocationException: The
program     execution failed: Communication with JobManager failed: Job
submission to the     JobManager timed out. You may increase
'akka.client.timeout' in case the     JobManager needs more time to
configure and confirm the job submission.
      at
org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:409)
      at
org.apache.flink.client.program.StandaloneClusterClient.submitJob(StandaloneClusterClient.java:95)
      at
org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:382)
      at
org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:369)
      at
org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:344)
      at
org.apache.flink.client.RemoteExecutor.executePlanWithJars(RemoteExecutor.java:211)
      at
org.apache.flink.client.RemoteExecutor.executePlan(RemoteExecutor.java:188)
      at
org.apache.flink.api.java.RemoteEnvironment.execute(RemoteEnvironment.java:172)
      at
org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:896)
      at
org.apache.flink.api.scala.ExecutionEnvironment.execute(ExecutionEnvironment.scala:637)
      at org.apache.flink.api.scala.DataSet.collect(DataSet.scala:547)
      ... 36 elided
    Caused by: org.apache.flink.runtime.client.JobExecutionException:
Communication with JobManager failed: Job submission to the JobManager
timed out. You may increase 'akka.client.timeout' in case the JobManager
needs more time to configure and confirm the job submission.
      at
org.apache.flink.runtime.client.JobClient.submitJobAndWait(JobClient.java:137)
      at
org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:405)
      ... 46 more
    Caused by:
org.apache.flink.runtime.client.JobClientActorSubmissionTimeoutException:
Job submission to the JobManager timed out. You may increase
'akka.client.timeout' in case the JobManager needs more time to configure
and confirm the job submission.
      at
org.apache.flink.runtime.client.JobClientActor.handleMessage(JobClientActor.java:264)
      at
org.apache.flink.runtime.akka.FlinkUntypedActor.handleLeaderSessionID(FlinkUntypedActor.java:90)
      at
org.apache.flink.runtime.akka.FlinkUntypedActor.onReceive(FlinkUntypedActor.java:70)
      at
akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:167)
      at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
      at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:97)
      at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
      at akka.actor.ActorCell.invoke(ActorCell.scala:487)
      at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254)
      at akka.dispatch.Mailbox.run(Mailbox.scala:221)
      at akka.dispatch.Mailbox.exec(Mailbox.scala:231)
      at
scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
      at
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.pollAndExecAll(ForkJoinPool.java:1253)
      at
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1346)
      at
scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
      at
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

It looks like we have to add parameters on zepplin side, but I cant see
whats missing here. Any clue appreciated.

Regards,

2017-01-24 17:13 GMT+01:00 Aljoscha Krettek <aljoscha@apache.org>:

> +Till Rohrmann <trohrmann@apache.org>, do you know what can be used to
> access a HA cluster from that setting.
>
> Adding Till since he probably knows the HA stuff best.
>
> On Sun, 22 Jan 2017 at 15:58 Maciek Próchniak <mpr@touk.pl> wrote:
>
>> Hi,
>>
>> I have standalone Flink cluster configured with HA setting (i.e. with
>> zookeeper recovery). How should I access it remotely, e.g. with Zeppelin
>> notebook or scala shell?
>>
>> There are settings for host/port, but with HA setting they are not fixed
>> - if I check which is *current leader* host and port and set that I get
>> exception on job manager:
>>
>> 20:36:38.237 [flink-akka.actor.default-dispatcher-22704] WARN
>> o.a.f.runtime.jobmanager.JobManager - Discard message
>> LeaderSessionMessage(null,SubmitJob(JobGraph(jobId:
>> 02a3a43464b2a750e04855d4c0b6fecb),EXECUTION_RESULT_AND_STATE_CHANGES))
>> because the expected leader session ID
>> Some(1a4e9d39-2c59-45bb-b81c-d867bec1958d) did not equal the received
>> leader session ID None.
>>
>> - I guess it's reasonable behaviour, since I should use appropriate
>> LeaderRetrievalService and so on. But apparently there's no such
>> possibility in scala flink shell?
>>
>> Is it missing feature? I can prepare patch, but I'm not sure how would I
>> hook behaviour of ClusterClient into FlinkILoop?
>>
>> thanks,
>>
>> maciek
>>
>>


-- 
Alexis Gendronneau

alexis.gendronneau@corp.ovh.com
a.gendronneau@gmail.com

Mime
View raw message