airflow-commits mailing list archives

From "Uri Shamay (JIRA)" <j...@apache.org>
Subject [jira] [Updated] (AIRFLOW-928) Same {task,execution_date} run multiple times in worker when using CeleryExecutor
Date Thu, 02 Mar 2017 10:24:45 GMT

     [ https://issues.apache.org/jira/browse/AIRFLOW-928?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Uri Shamay updated AIRFLOW-928:
-------------------------------
    Description: 
Hi,

When using Airflow with the CeleryExecutor (I tested both RabbitMQ and Redis as brokers), I see
that while the workers are down, the scheduler keeps **appending** the same {task,execution_date}
message to the broker on every scheduling pass. This means that if the workers are down or cannot
connect to the broker for a few hours, I end up with thousands of copies of the same executions
in the broker.
In my scenario I have just one dummy DAG to run, with dag_concurrency of 4. I expected the broker
to hold just 4 messages in that scenario, and the scheduler not to queue the same
{task, execution_date} again and again and again (a minimal sketch of this behaviour follows below).
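
A minimal sketch, assuming a plain Celery app (this is not Airflow's scheduler code; the task name,
broker URL, and dates are hypothetical): it shows how re-enqueueing the same {task,execution_date}
key on every pass grows the broker queue while no worker is consuming, and the kind of guard I
expected to be in place.

{code:python}
# Illustrative sketch only -- not Airflow source. Names and broker URL are hypothetical.
from celery import Celery

app = Celery("sketch", broker="amqp://guest@localhost//")  # assumed broker URL

@app.task(name="run_task_instance")
def run_task_instance(task_id, execution_date):
    pass  # worker-side stub; real workers run the Airflow task here

already_queued = set()  # the kind of bookkeeping I expected the scheduler to do

def heartbeat(task_id, execution_date, deduplicate=False):
    key = (task_id, execution_date)
    if deduplicate and key in already_queued:
        return  # expected behaviour: at most one message per {task,execution_date}
    already_queued.add(key)
    run_task_instance.apply_async(args=[task_id, execution_date])

# Observed behaviour: with deduplicate=False, every scheduler pass leaves one more
# copy of the same {task,execution_date} in the queue while the workers are down.
for _ in range(10):
    heartbeat("dummy_task", "2017-03-02T00:00:00", deduplicate=False)
{code}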

What actually happens is that when the workers start consuming messages, they receive thousands
of messages for just 4 tasks, and when they try to write the task_instances rows to the database
they hit integrity errors because rows for those {task,execution_date} already exist.
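
A minimal sketch of that integrity error, assuming a simplified stand-in for the task_instance
table (in Airflow 1.7.x it is keyed by task_id, dag_id, and execution_date); the model, engine URL,
and values below are illustrative only, not Airflow's own code:

{code:python}
from datetime import datetime

from sqlalchemy import Column, DateTime, String, create_engine
from sqlalchemy.exc import IntegrityError
from sqlalchemy.orm import Session, declarative_base

Base = declarative_base()

class TaskInstance(Base):
    """Simplified stand-in for Airflow's task_instance table."""
    __tablename__ = "task_instance"
    task_id = Column(String(250), primary_key=True)
    dag_id = Column(String(250), primary_key=True)
    execution_date = Column(DateTime, primary_key=True)

engine = create_engine("sqlite://")  # in-memory DB, just for the demo
Base.metadata.create_all(engine)

def worker_inserts_row():
    # Each worker that picks up a copy of the same message tries the same insert.
    with Session(engine) as session:
        session.add(TaskInstance(task_id="dummy_task", dag_id="dummy_dag",
                                 execution_date=datetime(2017, 3, 2)))
        session.commit()

worker_inserts_row()        # first worker: the row is created
try:
    worker_inserts_row()    # second worker, duplicate message: primary-key violation
except IntegrityError:
    pass                    # this is the error the worker log is full of
{code}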

Attached files:

1. airflow.log - the task log; you can see several processes for the same {task,execution_date}
writing to the same log file.

2. worker.log - the worker log; you can see the worker trying to run the same {task,execution_date}
multiple times, plus the database integrity errors saying that those tasks already exist for those
dates.

3. scheduler.log - shows that the scheduler keeps deciding to send the same {job,execution_date}
again and again, indefinitely.

4. dummy_dag.py - the DAG used for the test (a hypothetical stand-in is sketched after this list).

5. rabbitmq.queue - shows that after 5 minutes the broker queue contains 40 messages for the same
4 {job,execution_date} (a queue-depth check is also sketched after this list).

6. dag_runs.png - shows that there are only 4 jobs that need to run, while there are many more
messages in the queue.

Thanks.

  was:
Hi,

When using Airflow with the CeleryExecutor (I tested both RabbitMQ and Redis as brokers), I see
that while the workers are down, the scheduler keeps **appending** the same {task,execution_date}
message to the broker on every scheduling pass. This means that if the workers are down or cannot
connect to the broker for a few hours, I end up with thousands of copies of the same executions
in the broker.
In my scenario I have just one dummy DAG to run, with dag_concurrency of 4. I expected the broker
to hold just 4 messages in that scenario, and the scheduler not to queue the same
{task, execution_date} again and again and again.

What actually happens is that when the workers start consuming messages, they receive thousands
of messages for just 4 tasks, and when they try to write the task_instances rows to the database
they hit integrity errors because rows for those {task,execution_date} already exist.

Attached 2 files:

1. airflow.log - the task log; you can see several processes for the same {task,execution_date}
writing to the same log file.

2. worker.log - the worker log; you can see the worker trying to run the same {task,execution_date}
multiple times, plus the database integrity errors saying that those tasks already exist for those
dates.

Thanks.


> Same {task,execution_date} run multiple times in worker when using CeleryExecutor
> ---------------------------------------------------------------------------------
>
>                 Key: AIRFLOW-928
>                 URL: https://issues.apache.org/jira/browse/AIRFLOW-928
>             Project: Apache Airflow
>          Issue Type: Bug
>          Components: celery
>    Affects Versions: Airflow 1.7.1.3
>         Environment: Docker
>            Reporter: Uri Shamay
>         Attachments: airflow.log, dag_runs.png, dummy_dag.py, rabbitmq.queue, scheduler.log, worker.log
>
>



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
