airflow-commits mailing list archives

From "James Meickle (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (AIRFLOW-1157) Assigning a task to a pool that doesn't exist crashes the scheduler
Date Wed, 07 Feb 2018 19:09:01 GMT

    [ https://issues.apache.org/jira/browse/AIRFLOW-1157?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16355904#comment-16355904 ]

James Meickle commented on AIRFLOW-1157:
----------------------------------------

Just ran into this after somehow accidentally deleting pools. From an Airflow operator's perspective, it would be great if this surfaced on the individual task's details page, indicating "missing pool" as the reason the task is not being scheduled.

> Assigning a task to a pool that doesn't exist crashes the scheduler
> -------------------------------------------------------------------
>
>                 Key: AIRFLOW-1157
>                 URL: https://issues.apache.org/jira/browse/AIRFLOW-1157
>             Project: Apache Airflow
>          Issue Type: Bug
>          Components: scheduler
>    Affects Versions: Airflow 1.8
>            Reporter: John Culver
>            Assignee: David Klosowski
>            Priority: Critical
>
> If a DAG is run that contains a task using a pool that doesn't exist, the scheduler will crash.
> Manually triggering a run of this DAG in an environment without a pool named 'a_non_existent_pool' will crash the scheduler:
> {code}
> from datetime import datetime
> from airflow.models import DAG
> from airflow.operators.dummy_operator import DummyOperator
>
> dag = DAG(dag_id='crash_scheduler',
>           start_date=datetime(2017, 1, 1),
>           schedule_interval=None)
>
> # No pool named 'a_non_existent_pool' has been created in this environment.
> t1 = DummyOperator(task_id='crash',
>                    pool='a_non_existent_pool',
>                    dag=dag)
> {code}
> Here is the relevant log output on the scheduler:
> {noformat}
> [2017-04-27 19:31:24,816] {dag_processing.py:559} INFO - Processor for /opt/airflow/dags/test-3.py finished
> [2017-04-27 19:31:24,817] {dag_processing.py:559} INFO - Processor for /opt/airflow/dags/test_s3_file_move.py finished
> [2017-04-27 19:31:24,819] {dag_processing.py:627} INFO - Started a process (PID: 124) to generate tasks for /opt/airflow/dags/crash_scheduler.py - logging into /tmp/airflow/scheduler/logs/2017-04-27/crash_scheduler.py.log
> [2017-04-27 19:31:24,822] {dag_processing.py:627} INFO - Started a process (PID: 125) to generate tasks for /opt/airflow/dags/configuration/constants.py - logging into /tmp/airflow/scheduler/logs/2017-04-27/configuration/constants.py.log
> [2017-04-27 19:31:24,847] {jobs.py:1007} INFO - Tasks up for execution:
>         <TaskInstance: move_s3_file_test.move_files 2017-04-27 19:31:22.298893 [scheduled]>
> [2017-04-27 19:31:24,849] {jobs.py:1030} INFO - Figuring out tasks to run in Pool(name=None) with 128 open slots and 1 task instances in queue
> [2017-04-27 19:31:24,856] {jobs.py:1078} INFO - DAG move_s3_file_test has 0/16 running tasks
> [2017-04-27 19:31:24,856] {jobs.py:1105} INFO - Sending to executor (u'move_s3_file_test', u'move_files', datetime.datetime(2017, 4, 27, 19, 31, 22, 298893)) with priority 1 and queue MVSANDBOX-airflow-DEV-dev
> [2017-04-27 19:31:24,859] {jobs.py:1116} INFO - Setting state of (u'move_s3_file_test', u'move_files', datetime.datetime(2017, 4, 27, 19, 31, 22, 298893)) to queued
> [2017-04-27 19:31:24,867] {base_executor.py:50} INFO - Adding to queue: airflow run move_s3_file_test move_files 2017-04-27T19:31:22.298893 --local -sd /opt/airflow/dags/test_s3_file_move.py
> [2017-04-27 19:31:24,867] {jobs.py:1440} INFO - Heartbeating the executor
> [2017-04-27 19:31:24,872] {celery_executor.py:78} INFO - [celery] queuing (u'move_s3_file_test', u'move_files', datetime.datetime(2017, 4, 27, 19, 31, 22, 298893)) through celery, queue=MVSANDBOX-airflow-DEV-dev
> [2017-04-27 19:31:25,974] {jobs.py:1404} INFO - Heartbeating the process manager
> [2017-04-27 19:31:25,975] {dag_processing.py:559} INFO - Processor for /opt/airflow/dags/crash_scheduler.py finished
> [2017-04-27 19:31:25,975] {dag_processing.py:559} INFO - Processor for /opt/airflow/dags/configuration/constants.py finished
> [2017-04-27 19:31:25,977] {dag_processing.py:627} INFO - Started a process (PID: 128) to generate tasks for /opt/airflow/dags/example_s3_sensor.py - logging into /tmp/airflow/scheduler/logs/2017-04-27/example_s3_sensor.py.log
> [2017-04-27 19:31:25,980] {dag_processing.py:627} INFO - Started a process (PID: 129) to generate tasks for /opt/airflow/dags/test-4.py - logging into /tmp/airflow/scheduler/logs/2017-04-27/test-4.py.log
> [2017-04-27 19:31:26,004] {jobs.py:1007} INFO - Tasks up for execution:
>         <TaskInstance: crash_scheduler.crash 2017-04-27 19:30:51.948542 [scheduled]>
> [2017-04-27 19:31:26,006] {jobs.py:1311} INFO - Exited execute loop
> [2017-04-27 19:31:26,008] {jobs.py:1325} INFO - Terminating child PID: 128
> [2017-04-27 19:31:26,008] {jobs.py:1325} INFO - Terminating child PID: 129
> [2017-04-27 19:31:26,008] {jobs.py:1329} INFO - Waiting up to 5s for processes to exit...
> Traceback (most recent call last):
>   File "/usr/bin/airflow", line 28, in <module>
>     args.func(args)
>   File "/usr/lib/python2.7/site-packages/airflow/bin/cli.py", line 839, in scheduler
>     job.run()
>   File "/usr/lib/python2.7/site-packages/airflow/jobs.py", line 200, in run
>     self._execute()
>   File "/usr/lib/python2.7/site-packages/airflow/jobs.py", line 1309, in _execute
>     self._execute_helper(processor_manager)
>   File "/usr/lib/python2.7/site-packages/airflow/jobs.py", line 1437, in _execute_helper
>     (State.SCHEDULED,))
>   File "/usr/lib/python2.7/site-packages/airflow/utils/db.py", line 53, in wrapper
>     result = func(*args, **kwargs)
>   File "/usr/lib/python2.7/site-packages/airflow/jobs.py", line 1025, in _execute_task_instances
>     open_slots = pools[pool].open_slots(session=session)
> KeyError: u'a_non_existant_pool'
> {noformat}
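
The traceback ends in an unguarded dict lookup (pools[pool] in _execute_task_instances), so a single task pointing at a missing pool takes the whole scheduling loop down. A guarded lookup roughly like the sketch below would let the scheduler log the problem and treat the pool as having no open slots instead of exiting. This is only an illustration of the idea, not the actual patch, and the warning call is an assumption:

{code}
# Sketch only: guard the lookup that raises the KeyError above.
# `pools` is the {pool_name: Pool} mapping and `pool` the task's pool name,
# as in the 1.8 code shown in the traceback; the logger call is illustrative.
if pool not in pools:
    self.logger.warning(
        "Tasks are assigned to pool %s, which does not exist; skipping them", pool)
    open_slots = 0
else:
    open_slots = pools[pool].open_slots(session=session)
{code}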



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
