flink-issues mailing list archives

From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-8176) Dispatcher does not start SubmittedJobGraphStore
Date Wed, 06 Dec 2017 13:40:00 GMT

    [ https://issues.apache.org/jira/browse/FLINK-8176?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16280185#comment-16280185 ]

ASF GitHub Bot commented on FLINK-8176:
---------------------------------------

Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5107#discussion_r155225574
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java ---
    @@ -507,6 +514,41 @@ public void handleError(final Exception exception) {
     		onFatalError(new DispatcherException("Received an error from the LeaderElectionService.", exception));
     	}
     
    +	//------------------------------------------------------
    +	// SubmittedJobGraphListener
    +	//------------------------------------------------------
    +
    +	@Override
    +	public void onAddedJobGraph(final JobID jobId) {
    +		getRpcService().execute(() -> {
    +			final SubmittedJobGraph submittedJobGraph;
    +			try {
    +				submittedJobGraph = submittedJobGraphStore.recoverJobGraph(jobId);
    +			} catch (final Exception e) {
    +				log.error("Could not recover job graph for job {}.", jobId, e);
    +				return;
    +			}
    +			runAsync(() -> {
    +				if (!jobManagerRunners.containsKey(jobId)) {
    --- End diff --
    
    I think this check is not strictly needed.
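    A minimal, self-contained sketch of the pattern in the diff, for readers following the review. It is not Flink's API: `AddedJobGraphSketch`, `JobGraphStore`, `shutdownAndWait` and the two single-threaded executors are hypothetical stand-ins for `getRpcService().execute(...)` (blocking recovery off the main thread) and `runAsync(...)` (the single main thread that owns `jobManagerRunners`). Because the map is only mutated from that one thread, the sketch uses `putIfAbsent`, so a duplicate notification for the same job does not replace the existing entry.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/** Hypothetical sketch of the recover-then-hop-to-main-thread pattern; not Flink's API. */
public class AddedJobGraphSketch {

    /** Hypothetical stand-in for SubmittedJobGraphStore#recoverJobGraph. */
    interface JobGraphStore {
        String recoverJobGraph(String jobId) throws Exception;
    }

    // Plays the role of getRpcService().execute(...): blocking work happens here.
    private final ExecutorService ioExecutor = Executors.newSingleThreadExecutor();
    // Plays the role of runAsync(...): the single "main thread" that owns all state.
    private final ExecutorService mainThread = Executors.newSingleThreadExecutor();
    // Only written from mainThread, so a plain HashMap suffices.
    final Map<String, String> jobManagerRunners = new HashMap<>();
    private final JobGraphStore store;

    AddedJobGraphSketch(JobGraphStore store) {
        this.store = store;
    }

    public void onAddedJobGraph(String jobId) {
        ioExecutor.execute(() -> {
            final String jobGraph;
            try {
                jobGraph = store.recoverJobGraph(jobId);
            } catch (Exception e) {
                System.err.println("Could not recover job graph for job " + jobId + ": " + e);
                return;
            }
            // putIfAbsent leaves an existing entry untouched, so a duplicate
            // notification for the same job is a no-op.
            mainThread.execute(() -> jobManagerRunners.putIfAbsent(jobId, jobGraph));
        });
    }

    /** Drains both executors so callers can inspect jobManagerRunners afterwards. */
    void shutdownAndWait() {
        try {
            ioExecutor.shutdown(); // queued recoveries still run and hand off to mainThread
            ioExecutor.awaitTermination(5, TimeUnit.SECONDS);
            mainThread.shutdown();
            mainThread.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        AddedJobGraphSketch dispatcher = new AddedJobGraphSketch(id -> "graph-" + id);
        dispatcher.onAddedJobGraph("job-1");
        dispatcher.onAddedJobGraph("job-1"); // duplicate notification
        dispatcher.shutdownAndWait();
        System.out.println(dispatcher.jobManagerRunners); // {job-1=graph-job-1}
    }
}
```

    Whether the real `Dispatcher` needs the explicit `containsKey` guard depends on how it starts runners, which this diff does not show; the sketch only illustrates why a single-threaded owner makes such a check cheap to replace.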


> Dispatcher does not start SubmittedJobGraphStore
> ------------------------------------------------
>
>                 Key: FLINK-8176
>                 URL: https://issues.apache.org/jira/browse/FLINK-8176
>             Project: Flink
>          Issue Type: Bug
>          Components: Distributed Coordination, Job-Submission, YARN
>    Affects Versions: 1.5.0
>            Reporter: Gary Yao
>            Assignee: Gary Yao
>              Labels: flip-6
>             Fix For: 1.5.0
>
>
> {{Dispatcher}} never calls {{start()}} on its {{SubmittedJobGraphStore}} instance. Hence, when a job is submitted (YARN session mode with HA enabled), an {{IllegalStateException}} is thrown:
> {noformat}
> java.lang.IllegalStateException: Not running. Forgot to call start()?
>         at org.apache.flink.util.Preconditions.checkState(Preconditions.java:195)
>         at org.apache.flink.runtime.jobmanager.ZooKeeperSubmittedJobGraphStore.verifyIsRunning(ZooKeeperSubmittedJobGraphStore.java:411)
>         at org.apache.flink.runtime.jobmanager.ZooKeeperSubmittedJobGraphStore.putJobGraph(ZooKeeperSubmittedJobGraphStore.java:222)
>         at org.apache.flink.runtime.dispatcher.Dispatcher.submitJob(Dispatcher.java:202)
>         at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>         at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>         at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>         at java.lang.reflect.Method.invoke(Method.java:498)
>         at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:207)
>         at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:151)
>         at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleMessage(FencedAkkaRpcActor.java:66)
>         at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$onReceive$0(AkkaRpcActor.java:129)
>         at akka.actor.ActorCell$$anonfun$become$1.applyOrElse(ActorCell.scala:544)
>         at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
>         at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)
>         at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
>         at akka.actor.ActorCell.invoke(ActorCell.scala:495)
>         at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
>         at akka.dispatch.Mailbox.run(Mailbox.scala:224)
>         at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
>         at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>         at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>         at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> {noformat}
> *Expected Behavior*
> In its {{start()}} method, the {{Dispatcher}} should start the {{submittedJobGraphStore}}, passing itself as the listener:
> {code}
> submittedJobGraphStore.start(this);
> {code}
> To enable this, the {{Dispatcher}} must implement the interface {{SubmittedJobGraphStore.SubmittedJobGraphListener}}.
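The bug and the proposed fix can be reproduced in miniature. This is a hypothetical sketch, not Flink's real classes: `StartBeforeUseSketch`, `GuardedJobGraphStore`, `JobGraphListener` and the `startStore` flag are illustrative stand-ins. The store rejects every operation until `start(listener)` has been called, mirroring the `verifyIsRunning()` check and the message in the stack trace above; the stand-in `Dispatcher` implements the listener interface so it can pass `this` to `start`.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical sketch of FLINK-8176: a store that must be started before use. Not Flink's real classes. */
public class StartBeforeUseSketch {

    /** Stand-in for SubmittedJobGraphStore.SubmittedJobGraphListener. */
    interface JobGraphListener {
        void onAddedJobGraph(String jobId);
        void onRemovedJobGraph(String jobId);
    }

    /** Stand-in for a ZooKeeper-backed store: every operation checks that start() was called. */
    static class GuardedJobGraphStore {
        private final Map<String, String> graphs = new HashMap<>();
        // A ZooKeeper-backed store would use this to report graphs added or removed
        // by other processes; that part is omitted from the sketch.
        private JobGraphListener listener;
        private boolean running;

        synchronized void start(JobGraphListener listener) {
            this.listener = listener;
            this.running = true;
        }

        private void verifyIsRunning() {
            // Same message as the Preconditions.checkState failure in the stack trace.
            if (!running) {
                throw new IllegalStateException("Not running. Forgot to call start()?");
            }
        }

        synchronized void putJobGraph(String jobId, String graph) {
            verifyIsRunning();
            graphs.put(jobId, graph);
        }
    }

    /** Stand-in Dispatcher; startStore toggles between the buggy and the fixed start(). */
    static class Dispatcher implements JobGraphListener {
        final GuardedJobGraphStore store = new GuardedJobGraphStore();
        private final boolean startStore;

        Dispatcher(boolean startStore) {
            this.startStore = startStore;
        }

        void start() {
            if (startStore) {
                store.start(this); // the fix proposed in the issue
            }
        }

        void submitJob(String jobId, String graph) {
            store.putJobGraph(jobId, graph);
        }

        @Override public void onAddedJobGraph(String jobId) { }
        @Override public void onRemovedJobGraph(String jobId) { }
    }

    public static void main(String[] args) {
        Dispatcher buggy = new Dispatcher(false);
        buggy.start();
        try {
            buggy.submitJob("job-1", "graph");
        } catch (IllegalStateException e) {
            System.out.println("buggy: " + e.getMessage()); // buggy: Not running. Forgot to call start()?
        }
        Dispatcher fixed = new Dispatcher(true);
        fixed.start();
        fixed.submitJob("job-1", "graph"); // succeeds once the store has been started
        System.out.println("fixed: submitted");
    }
}
```

Implementing the listener on the dispatcher itself, as the issue suggests, lets a single object both start the store and react to its change notifications.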



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
