airflow-commits mailing list archives

From "Holden Karau's magical unicorn (JIRA)" <j...@apache.org>
Subject [jira] [Assigned] (AIRFLOW-3046) ECS Operator mistakenly reports success when task is killed due to EC2 host termination
Date Sun, 23 Sep 2018 07:06:00 GMT

     [ https://issues.apache.org/jira/browse/AIRFLOW-3046?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Holden Karau's magical unicorn reassigned AIRFLOW-3046:
-------------------------------------------------------

    Assignee: Holden Karau's magical unicorn

> ECS Operator mistakenly reports success when task is killed due to EC2 host termination
> ---------------------------------------------------------------------------------------
>
>                 Key: AIRFLOW-3046
>                 URL: https://issues.apache.org/jira/browse/AIRFLOW-3046
>             Project: Apache Airflow
>          Issue Type: Bug
>          Components: contrib, operators
>            Reporter: Dan MacTough
>            Assignee: Holden Karau's magical unicorn
>            Priority: Major
>
> We have ECS clusters made up of EC2 spot fleets. Among other things, this means hosts can be terminated on short notice. When this happens, all tasks (and their associated containers) are terminated as well.
> We expect that when this happens to Airflow task instances using the ECS Operator, those instances will be marked as failed and retried.
> Instead, they are marked as successful.
> As a result, the immediate downstream task fails, causing the scheduled DAG run to fail.
> Here's an example of the Airflow log output when this happens:
> {noformat}
> [2018-09-12 01:02:02,712] {ecs_operator.py:112} INFO - ECS Task stopped, check status:
{'tasks': [{'taskArn': 'arn:aws:ecs:us-east-1:111111111111:task/32d43a1d-fbc7-4659-815d-9133bde11cdc',
'clusterArn': 'arn:aws:ecs:us-east-1:111111111111:cluster/processing', 'taskDefinitionArn':
'arn:aws:ecs:us-east-1:111111111111:task-definition/foobar-testing_dataEngineering_rd:76',
'containerInstanceArn': 'arn:aws:ecs:us-east-1:111111111111:container-instance/7431f0a6-8fc5-4eff-8196-32f77d286a61',
'overrides': {'containerOverrides': [{'name': 'foobar-testing', 'command': ['./bin/generate-features.sh',
'2018-09-11']}]}, 'lastStatus': 'STOPPED', 'desiredStatus': 'STOPPED', 'cpu': '4096', 'memory':
'60000', 'containers': [{'containerArn': 'arn:aws:ecs:us-east-1:111111111111:container/0d5cc553-f894-4f9a-b17c-9f80f7ce8d0a',
'taskArn': 'arn:aws:ecs:us-east-1:111111111111:task/32d43a1d-fbc7-4659-815d-9133bde11cdc',
'name': 'foobar-testing', 'lastStatus': 'RUNNING', 'networkBindings': [], 'networkInterfaces':
[], 'healthStatus': 'UNKNOWN'}], 'startedBy': 'Airflow', 'version': 3, 'stoppedReason': 'Host
EC2 (instance i-02cf23bbd5ae26194) terminated.', 'connectivity': 'CONNECTED', 'connectivityAt':
datetime.datetime(2018, 9, 12, 0, 6, 30, 245000, tzinfo=tzlocal()), 'pullStartedAt': datetime.datetime(2018,
9, 12, 0, 6, 32, 748000, tzinfo=tzlocal()), 'pullStoppedAt': datetime.datetime(2018, 9, 12,
0, 6, 59, 748000, tzinfo=tzlocal()), 'createdAt': datetime.datetime(2018, 9, 12, 0, 6, 30,
245000, tzinfo=tzlocal()), 'startedAt': datetime.datetime(2018, 9, 12, 0, 7, 0, 748000, tzinfo=tzlocal()),
'stoppingAt': datetime.datetime(2018, 9, 12, 1, 2, 0, 91000, tzinfo=tzlocal()), 'stoppedAt':
datetime.datetime(2018, 9, 12, 1, 2, 0, 91000, tzinfo=tzlocal()), 'group': 'family:foobar-testing_dataEngineering_rd',
'launchType': 'EC2', 'attachments': [], 'healthStatus': 'UNKNOWN'}], 'failures': [], 'ResponseMetadata':
{'RequestId': '758c791f-b627-11e8-83f7-2b76f4796ed2', 'HTTPStatusCode': 200, 'HTTPHeaders':
{'server': 'Server', 'date': 'Wed, 12 Sep 2018 01:02:02 GMT', 'content-type': 'application/x-amz-json-1.1',
'content-length': '1412', 'connection': 'keep-alive', 'x-amzn-requestid': '758c791f-b627-11e8-83f7-2b76f4796ed2'},
'RetryAttempts': 0}}{noformat}
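
The key detail in the log above is that the failure is only visible at the task level: `stoppedReason` reads "Host EC2 (instance ...) terminated.", while the container entry still shows `lastStatus: 'RUNNING'` and carries no `exitCode` key at all. A check that looks only at container exit codes therefore has nothing to flag. A minimal sketch of that mismatch, using the field names from the response above:

```python
# Reduced to the fields relevant to success checking, modeled on the
# describe_tasks response in the log above.
task = {
    'lastStatus': 'STOPPED',
    'stoppedReason': 'Host EC2 (instance i-02cf23bbd5ae26194) terminated.',
    'containers': [{'name': 'foobar-testing', 'lastStatus': 'RUNNING'}],
}

# An exit-code-based check finds nothing to flag: the container never
# reached STOPPED and has no 'exitCode' key.
exit_code_failures = [
    c for c in task['containers']
    if c.get('lastStatus') == 'STOPPED' and c.get('exitCode', 0) != 0
]
assert exit_code_failures == []

# The host termination is only visible in the task-level stoppedReason.
assert 'terminated' in task['stoppedReason'].lower()
```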
> I believe the function that checks whether the task succeeded needs at least one more check.
> We are currently running a modified version of the ECS Operator with the following {{_check_success_task}} function to address this failure condition:
> {code}
>     def _check_success_task(self):
>         response = self.client.describe_tasks(
>             cluster=self.cluster,
>             tasks=[self.arn]
>         )
>         self.log.info('ECS Task stopped, check status: %s', response)
>         if len(response.get('failures', [])) > 0:
>             raise AirflowException(response)
>         for task in response['tasks']:
>             # New check: a task stopped because its host instance was
>             # terminated must be treated as a failure, even though its
>             # containers may never report a non-zero exit code.
>             if 'terminated' in task.get('stoppedReason', '').lower():
>                 raise AirflowException(
>                     'The task was stopped because the host instance terminated: {}'.format(
>                         task.get('stoppedReason', '')))
>             containers = task['containers']
>             for container in containers:
>                 if container.get('lastStatus') == 'STOPPED' and \
>                         container['exitCode'] != 0:
>                     raise AirflowException(
>                         'This task is not in success state {}'.format(task))
>                 elif container.get('lastStatus') == 'PENDING':
>                     raise AirflowException(
>                         'This task is still pending {}'.format(task))
>                 elif 'error' in container.get('reason', '').lower():
>                     raise AirflowException(
>                         'This container encountered an error during launch: {}'.format(
>                             container.get('reason', '').lower()))
> {code}
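
As a rough sanity check, the `stoppedReason` guard added in the patch above can be exercised outside Airflow against a stubbed response shaped like the log output. The stub and the isolated `check_success` helper below are illustrative assumptions, not Airflow code:

```python
class AirflowException(Exception):
    """Stand-in for airflow.exceptions.AirflowException."""

def check_success(response):
    """The stoppedReason guard from the patched _check_success_task, isolated."""
    for task in response['tasks']:
        if 'terminated' in task.get('stoppedReason', '').lower():
            raise AirflowException(
                'The task was stopped because the host instance terminated: {}'.format(
                    task.get('stoppedReason', '')))

# Stub modeled on the describe_tasks output in the log: the task was
# stopped by host termination, but its container is still reported as
# RUNNING and carries no exitCode.
response = {
    'failures': [],
    'tasks': [{
        'lastStatus': 'STOPPED',
        'stoppedReason': 'Host EC2 (instance i-02cf23bbd5ae26194) terminated.',
        'containers': [{'lastStatus': 'RUNNING'}],
    }],
}

try:
    check_success(response)
    raised = False
except AirflowException:
    raised = True
assert raised  # host termination is now treated as a failure
```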



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
