flink-issues mailing list archives

From "Arnaud Linz (JIRA)" <j...@apache.org>
Subject [jira] [Created] (FLINK-8580) No easy way (or issues when trying?) to handle multiple yarn sessions and choose at runtime the one to submit a ha streaming job
Date Wed, 07 Feb 2018 10:56:00 GMT
Arnaud Linz created FLINK-8580:
----------------------------------

             Summary: No easy way (or issues when trying?) to handle multiple yarn sessions and choose at runtime the one to submit a HA streaming job
                 Key: FLINK-8580
                 URL: https://issues.apache.org/jira/browse/FLINK-8580
             Project: Flink
          Issue Type: Improvement
          Components: Job-Submission
    Affects Versions: 1.3.2
            Reporter: Arnaud Linz


Hello,

I am using Flink 1.3.2 and I’m struggling to achieve something that should be simple.

For isolation reasons, I want to start multiple long-lived yarn session containers (with the same user) and choose at runtime, when I start an HA streaming app, which container will host it.

I start my yarn session with the command-line option {{-Dyarn.properties-file.location=mydir}}.

The session is created and a .yarn-properties-$USER file is generated.
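
For reference, the sessions are started roughly like this (a sketch, not my exact commands: the paths and the container count are placeholders, and {{-D}} passes dynamic properties to {{yarn-session.sh}}):

{{# One detached session per isolation group; each writes its}}
{{# .yarn-properties-$USER file into its own directory (paths are examples).}}
{{./bin/yarn-session.sh -d -n 2 -Dyarn.properties-file.location=/opt/flink/sessions/group-a}}
{{./bin/yarn-session.sh -d -n 2 -Dyarn.properties-file.location=/opt/flink/sessions/group-b}}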

I’ve tried the following ways to submit my job:

 

*CASE 1* 

*flink-conf.yaml* : yarn.properties-file.location: mydir
*flink run options* : none
 * Uses ZooKeeper and works, but I cannot choose the container because the property file is global.


*CASE 2*

*flink-conf.yaml* : nothing
*flink run options* : -yid applicationId
 * Does not use ZooKeeper; tries to connect to the yarn job manager but fails with a “Job submission to the JobManager timed out” error.


*CASE 3*

*flink-conf.yaml* : nothing
*flink run options* : -yid applicationId plus -yD options for all the dynamic properties found in the “dynamicPropertiesString” entry of the .yarn-properties-$USER file
 * Same outcome as case 2.


*CASE 4*

*flink-conf.yaml* : nothing
*flink run options* : -yD yarn.properties-file.location=mydir
 * Tries to connect to the local (non-yarn) job manager, and fails.


*CASE 5*

Even weirder:

*flink-conf.yaml* : yarn.properties-file.location: mydir
*flink run options* : -yD yarn.properties-file.location=mydir


 * Still tries to connect to the local (non-yarn) job manager!

 

Having found no other solution, I wrote a shell script that copies the original contents of FLINK_CONF_DIR into a temporary directory, modifies flink-conf.yaml there to set yarn.properties-file.location, and points FLINK_CONF_DIR at that temporary directory before executing flink to submit the job.

I am now able to select the container I want, but I think this should be made simpler…
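
A minimal sketch of that workaround, written as a shell function (the function name {{submit_to_session}} is mine, not from Flink; it assumes {{FLINK_CONF_DIR}} points at the original configuration directory and that {{flink}} is on the PATH):

```shell
# Hypothetical helper: submit a job through a patched copy of the Flink
# configuration so that yarn.properties-file.location selects one session.
# Usage: submit_to_session /path/to/session-props-dir <flink run args...>
submit_to_session() {
  local props_dir="$1"; shift
  local tmp_conf
  tmp_conf="$(mktemp -d)"

  # Copy the original configuration; the global one stays untouched.
  cp -r "$FLINK_CONF_DIR"/. "$tmp_conf"

  # Drop any existing setting, then point at the chosen session's directory.
  sed -i '/^yarn\.properties-file\.location:/d' "$tmp_conf/flink-conf.yaml"
  echo "yarn.properties-file.location: $props_dir" >> "$tmp_conf/flink-conf.yaml"

  # Submit with the patched configuration, then clean up.
  FLINK_CONF_DIR="$tmp_conf" flink run "$@"
  local rc=$?
  rm -rf "$tmp_conf"
  return $rc
}
```

(The {{sed -i}} form is GNU sed; BSD sed would need {{sed -i ''}}.)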

 

Log extracts:

*CASE 1:*

{{2018:02:01 15:43:20 - Waiting until all TaskManagers have connected}}
{{2018:02:01 15:43:20 - Starting client actor system.}}
{{2018:02:01 15:43:20 - Starting ZooKeeperLeaderRetrievalService.}}
{{2018:02:01 15:43:20 - Trying to select the network interface and address to use by connecting to the leading JobManager.}}
{{2018:02:01 15:43:20 - TaskManager will try to connect for 10000 milliseconds before falling back to heuristics}}
{{2018:02:01 15:43:21 - Retrieved new target address elara-data-u2-n01.dev.mlb.jupiter.nbyt.fr/10.136.170.193:33970.}}
{{2018:02:01 15:43:21 - Stopping ZooKeeperLeaderRetrievalService.}}
{{2018:02:01 15:43:21 - Slf4jLogger started}}
{{2018:02:01 15:43:21 - Starting remoting}}
{{2018:02:01 15:43:21 - Remoting started; listening on addresses :[akka.tcp://flink@elara-edge-u2-n01.dev.mlb.jupiter.nbyt.fr:36340]}}
{{2018:02:01 15:43:21 - Starting ZooKeeperLeaderRetrievalService.}}
{{2018:02:01 15:43:21 - Stopping ZooKeeperLeaderRetrievalService.}}
{{2018:02:01 15:43:21 - TaskManager status (2/1)}}
{{2018:02:01 15:43:21 - All TaskManagers are connected}}
{{2018:02:01 15:43:21 - Submitting job with JobID: f69197b0b80a76319a87bde10c1e3f77. Waiting for job completion.}}
{{2018:02:01 15:43:21 - Starting ZooKeeperLeaderRetrievalService.}}
{{2018:02:01 15:43:21 - Received SubmitJobAndWait(JobGraph(jobId: f69197b0b80a76319a87bde10c1e3f77)) but there is no connection to a JobManager yet.}}
{{2018:02:01 15:43:21 - Received job SND-IMP-SIGNAST (f69197b0b80a76319a87bde10c1e3f77).}}
{{2018:02:01 15:43:21 - Disconnect from JobManager null.}}
{{2018:02:01 15:43:21 - Connect to JobManager Actor[akka.tcp://flink@elara-data-u2-n01.dev.mlb.jupiter.nbyt.fr:33970/user/jobmanager#-1554418245].}}
{{2018:02:01 15:43:21 - Connected to JobManager at Actor[akka.tcp://flink@elara-data-u2-n01.dev.mlb.jupiter.nbyt.fr:33970/user/jobmanager#-1554418245] with leader session id 388af5b8-5555-4923-8ee4-8a4b9bfbb0b9.}}
{{2018:02:01 15:43:21 - Sending message to JobManager akka.tcp://flink@elara-data-u2-n01.dev.mlb.jupiter.nbyt.fr:33970/user/jobmanager to submit job SND-IMP-SIGNAST (f69197b0b80a76319a87bde10c1e3f77) and wait for progress}}
{{2018:02:01 15:43:21 - Upload jar files to job manager akka.tcp://flink@elara-data-u2-n01.dev.mlb.jupiter.nbyt.fr:33970/user/jobmanager.}}
{{2018:02:01 15:43:21 - Blob client connecting to akka.tcp://flink@elara-data-u2-n01.dev.mlb.jupiter.nbyt.fr:33970/user/jobmanager}}
{{2018:02:01 15:43:22 - Submit job to the job manager akka.tcp://flink@elara-data-u2-n01.dev.mlb.jupiter.nbyt.fr:33970/user/jobmanager.}}
{{2018:02:01 15:43:22 - Job f69197b0b80a76319a87bde10c1e3f77 was successfully submitted to the JobManager akka://flink/deadLetters.}}
{{2018:02:01 15:43:22 - 02/01/2018 15:43:22   Job execution switched to status RUNNING.}}

 

*CASE 2:*

{{2018:02:01 15:48:43 - Waiting until all TaskManagers have connected}}
{{2018:02:01 15:48:43 - Starting client actor system.}}
{{2018:02:01 15:48:43 - Trying to select the network interface and address to use by connecting to the leading JobManager.}}
{{2018:02:01 15:48:43 - TaskManager will try to connect for 10000 milliseconds before falling back to heuristics}}
{{2018:02:01 15:48:43 - Retrieved new target address elara-data-u2-n01.dev.mlb.jupiter.nbyt.fr/10.136.170.193:33970.}}
{{2018:02:01 15:48:43 - Slf4jLogger started}}
{{2018:02:01 15:48:43 - Starting remoting}}
{{2018:02:01 15:48:43 - Remoting started; listening on addresses :[akka.tcp://flink@elara-edge-u2-n01.dev.mlb.jupiter.nbyt.fr:34140]}}
{{2018:02:01 15:48:43 - TaskManager status (2/1)}}
{{2018:02:01 15:48:43 - All TaskManagers are connected}}
{{2018:02:01 15:48:43 - Submitting job with JobID: cd3e0e223c57d01d415fe7a6a308576c. Waiting for job completion.}}
{{2018:02:01 15:48:43 - Received SubmitJobAndWait(JobGraph(jobId: cd3e0e223c57d01d415fe7a6a308576c)) but there is no connection to a JobManager yet.}}
{{2018:02:01 15:48:43 - Received job SND-IMP-SIGNAST (cd3e0e223c57d01d415fe7a6a308576c).}}
{{2018:02:01 15:48:43 - Disconnect from JobManager null.}}
{{2018:02:01 15:48:43 - Connect to JobManager Actor[akka.tcp://flink@elara-data-u2-n01.dev.mlb.jupiter.nbyt.fr:33970/user/jobmanager#-1554418245].}}
{{2018:02:01 15:48:43 - Connected to JobManager at Actor[akka.tcp://flink@elara-data-u2-n01.dev.mlb.jupiter.nbyt.fr:33970/user/jobmanager#-1554418245] with leader session id 00000000-0000-0000-0000-000000000000.}}
{{2018:02:01 15:48:43 - Sending message to JobManager akka.tcp://flink@elara-data-u2-n01.dev.mlb.jupiter.nbyt.fr:33970/user/jobmanager to submit job SND-IMP-SIGNAST (cd3e0e223c57d01d415fe7a6a308576c) and wait for progress}}
{{2018:02:01 15:48:43 - Upload jar files to job manager akka.tcp://flink@elara-data-u2-n01.dev.mlb.jupiter.nbyt.fr:33970/user/jobmanager.}}
{{2018:02:01 15:48:43 - Blob client connecting to akka.tcp://flink@elara-data-u2-n01.dev.mlb.jupiter.nbyt.fr:33970/user/jobmanager}}
{{2018:02:01 15:48:45 - Submit job to the job manager akka.tcp://flink@elara-data-u2-n01.dev.mlb.jupiter.nbyt.fr:33970/user/jobmanager.}}
{{2018:02:01 15:49:45 - Terminate JobClientActor.}}
{{2018:02:01 15:49:45 - Disconnect from JobManager Actor[akka.tcp://flink@elara-data-u2-n01.dev.mlb.jupiter.nbyt.fr:33970/user/jobmanager#-1554418245].}}

 

Then

{{Caused by: org.apache.flink.runtime.client.JobClientActorSubmissionTimeoutException: Job submission to the JobManager timed out. You may increase 'akka.client.timeout' in case the JobManager needs more time to configure and confirm the job submission.}}
{{        at org.apache.flink.runtime.client.JobSubmissionClientActor.handleCustomMessage(JobSubmissionClientActor.java:119)}}
{{        at org.apache.flink.runtime.client.JobClientActor.handleMessage(JobClientActor.java:251)}}
{{        at org.apache.flink.runtime.akka.FlinkUntypedActor.handleLeaderSessionID(FlinkUntypedActor.java:89)}}
{{        at org.apache.flink.runtime.akka.FlinkUntypedActor.onReceive(FlinkUntypedActor.java:68)}}
{{        at akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:167)}}
{{        at akka.actor.Actor$class.aroundReceive(Actor.scala:467)}}
{{        at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:97)}}
{{        at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)}}
{{        at akka.actor.ActorCell.invoke(ActorCell.scala:487)}}
{{        at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)}}
{{        at akka.dispatch.Mailbox.run(Mailbox.scala:220)}}
{{        at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)}}
{{        at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)}}
{{        at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)}}
{{        at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)}}
{{        at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)}}

 

*CASE 3,4* 

{{2018:02:01 15:35:14 - Starting client actor system.}}
{{2018:02:01 15:35:14 - Trying to select the network interface and address to use by connecting to the leading JobManager.}}
{{2018:02:01 15:35:14 - TaskManager will try to connect for 10000 milliseconds before falling back to heuristics}}
{{2018:02:01 15:35:14 - Retrieved new target address localhost/127.0.0.1:6123.}}
{{2018:02:01 15:35:15 - Trying to connect to address localhost/127.0.0.1:6123}}
{{2018:02:01 15:35:15 - Failed to connect from address 'elara-edge-u2-n01/10.136.170.196': Connexion refusée (Connection refused)}}
{{2018:02:01 15:35:15 - Failed to connect from address '/127.0.0.1': Connexion refusée (Connection refused)}}
{{2018:02:01 15:35:15 - Failed to connect from address '/192.168.117.1': Connexion refusée (Connection refused)}}
{{2018:02:01 15:35:15 - Failed to connect from address '/10.136.170.225': Connexion refusée (Connection refused)}}
{{2018:02:01 15:35:15 - Failed to connect from address '/fe80:0:0:0:20c:29ff:fe8f:3fdd%ens192': Le réseau n'est pas accessible (connect failed)}}
{{2018:02:01 15:35:15 - Failed to connect from address '/10.136.170.196': Connexion refusée (Connection refused)}}
{{2018:02:01 15:35:15 - Failed to connect from address '/127.0.0.1': Connexion refusée (Connection refused)}}
{{2018:02:01 15:35:15 - Failed to connect from address '/192.168.117.1': Connexion refusée (Connection refused)}}
{{2018:02:01 15:35:15 - Failed to connect from address '/10.136.170.225': Connexion refusée (Connection refused)}}
{{2018:02:01 15:35:15 - Failed to connect from address '/fe80:0:0:0:20c:29ff:fe8f:3fdd%ens192': Le réseau n'est pas accessible (connect failed)}}
{{2018:02:01 15:35:15 - Failed to connect from address '/10.136.170.196': Connexion refusée (Connection refused)}}
{{2018:02:01 15:35:15 - Failed to connect from address '/127.0.0.1': Connexion refusée (Connection refused)}}


 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
