airflow-commits mailing list archives

From "Wu Xiang (JIRA)" <j...@apache.org>
Subject [jira] [Comment Edited] (AIRFLOW-78) airflow clear leaves dag_runs
Date Fri, 02 Sep 2016 07:15:22 GMT

    [ https://issues.apache.org/jira/browse/AIRFLOW-78?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15457747#comment-15457747 ]

Wu Xiang edited comment on AIRFLOW-78 at 9/2/16 7:15 AM:
---------------------------------------------------------

hi [~sanand],

    This PR prevents Airflow from processing task instances for running DagRuns once max_active_dag_runs is reached.

    This looks like a bug: when max_active_dag_runs is reached, the scheduler should still

    1) run _process_task_instances for running dag runs,
    2) run manage_slas for running dag runs, and
    3) only stop generating new dag runs.

    Relevant line of code:
    https://github.com/apache/incubator-airflow/blob/master/airflow/jobs.py#L1076

    A quick fix:
{code:python}
    def _process_dags(self, dagbag, dags, tis_out):
        """
        Iterates over the dags and processes them. Processing includes:

        1. Create appropriate DagRun(s) in the DB.
        2. Create appropriate TaskInstance(s) in the DB.
        3. Send emails for tasks that have missed SLAs.

        :param dagbag: a collection of DAGs to process
        :type dagbag: models.DagBag
        :param dags: the DAGs from the DagBag to process
        :type dags: DAG
        :param tis_out: a queue to add generated TaskInstance objects to
        :type tis_out: multiprocessing.Queue[TaskInstance]
        :return: None
        """
        for dag in dags:
            dag_id = dag.dag_id
            dag = dagbag.get_dag(dag_id)

            # Check for a missing DAG before dereferencing its attributes.
            if not dag:
                self.logger.error("DAG ID {} was not found in the DagBag"
                                  .format(dag_id))
                continue

            if dag.is_paused:
                self.logger.info("Not processing DAG {} since it's paused"
                                 .format(dag.dag_id))
                continue

            # When the max number of active runs is reached, only skip
            # creating a new DagRun; runs already in flight must still
            # have their task instances and SLAs processed below.
            if dag.reached_max_runs:
                self.logger.info("Max runs for DAG {} has been reached"
                                 .format(dag.dag_id))
            else:
                dag_run = self.create_dag_run(dag)
                if dag_run:
                    self.logger.info("Created {}".format(dag_run))

            self.logger.info("Processing {}".format(dag.dag_id))

            self._process_task_instances(dag, tis_out)
            self.manage_slas(dag)

        models.DagStat.clean_dirty([d.dag_id for d in dags])
{code}
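
    The fix above assumes a reached_max_runs property on DAG, which is my proposal rather than an existing attribute. A minimal sketch of how such a check could look, counting RUNNING DagRuns against max_active_runs (the helper name and session handling here are assumptions, not part of the PR):
{code:python}
# Hypothetical sketch -- reached_max_runs is an assumed helper, not an
# existing DAG attribute. It counts this DAG's RUNNING DagRuns and
# compares the count to the DAG's max_active_runs setting.
from sqlalchemy import func

from airflow import models, settings
from airflow.utils.state import State


def reached_max_runs(dag):
    """Return True if `dag` already has max_active_runs RUNNING DagRuns."""
    session = settings.Session()
    try:
        active_runs = (
            session.query(func.count(models.DagRun.id))
            .filter(models.DagRun.dag_id == dag.dag_id)
            .filter(models.DagRun.state == State.RUNNING)
            .scalar()
        )
        return active_runs >= dag.max_active_runs
    finally:
        session.close()
{code}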



> airflow clear leaves dag_runs
> -----------------------------
>
>                 Key: AIRFLOW-78
>                 URL: https://issues.apache.org/jira/browse/AIRFLOW-78
>             Project: Apache Airflow
>          Issue Type: Wish
>          Components: cli
>    Affects Versions: Airflow 1.6.2
>            Reporter: Adrian Bridgett
>            Assignee: Norman Mu
>            Priority: Minor
>
> (moved from https://github.com/apache/incubator-airflow/issues/829)
> "airflow clear -c -d -s 2016-01-03 dagid"  doesn't clear the dagrun, it sets it to running
instead (apparently since this is often used to re-run jobs).
> However this then breaks max_active_runs=1 (I have to stop the scheduler, then airflow
clear, psql to delete the dagrun, then start the scheduler).
> This problem was probably seen on an Airflow 1.6.x install.
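
(For reference, the "psql to delete the dagrun" step in the report above can also be done through Airflow's ORM session; a hypothetical sketch, with the dag id and start date from the report as placeholders, to be run only while the scheduler is stopped:)
{code:python}
# Hypothetical sketch of the manual cleanup step described in the
# report: delete the leftover DagRuns directly instead of via psql.
from datetime import datetime

from airflow import models, settings

session = settings.Session()
(session.query(models.DagRun)
        .filter(models.DagRun.dag_id == 'dagid')
        .filter(models.DagRun.execution_date >= datetime(2016, 1, 3))
        .delete(synchronize_session=False))
session.commit()
session.close()
{code}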



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
