Date: Fri, 24 Feb 2017 18:53:44 +0000 (UTC)
From: "ASF GitHub Bot (JIRA)"
To: issues@flink.apache.org
Subject: [jira] [Commented] (FLINK-5703) ExecutionGraph recovery based on reconciliation with TaskManager reports

    [ https://issues.apache.org/jira/browse/FLINK-5703?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15883315#comment-15883315 ]

ASF GitHub Bot commented on FLINK-5703:
---------------------------------------

Github user StephanEwen commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3340#discussion_r102995512

    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexCancelTest.java ---
    @@ -440,6 +440,72 @@ public void testSendCancelAndReceiveFail() {
             }
         }

    +    /**
    +     * For job manager failure recovery case, the execution may still in reconciling state but already recovered
    +     * basic information including slot, when process the failed execution, it will trigger to cancel all the current
    +     * executions. It is necessary to send cancel rpc to reconciling state execution with slot because the task manger
    +     * already reports its status for recovery.
    +     */
    +    @Test
    +    public void testCancelFromReconcilingWithSlot() {
    +        try {
    +            final JobVertexID jid = new JobVertexID();
    +            final ExecutionJobVertex ejv = getExecutionVertex(jid, new DirectScheduledExecutorService());
    +            final ExecutionVertex vertex = new ExecutionVertex(ejv, 0, new IntermediateResult[0], AkkaUtils.getDefaultTimeout());
    +            final ActorGateway actorGateway = new CancelSequenceActorGateway(TestingUtils.directExecutionContext(), 1);
    +
    +            final Instance instance = getInstance(new ActorTaskManagerGateway(actorGateway));
    +            final SimpleSlot slot = instance.allocateSimpleSlot(new JobID());
    +
    +            setVertexState(vertex, ExecutionState.RECONCILING);
    +            setVertexResource(vertex, slot);
    +
    +            assertEquals(ExecutionState.RECONCILING, vertex.getExecutionState());
    +
    +            vertex.cancel();
    +            vertex.getCurrentExecutionAttempt().cancelingComplete(); // response by task manager once actually canceled
    +
    +            assertEquals(ExecutionState.CANCELED, vertex.getExecutionState());
    +            assertTrue(slot.isReleased());
    +            assertNull(vertex.getFailureCause());
    +            assertTrue(vertex.getStateTimestamp(ExecutionState.CREATED) > 0);
    +            assertTrue(vertex.getStateTimestamp(ExecutionState.CANCELING) > 0);
    +            assertTrue(vertex.getStateTimestamp(ExecutionState.CANCELED) > 0);
    +        } catch (Exception e) {
    +            e.printStackTrace();
    +            fail(e.getMessage());
    +        }
    +    }
    +
    +    /**
    +     * For job manager failure recovery case, the execution may still in reconciling state because the task manager
    +     * does not report its status within duration time. It is no need to send cancel rpc for such execution with no real
    +     * attempt id and slot. And it can be transition to canceled state directly, the same with the cases of scheduled or created.
    +     */
    +    @Test
    +    public void testCancelFromReconcilingNoSlot() {
    +        try {
    --- End diff --

    We are trying to avoid this pattern now (we used it in the earlier days). It is better for logging and debugging to simply declare `throws Exception` on the test method.
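
To illustrate the review comment, here is a minimal, self-contained sketch of why catching the exception and calling `fail(e.getMessage())` tends to be worse for debugging than declaring `throws Exception`. It deliberately avoids JUnit so it runs on its own: `fail`, `operationUnderTest`, `capture`, and the class name are hypothetical stand-ins, with `fail` assumed to behave like JUnit 4's `Assert.fail(String)` in that it throws an `AssertionError` carrying only the message.

```java
import java.io.IOException;

class ThrowsExceptionSketch {

    /** A test body that is allowed to throw checked exceptions. */
    interface TestBody {
        void run() throws Exception;
    }

    // Hypothetical stand-in for JUnit's Assert.fail(String): only the message survives.
    static void fail(String message) {
        throw new AssertionError(message);
    }

    // Hypothetical operation under test that fails with a checked exception.
    static void operationUnderTest() throws IOException {
        throw new IOException("slot was not released");
    }

    // Older pattern: catch everything and fail with just the message.
    // (The test in the diff also calls e.printStackTrace() before fail.)
    static void testWithTryCatch() {
        try {
            operationUnderTest();
        } catch (Exception e) {
            fail(e.getMessage());
        }
    }

    // Suggested pattern: declare throws Exception and let the failure propagate.
    static void testWithThrows() throws Exception {
        operationUnderTest();
    }

    // Runs a test body and returns what it threw, or null if it passed.
    static Throwable capture(TestBody body) {
        try {
            body.run();
            return null;
        } catch (Throwable t) {
            return t;
        }
    }

    public static void main(String[] args) {
        Throwable old = capture(ThrowsExceptionSketch::testWithTryCatch);
        Throwable suggested = capture(ThrowsExceptionSketch::testWithThrows);

        // With try/catch + fail, the reported failure has a new type and no cause.
        System.out.println("try/catch + fail: " + old.getClass().getName()
                + ", cause=" + old.getCause());
        // With throws Exception, the original exception and its stack trace are reported.
        System.out.println("throws Exception: " + suggested.getClass().getName()
                + ", thrown in " + suggested.getStackTrace()[0].getMethodName());
    }
}
```

In the first variant the test runner only sees an `AssertionError` raised inside `fail`, so the original exception type and the frame that threw it are missing from the reported failure. In the second variant the runner receives the original exception unchanged.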
> ExecutionGraph recovery based on reconciliation with TaskManager reports
> ------------------------------------------------------------------------
>
>                 Key: FLINK-5703
>                 URL: https://issues.apache.org/jira/browse/FLINK-5703
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Distributed Coordination, JobManager
>            Reporter: zhijiang
>            Assignee: zhijiang
>
> The ExecutionGraph structure would be recovered from TaskManager reports during the reconciling period. The necessary information includes:
> - Execution: ExecutionAttemptID, AttemptNumber, StartTimestamp, ExecutionState, SimpleSlot, PartialInputChannelDeploymentDescriptor (Consumer Execution)
> - ExecutionVertex: Map
> - ExecutionGraph: ConcurrentHashMap
> An Execution in the {{RECONCILING}} ExecutionState should transition into one of the existing task states ({{RUNNING}}, {{CANCELED}}, {{FAILED}}, {{FINISHED}}). To do so, the TaskManager should retain the terminal task state ({{CANCELED}}, {{FAILED}}, {{FINISHED}}) for a while; we will try to realize this mechanism in another JIRA. In addition, the state transition would trigger different actions, and some actions rely on the necessary information listed above. Given this constraint, the recovery process will be divided into two steps:
> - First, recover all the necessary information except ExecutionState.
> - Second, transition ExecutionState into the real task state and trigger actions. The behavior is the same as the current {{UpdateTaskExecutionState}}.
> To keep the logic simple and consistent, during the recovery period all other RPC messages ({{UpdateTaskExecutionState}}, {{ScheduleOrUpdateConsumers}}, etc.) from the TaskManager should be refused temporarily and answered with a special message by the JobMaster. The TaskManager should then retry sending these messages until the JobManager finishes recovery and acknowledges them.
> A job in the {{RECONCILING}} JobStatus would transition into one of the states ({{RUNNING}}, {{FAILING}}, {{FINISHED}}) after recovery:
> - {{RECONCILING}} to {{RUNNING}}: All the TaskManagers report within the allowed duration and all the tasks are in the {{RUNNING}} state.
> - {{RECONCILING}} to {{FAILING}}: One of the TaskManagers does not report in time, or one of the tasks is in the {{FAILED}} or {{CANCELED}} state.
> - {{RECONCILING}} to {{FINISHED}}: All the TaskManagers report within the allowed duration and all the tasks are in the {{FINISHED}} state.
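
The three JobStatus transitions listed in the issue description can be sketched as a small decision function. This is only an illustration of the rules as written, not Flink code: the class, the enums, and `resolve` are hypothetical names. The description does not say what happens for an empty task list or for a mix of {{RUNNING}} and {{FINISHED}} tasks, so the sketch returns an empty `Optional` in those cases rather than guessing.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Optional;

class ReconcilingJobStatusSketch {

    // Task states an Execution can reach from RECONCILING, per the description.
    enum TaskState { RUNNING, CANCELED, FAILED, FINISHED }

    // Job states reachable from RECONCILING, per the description.
    enum JobStatus { RUNNING, FAILING, FINISHED }

    /**
     * Hypothetical helper: decides the JobStatus that follows RECONCILING.
     * Returns Optional.empty() for combinations the description does not cover.
     */
    static Optional<JobStatus> resolve(boolean allTaskManagersReported, List<TaskState> taskStates) {
        boolean anyFailedOrCanceled = taskStates.stream()
                .anyMatch(s -> s == TaskState.FAILED || s == TaskState.CANCELED);

        // A missing report or any FAILED/CANCELED task leads to FAILING.
        if (!allTaskManagersReported || anyFailedOrCanceled) {
            return Optional.of(JobStatus.FAILING);
        }
        // No tasks at all: not covered by the description.
        if (taskStates.isEmpty()) {
            return Optional.empty();
        }
        if (taskStates.stream().allMatch(s -> s == TaskState.RUNNING)) {
            return Optional.of(JobStatus.RUNNING);
        }
        if (taskStates.stream().allMatch(s -> s == TaskState.FINISHED)) {
            return Optional.of(JobStatus.FINISHED);
        }
        // Mixed RUNNING/FINISHED: not covered by the description.
        return Optional.empty();
    }

    public static void main(String[] args) {
        System.out.println(resolve(true, Arrays.asList(TaskState.RUNNING, TaskState.RUNNING)));
        System.out.println(resolve(false, Arrays.asList(TaskState.RUNNING, TaskState.RUNNING)));
        System.out.println(resolve(true, Arrays.asList(TaskState.RUNNING, TaskState.FAILED)));
        System.out.println(resolve(true, Arrays.asList(TaskState.FINISHED, TaskState.FINISHED)));
    }
}
```

The failure check runs first because a late TaskManager or a failed task decides the outcome regardless of what the remaining tasks report.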