flink-issues mailing list archives

From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-5703) ExecutionGraph recovery based on reconciliation with TaskManager reports
Date Fri, 24 Feb 2017 18:53:44 GMT

    [ https://issues.apache.org/jira/browse/FLINK-5703?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15883315#comment-15883315 ]

ASF GitHub Bot commented on FLINK-5703:

Github user StephanEwen commented on a diff in the pull request:

    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexCancelTest.java
    @@ -440,6 +440,72 @@ public void testSendCancelAndReceiveFail() {
    +	/**
    +	 * For job manager failure recovery case, the execution may still in reconciling state but already recovered
    +	 * basic information including slot, when process the failed execution, it will trigger to cancel all the current
    +	 * executions. It is necessary to send cancel rpc to reconciling state execution with slot because the task manager
    +	 * already reports its status for recovery.
    +	 */
    +	@Test
    +	public void testCancelFromReconcilingWithSlot() {
    +		try {
    +			final JobVertexID jid = new JobVertexID();
    +			final ExecutionJobVertex ejv = getExecutionVertex(jid, new DirectScheduledExecutorService());
    +			final ExecutionVertex vertex = new ExecutionVertex(ejv, 0, new IntermediateResult[0],
    +			final ActorGateway actorGateway = new CancelSequenceActorGateway(TestingUtils.directExecutionContext(),
    +			final Instance instance = getInstance(new ActorTaskManagerGateway(actorGateway));
    +			final SimpleSlot slot = instance.allocateSimpleSlot(new JobID());
    +			setVertexState(vertex, ExecutionState.RECONCILING);
    +			setVertexResource(vertex, slot);
    +			assertEquals(ExecutionState.RECONCILING, vertex.getExecutionState());
    +			vertex.cancel();
    +			vertex.getCurrentExecutionAttempt().cancelingComplete(); // response by task manager once actually canceled
    +			assertEquals(ExecutionState.CANCELED, vertex.getExecutionState());
    +			assertTrue(slot.isReleased());
    +			assertNull(vertex.getFailureCause());
    +			assertTrue(vertex.getStateTimestamp(ExecutionState.CREATED) > 0);
    +			assertTrue(vertex.getStateTimestamp(ExecutionState.CANCELING) > 0);
    +			assertTrue(vertex.getStateTimestamp(ExecutionState.CANCELED) > 0);
    +		} catch (Exception e) {
    +			e.printStackTrace();
    +			fail(e.getMessage());
    +		}
    +	}
    +	/**
    +	 * For job manager failure recovery case, the execution may still in reconciling state because the task manager
    +	 *  does not report its status within duration time. It is no need to send cancel rpc for such execution with no real
    +	 *  attempt id and slot. And it can be transition to canceled state directly, the same with the cases of scheduled or created.
    +	 */
    +	@Test
    +	public void testCancelFromReconcilingNoSlot() {
    +		try {
    --- End diff --
    We are trying to avoid this pattern now (we used it in the earlier days).
    It is better for logging and debugging to simply declare `throws Exception` on the test method.
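
The difference between the two styles can be illustrated with a minimal, JUnit-free sketch. Everything in it is hypothetical and not taken from the Flink test suite: the class `ReviewPatternSketch`, the methods `doWork`, `oldStyleTest`, `newStyleTest`, and `capture`, and the exception message. `throw new AssertionError(e.getMessage())` is used here as a stand-in for JUnit 4's `fail(String)`. The sketch only shows which exception a test runner would end up seeing in each style.

```java
import java.io.IOException;

final class ReviewPatternSketch {

    /** A test body that is allowed to throw checked exceptions. */
    interface ThrowingRunnable {
        void run() throws Exception;
    }

    /** Hypothetical test logic that fails with a checked exception. */
    static void doWork() throws Exception {
        throw new IOException("slot was not released");
    }

    /** Older pattern: catch everything, print it, and fail with only the message. */
    static void oldStyleTest() {
        try {
            doWork();
        } catch (Exception e) {
            e.printStackTrace();
            // Stand-in for fail(e.getMessage()): the reported failure carries
            // neither the original exception type nor its stack trace.
            throw new AssertionError(e.getMessage());
        }
    }

    /** Suggested pattern: declare the exception and let it propagate. */
    static void newStyleTest() throws Exception {
        doWork();
    }

    /** Runs a test body and returns whatever it threw, or null if it passed. */
    static Throwable capture(ThrowingRunnable body) {
        try {
            body.run();
            return null;
        } catch (Throwable t) {
            return t;
        }
    }

    public static void main(String[] args) {
        Throwable oldFailure = capture(ReviewPatternSketch::oldStyleTest);
        Throwable newFailure = capture(ReviewPatternSketch::newStyleTest);
        System.out.println("old style reports: " + oldFailure.getClass().getName());
        System.out.println("new style reports: " + newFailure.getClass().getName());
    }
}
```

In the old style the runner sees an `AssertionError` with no cause attached, so the original stack trace only appears wherever `printStackTrace()` happened to write it. With `throws Exception` the runner receives the original exception directly.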

> ExecutionGraph recovery based on reconciliation with TaskManager reports
> ------------------------------------------------------------------------
>                 Key: FLINK-5703
>                 URL: https://issues.apache.org/jira/browse/FLINK-5703
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Distributed Coordination, JobManager
>            Reporter: zhijiang
>            Assignee: zhijiang
> The ExecutionGraph structure is recovered from TaskManager reports during the reconciling period. The necessary information includes:
>     - Execution: ExecutionAttemptID, AttemptNumber, StartTimestamp, ExecutionState, SimpleSlot, PartialInputChannelDeploymentDescriptor (consumer Execution)
>     - ExecutionVertex: Map<IntermediateResultPartitionID, IntermediateResultPartition>
>     - ExecutionGraph: ConcurrentHashMap<ExecutionAttemptID, Execution>
> An Execution in the {{RECONCILING}} ExecutionState should transition into one of the existing task states ({{RUNNING}}, {{CANCELED}}, {{FAILED}}, {{FINISHED}}). To make this possible, the TaskManager should retain the terminal task state ({{CANCELED}}, {{FAILED}}, {{FINISHED}}) for a while; this mechanism will be implemented in a separate JIRA issue. In addition, a state transition triggers different actions, and some of these actions rely on the necessary information listed above. Given this constraint, the recovery process is divided into two steps:
>     - First, recover all necessary information except the ExecutionState.
>     - Second, transition the ExecutionState into the real task state and trigger the actions. The behavior is the same as the current {{UpdateTaskExecutionState}}.
> To keep the logic simple and consistent, all other RPC messages ({{UpdateTaskExecutionState}}, {{ScheduleOrUpdateConsumers}}, etc.) from the TaskManager should be refused temporarily during the recovery period and answered with a special message by the JobMaster. The TaskManager should then retry sending these messages until the JobManager finishes recovery and acknowledges them.
> A job in the {{RECONCILING}} JobStatus transitions into one of the states ({{RUNNING}}, {{FAILING}}, {{FINISHED}}) after recovery.
>     - {{RECONCILING}} to {{RUNNING}}: All TaskManagers report within the allowed duration and all tasks are in the {{RUNNING}} state.
>     - {{RECONCILING}} to {{FAILING}}: One of the TaskManagers does not report in time, or one of the tasks is in the {{FAILED}} or {{CANCELED}} state.
>     - {{RECONCILING}} to {{FINISHED}}: All TaskManagers report within the allowed duration and all tasks are in the {{FINISHED}} state.
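
The JobStatus transitions listed above could be sketched roughly as follows. This is an illustration only, not Flink code: the class `ReconcilingStatusSketch`, its two enums, and the method `resolve` are hypothetical names, and a missing report is modelled as `Optional.empty()`. The description does not say what happens when some tasks are {{RUNNING}} and others {{FINISHED}}; the sketch assumes such a job counts as {{RUNNING}}, and it rejects an empty task list, which is also an assumption.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Optional;

final class ReconcilingStatusSketch {

    /** Task states a TaskManager can report (hypothetical mirror of the issue text). */
    enum TaskState { RUNNING, CANCELED, FAILED, FINISHED }

    /** Job states reachable from RECONCILING according to the issue text. */
    enum JobStatus { RUNNING, FAILING, FINISHED }

    /**
     * Each element is one task's reported state. Optional.empty() models a
     * TaskManager that did not report within the allowed duration.
     */
    static JobStatus resolve(List<Optional<TaskState>> reports) {
        if (reports.isEmpty()) {
            // Assumption: a job without tasks is not a valid input here.
            throw new IllegalArgumentException("no tasks to reconcile");
        }
        boolean allFinished = true;
        for (Optional<TaskState> report : reports) {
            if (!report.isPresent()) {
                return JobStatus.FAILING; // a report did not arrive in time
            }
            TaskState state = report.get();
            if (state == TaskState.FAILED || state == TaskState.CANCELED) {
                return JobStatus.FAILING; // at least one task failed or was canceled
            }
            if (state != TaskState.FINISHED) {
                allFinished = false;
            }
        }
        // Assumption: a mix of RUNNING and FINISHED tasks is treated as RUNNING.
        return allFinished ? JobStatus.FINISHED : JobStatus.RUNNING;
    }

    public static void main(String[] args) {
        List<Optional<TaskState>> reports = Arrays.asList(
                Optional.of(TaskState.RUNNING),
                Optional.<TaskState>empty());
        System.out.println(resolve(reports));
    }
}
```

Returning {{FAILING}} as soon as one report is missing or terminal-with-error mirrors the "one of" wording in the second bullet; the other two outcomes require every report to be present.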
