flink-issues mailing list archives

From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-7216) ExecutionGraph can perform concurrent global restarts to scheduling
Date Wed, 19 Jul 2017 09:29:00 GMT

    [ https://issues.apache.org/jira/browse/FLINK-7216?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16092843#comment-16092843 ]

ASF GitHub Bot commented on FLINK-7216:
---------------------------------------

Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4364#discussion_r128196559
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/ExecutionGraphRestartCallback.java
---
    @@ -19,27 +19,33 @@
     package org.apache.flink.runtime.executiongraph.restart;
     
     import org.apache.flink.runtime.executiongraph.ExecutionGraph;
    -import org.slf4j.Logger;
    -import org.slf4j.LoggerFactory;
    -
    -import java.util.concurrent.Callable;
    -
    -class ExecutionGraphRestarter {
    -	private static final Logger LOG = LoggerFactory.getLogger(ExecutionGraphRestarter.class);
    -	public static Callable<Object> restartWithDelay(final ExecutionGraph executionGraph, final long delayBetweenRestartAttemptsInMillis) {
    -		return new Callable<Object>() {
    -			@Override
    -			public Object call() throws Exception {
    -				try {
    -					LOG.info("Delaying retry of job execution for {} ms ...", delayBetweenRestartAttemptsInMillis);
    -					// do the delay
    -					Thread.sleep(delayBetweenRestartAttemptsInMillis);
    -				} catch(InterruptedException e) {
    -					// should only happen on shutdown
    -				}
    -				executionGraph.restart();
    -				return null;
    -			}
    -		};
    +
    +import java.util.concurrent.atomic.AtomicBoolean;
    +
    +import static org.apache.flink.util.Preconditions.checkNotNull;
    +
    +/**
    + * A {@link RestartCallback} that abstracts restart calls on an {@link ExecutionGraph}.
    + * 
    + * <p>This callback implementation is one-shot; it can only be used once.
    + */
    +public class ExecutionGraphRestartCallback implements RestartCallback {
    +
    +	/** The ExecutionGraph to restart */
    --- End diff --
    
    Please add a period here.
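The Javadoc in the diff describes `ExecutionGraphRestartCallback` as one-shot, and the diff imports `AtomicBoolean`. A minimal sketch of how such a one-shot guard might look, assuming a simplified `RestartCallback` with a single `triggerFullRecovery()` method and a hypothetical `Restartable` interface standing in for `ExecutionGraph`. These names are illustrative assumptions, not the code from the pull request, and the JDK's `Objects.requireNonNull` is used in place of Flink's `checkNotNull`.

```java
import java.util.Objects;
import java.util.concurrent.atomic.AtomicBoolean;

public class OneShotRestartCallbackSketch {

    /** Hypothetical simplified callback interface (assumption, not the PR's definition). */
    interface RestartCallback {
        void triggerFullRecovery();
    }

    /** Hypothetical stand-in for ExecutionGraph: only the restart hook matters here. */
    interface Restartable {
        void restart();
    }

    /** One-shot callback: the first trigger restarts the target, later triggers are ignored. */
    static final class OneShotRestartCallback implements RestartCallback {

        /** The target to restart. */
        private final Restartable target;

        /** Flips to true on first use; compareAndSet makes check-and-set a single atomic step. */
        private final AtomicBoolean used = new AtomicBoolean(false);

        OneShotRestartCallback(Restartable target) {
            this.target = Objects.requireNonNull(target);
        }

        @Override
        public void triggerFullRecovery() {
            // Only the caller that wins the CAS performs the restart.
            if (used.compareAndSet(false, true)) {
                target.restart();
            }
        }
    }

    public static void main(String[] args) {
        int[] restarts = {0};
        RestartCallback callback = new OneShotRestartCallback(() -> restarts[0]++);
        callback.triggerFullRecovery();
        callback.triggerFullRecovery(); // ignored: the callback was already used
        System.out.println("restarts = " + restarts[0]); // prints "restarts = 1"
    }
}
```

Because `compareAndSet(false, true)` succeeds for exactly one caller, the callback stays one-shot even if two threads trigger it at the same time.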


> ExecutionGraph can perform concurrent global restarts to scheduling
> -------------------------------------------------------------------
>
>                 Key: FLINK-7216
>                 URL: https://issues.apache.org/jira/browse/FLINK-7216
>             Project: Flink
>          Issue Type: Bug
>          Components: Distributed Coordination
>    Affects Versions: 1.2.1, 1.3.1
>            Reporter: Stephan Ewen
>            Assignee: Stephan Ewen
>            Priority: Blocker
>             Fix For: 1.4.0, 1.3.2
>
>
> Because ExecutionGraph restarts happen asynchronously and possibly with a delay, in rare corner cases two restarts can be attempted concurrently, in which case some structures on the ExecutionGraph are accessed concurrently:
> Sample stack trace:
> {code}
> WARN  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Failed to restart the job.
> java.lang.IllegalStateException: SlotSharingGroup cannot clear task assignment, group still has allocated resources.
>     at org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup.clearTaskAssignment(SlotSharingGroup.java:78)
>     at org.apache.flink.runtime.executiongraph.ExecutionJobVertex.resetForNewExecution(ExecutionJobVertex.java:535)
>     at org.apache.flink.runtime.executiongraph.ExecutionGraph.restart(ExecutionGraph.java:1151)
>     at org.apache.flink.runtime.executiongraph.restart.ExecutionGraphRestarter$1.call(ExecutionGraphRestarter.java:40)
>     at akka.dispatch.Futures$$anonfun$future$1.apply(Future.scala:95)
>     at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
>     at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
>     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>     at java.lang.Thread.run(Thread.java:748)
> {code}
> The solution is to strictly guard against "subsumed" restarts via the {{globalModVersion}}, in a similar way to how we fence local restarts against global restarts.
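The version-fencing idea from the issue description can be sketched as follows: every global failover increments a version counter, each (possibly delayed) restart captures the version that was current when it was scheduled, and when it finally runs it proceeds only if no newer global change happened in between. This is a minimal, single-class sketch under assumptions: `GlobalModVersionFencingSketch`, `failGlobal`, and `restart(long)` are hypothetical names, a plain `synchronized` lock stands in for whatever locking `ExecutionGraph` actually uses, and the demo calls `restart` directly instead of through a delayed executor so the ordering is deterministic.

```java
public class GlobalModVersionFencingSketch {

    /** Guards all fields below. */
    private final Object lock = new Object();

    /** Incremented on every global failover (hypothetical counterpart of globalModVersion). */
    private long globalModVersion;

    /** Counts restarts that actually ran, for demonstration purposes. */
    private int performedRestarts;

    /**
     * Hypothetical global failover: bumps the version and returns it so the
     * caller can hand it to the (possibly delayed) restart it schedules.
     */
    public long failGlobal() {
        synchronized (lock) {
            return ++globalModVersion;
        }
    }

    /**
     * Hypothetical restart entry point: runs only if no newer global failover
     * has happened since this restart was scheduled.
     *
     * @return true if the restart ran, false if it was subsumed
     */
    public boolean restart(long expectedGlobalModVersion) {
        synchronized (lock) {
            if (globalModVersion != expectedGlobalModVersion) {
                // A later global failover subsumed this restart; skip it.
                return false;
            }
            // Real code would reset the job vertices and reschedule here.
            performedRestarts++;
            return true;
        }
    }

    public int performedRestarts() {
        synchronized (lock) {
            return performedRestarts;
        }
    }

    public static void main(String[] args) {
        GlobalModVersionFencingSketch graph = new GlobalModVersionFencingSketch();
        long first = graph.failGlobal();  // first failure schedules a delayed restart
        long second = graph.failGlobal(); // second failure arrives before it fires
        System.out.println(graph.restart(first));      // false: subsumed
        System.out.println(graph.restart(second));     // true: the latest restart runs
        System.out.println(graph.performedRestarts()); // 1
    }
}
```

This sketch only fences against subsumed restarts; it does not stop the same restart from being triggered twice, which is the concern the one-shot `ExecutionGraphRestartCallback` in the diff addresses.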



--
This message was sent by Atlassian JIRA (v6.4.14#64029)
