flink-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-9575) Potential race condition when removing JobGraph in HA
Date Mon, 16 Jul 2018 09:32:00 GMT

    [ https://issues.apache.org/jira/browse/FLINK-9575?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16544995#comment-16544995
] 

ASF GitHub Bot commented on FLINK-9575:
---------------------------------------

Github user Wosin commented on the issue:

    https://github.com/apache/flink/pull/6322
  
    Hey, 
    I think it should be okay now :) If it is i will squash it to have one commit only.


> Potential race condition when removing JobGraph in HA
> -----------------------------------------------------
>
>                 Key: FLINK-9575
>                 URL: https://issues.apache.org/jira/browse/FLINK-9575
>             Project: Flink
>          Issue Type: Bug
>    Affects Versions: 1.5.0
>            Reporter: Dominik Wosiński
>            Assignee: Dominik Wosiński
>            Priority: Critical
>              Labels: pull-request-available
>             Fix For: 1.5.2, 1.6.0
>
>
> When we are removing the _JobGraph_ from _JobManager_ for example after invoking _cancel()_,
the following code is executed : 
> {noformat}
>  
> val futureOption = currentJobs.get(jobID) match { case Some((eg, _)) => val result
= if (removeJobFromStateBackend) { val futureOption = Some(future { try { // ...otherwise,
we can have lingering resources when there is a concurrent shutdown // and the ZooKeeper client
is closed. Not removing the job immediately allow the // shutdown to release all resources.
submittedJobGraphs.removeJobGraph(jobID) } catch { case t: Throwable => log.warn(s"Could
not remove submitted job graph $jobID.", t) } }(context.dispatcher)) try { archive ! decorateMessage(
ArchiveExecutionGraph( jobID, ArchivedExecutionGraph.createFrom(eg))) } catch { case t: Throwable
=> log.warn(s"Could not archive the execution graph $eg.", t) } futureOption } else { None
} currentJobs.remove(jobID) result case None => None } // remove all job-related BLOBs
from local and HA store libraryCacheManager.unregisterJob(jobID) blobServer.cleanupJob(jobID,
removeJobFromStateBackend) jobManagerMetricGroup.removeJob(jobID) futureOption }
> val futureOption = currentJobs.get(jobID) match {
> case Some((eg, _)) =>
> val result = if (removeJobFromStateBackend) {
> val futureOption = Some(future {
> try {
> // ...otherwise, we can have lingering resources when there is a concurrent shutdown
> // and the ZooKeeper client is closed. Not removing the job immediately allow the
> // shutdown to release all resources.
> submittedJobGraphs.removeJobGraph(jobID)
> } catch {
> case t: Throwable => log.warn(s"Could not remove submitted job graph $jobID.", t)
> }
> }(context.dispatcher))
> try {
> archive ! decorateMessage(
> ArchiveExecutionGraph(
> jobID,
> ArchivedExecutionGraph.createFrom(eg)))
> } catch {
> case t: Throwable => log.warn(s"Could not archive the execution graph $eg.", t)
> }
> futureOption
> } else {
> None
> }
> currentJobs.remove(jobID)
> result
> case None => None
> }
> // remove all job-related BLOBs from local and HA store
> libraryCacheManager.unregisterJob(jobID)
> blobServer.cleanupJob(jobID, removeJobFromStateBackend)
> jobManagerMetricGroup.removeJob(jobID)
> futureOption
> }{noformat}
> This causes the asynchronous removal of the job and synchronous removal of blob files
connected with this jar. This means as far as I understand that there is a potential problem
that we can fail to remove job graph from _submittedJobGraphs._ If the JobManager fails and
we elect the new leader it can try to recover such job, but it will fail with an exception
since the assigned blob was already removed.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Mime
View raw message