airflow-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From criccom...@apache.org
Subject incubator-airflow git commit: [AIRFLOW-1474] Add dag_id regex feature for `airflow clear` command
Date Wed, 02 Aug 2017 19:53:46 GMT
Repository: incubator-airflow
Updated Branches:
  refs/heads/master 836f2899c -> 18f8498a7


[AIRFLOW-1474] Add dag_id regex feature for `airflow clear` command

Closes #2486 from jgao54/airflow-clear


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/18f8498a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/18f8498a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/18f8498a

Branch: refs/heads/master
Commit: 18f8498a74cfd7c65d7b4b1c7e868e738ecfa240
Parents: 836f289
Author: Joy Gao <joyg@wepay.com>
Authored: Wed Aug 2 12:53:35 2017 -0700
Committer: Chris Riccomini <criccomini@apache.org>
Committed: Wed Aug 2 12:53:35 2017 -0700

----------------------------------------------------------------------
 airflow/bin/cli.py | 38 ++++++++++++++++++++++-------
 airflow/models.py  | 65 ++++++++++++++++++++++++++++++++++++++++++++-----
 tests/core.py      | 10 ++++++++
 tests/models.py    | 63 +++++++++++++++++++++++++++++++++++++++++++++++
 4 files changed, 161 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/18f8498a/airflow/bin/cli.py
----------------------------------------------------------------------
diff --git a/airflow/bin/cli.py b/airflow/bin/cli.py
index a8543d3..dc49bb7 100755
--- a/airflow/bin/cli.py
+++ b/airflow/bin/cli.py
@@ -40,6 +40,7 @@ import threading
 import traceback
 import time
 import psutil
+import re
 
 import airflow
 from airflow import api
@@ -49,7 +50,8 @@ from airflow.exceptions import AirflowException
 from airflow.executors import GetDefaultExecutor
 from airflow.models import (DagModel, DagBag, TaskInstance,
                             DagPickle, DagRun, Variable, DagStat,
-                            Connection)
+                            Connection, DAG)
+
 from airflow.ti_deps.dep_context import (DepContext, SCHEDULER_DEPS)
 from airflow.utils import db as db_utils
 from airflow.utils import logging as logging_utils
@@ -127,6 +129,19 @@ def get_dag(args):
     return dagbag.dags[args.dag_id]
 
 
+def get_dags(args):
+    if not args.dag_regex:
+        return [get_dag(args)]
+    dagbag = DagBag(process_subdir(args.subdir))
+    matched_dags = [dag for dag in dagbag.dags.values() if re.search(
+        args.dag_id, dag.dag_id)]
+    if not matched_dags:
+        raise AirflowException(
+            'dag_id could not be found with regex: {}. Either the dag did not exist '
+            'or it failed to parse.'.format(args.dag_id))
+    return matched_dags
+
+
 def backfill(args, dag=None):
     logging.basicConfig(
         level=settings.LOGGING_LEVEL,
@@ -599,15 +614,17 @@ def clear(args):
     logging.basicConfig(
         level=settings.LOGGING_LEVEL,
         format=settings.SIMPLE_LOG_FORMAT)
-    dag = get_dag(args)
+    dags = get_dags(args)
 
     if args.task_regex:
-        dag = dag.sub_dag(
-            task_regex=args.task_regex,
-            include_downstream=args.downstream,
-            include_upstream=args.upstream,
-        )
-    dag.clear(
+        for idx, dag in enumerate(dags):
+            dags[idx] = dag.sub_dag(
+                task_regex=args.task_regex,
+                include_downstream=args.downstream,
+                include_upstream=args.upstream)
+
+    DAG.clear_dags(
+        dags,
         start_date=args.start_date,
         end_date=args.end_date,
         only_failed=args.only_failed,
@@ -1237,6 +1254,9 @@ class CLIFactory(object):
         'exclude_subdags': Arg(
             ("-x", "--exclude_subdags"),
             "Exclude subdags", "store_true"),
+        'dag_regex': Arg(
+            ("-dx", "--dag_regex"),
+            "Search dag_id as regex instead of exact string", "store_true"),
         # trigger_dag
         'run_id': Arg(("-r", "--run_id"), "Helps to identify this run"),
         'conf': Arg(
@@ -1482,7 +1502,7 @@ class CLIFactory(object):
             'args': (
                 'dag_id', 'task_regex', 'start_date', 'end_date', 'subdir',
                 'upstream', 'downstream', 'no_confirm', 'only_failed',
-                'only_running', 'exclude_subdags'),
+                'only_running', 'exclude_subdags', 'dag_regex'),
         }, {
             'func': pause,
             'help': "Pause a DAG",

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/18f8498a/airflow/models.py
----------------------------------------------------------------------
diff --git a/airflow/models.py b/airflow/models.py
index cc54f36..6e423dc 100755
--- a/airflow/models.py
+++ b/airflow/models.py
@@ -3249,11 +3249,10 @@ class DAG(BaseDag, LoggingMixin):
             # Crafting the right filter for dag_id and task_ids combo
             conditions = []
             for dag in self.subdags + [self]:
-                if dag.task_ids:
-                    conditions.append(
-                        TI.dag_id.like(dag.dag_id) &
-                        TI.task_id.in_(dag.task_ids)
-                    )
+                conditions.append(
+                    TI.dag_id.like(dag.dag_id) &
+                    TI.task_id.in_(dag.task_ids)
+                )
             tis = tis.filter(or_(*conditions))
         else:
             tis = session.query(TI).filter(TI.dag_id == self.dag_id)
@@ -3276,7 +3275,6 @@ class DAG(BaseDag, LoggingMixin):
         count = tis.count()
         do_it = True
         if count == 0:
-            print("Nothing to clear.")
             return 0
         if confirm_prompt:
             ti_list = "\n".join([str(t) for t in tis])
@@ -3298,6 +3296,61 @@ class DAG(BaseDag, LoggingMixin):
         session.close()
         return count
 
+    @classmethod
+    def clear_dags(
+            cls, dags,
+            start_date=None,
+            end_date=None,
+            only_failed=False,
+            only_running=False,
+            confirm_prompt=False,
+            include_subdags=True,
+            reset_dag_runs=True,
+            dry_run=False):
+        all_tis = []
+        for dag in dags:
+            tis = dag.clear(
+                start_date=start_date,
+                end_date=end_date,
+                only_failed=only_failed,
+                only_running=only_running,
+                confirm_prompt=False,
+                include_subdags=include_subdags,
+                reset_dag_runs=reset_dag_runs,
+                dry_run=True)
+            all_tis.extend(tis)
+
+        if dry_run:
+            return all_tis
+
+        count = len(all_tis)
+        do_it = True
+        if count == 0:
+            print("Nothing to clear.")
+            return 0
+        if confirm_prompt:
+            ti_list = "\n".join([str(t) for t in all_tis])
+            question = (
+                "You are about to delete these {} tasks:\n"
+                "{}\n\n"
+                "Are you sure? (yes/no): ").format(count, ti_list)
+            do_it = utils.helpers.ask_yesno(question)
+
+        if do_it:
+            for dag in dags:
+                dag.clear(start_date=start_date,
+                          end_date=end_date,
+                          only_failed=only_failed,
+                          only_running=only_running,
+                          confirm_prompt=False,
+                          include_subdags=include_subdags,
+                          reset_dag_runs=reset_dag_runs,
+                          dry_run=False)
+        else:
+            count = 0
+            print("Bail. Nothing was cleared.")
+        return count
+
     def __deepcopy__(self, memo):
         # Swiwtcharoo to go around deepcopying objects coming through the
         # backdoor

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/18f8498a/tests/core.py
----------------------------------------------------------------------
diff --git a/tests/core.py b/tests/core.py
index 923e0c3..e1a4664 100644
--- a/tests/core.py
+++ b/tests/core.py
@@ -1354,6 +1354,16 @@ class CliTests(unittest.TestCase):
             'clear', 'example_subdag_operator', '--no_confirm', '--exclude_subdags'])
         cli.clear(args)
 
+    def test_get_dags(self):
+        dags = cli.get_dags(self.parser.parse_args(['clear', 'example_subdag_operator', '-c']))
+        self.assertEqual(len(dags), 1)
+
+        dags = cli.get_dags(self.parser.parse_args(['clear', 'subdag', '-dx', '-c']))
+        self.assertGreater(len(dags), 1)
+
+        with self.assertRaises(AirflowException):
+            cli.get_dags(self.parser.parse_args(['clear', 'foobar', '-dx', '-c']))
+
     def test_backfill(self):
         cli.backfill(self.parser.parse_args([
             'backfill', 'example_bash_operator',

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/18f8498a/tests/models.py
----------------------------------------------------------------------
diff --git a/tests/models.py b/tests/models.py
index cf2734b..d1621ad 100644
--- a/tests/models.py
+++ b/tests/models.py
@@ -1264,6 +1264,69 @@ class ClearTasksTest(unittest.TestCase):
         self.assertEqual(ti0.try_number, 1)
         self.assertEqual(ti0.max_tries, 1)
 
+    def test_dags_clear(self):
+        # setup
+        session = settings.Session()
+        dags, tis = [], []
+        num_of_dags = 5
+        for i in range(num_of_dags):
+            dag = DAG('test_dag_clear_' + str(i), start_date=DEFAULT_DATE,
+                      end_date=DEFAULT_DATE + datetime.timedelta(days=10))
+            ti = TI(task=DummyOperator(task_id='test_task_clear_' + str(i), owner='test', dag=dag),
+                    execution_date=DEFAULT_DATE)
+            dags.append(dag)
+            tis.append(ti)
+
+        # test clear all dags
+        for i in range(num_of_dags):
+            tis[i].run()
+            self.assertEqual(tis[i].state, State.SUCCESS)
+            self.assertEqual(tis[i].try_number, 1)
+            self.assertEqual(tis[i].max_tries, 0)
+
+        DAG.clear_dags(dags)
+
+        for i in range(num_of_dags):
+            tis[i].refresh_from_db()
+            self.assertEqual(tis[i].state, State.NONE)
+            self.assertEqual(tis[i].try_number, 1)
+            self.assertEqual(tis[i].max_tries, 1)
+
+        # test dry_run
+        for i in range(num_of_dags):
+            tis[i].run()
+            self.assertEqual(tis[i].state, State.SUCCESS)
+            self.assertEqual(tis[i].try_number, 2)
+            self.assertEqual(tis[i].max_tries, 1)
+
+        DAG.clear_dags(dags, dry_run=True)
+
+        for i in range(num_of_dags):
+            tis[i].refresh_from_db()
+            self.assertEqual(tis[i].state, State.SUCCESS)
+            self.assertEqual(tis[i].try_number, 2)
+            self.assertEqual(tis[i].max_tries, 1)
+
+        # test only_failed
+        from random import randint
+        failed_dag_idx = randint(0, len(tis) - 1)
+        tis[failed_dag_idx].state = State.FAILED
+        session.merge(tis[failed_dag_idx])
+        session.commit()
+
+        DAG.clear_dags(dags, only_failed=True)
+
+        for i in range(num_of_dags):
+            tis[i].refresh_from_db()
+            if i != failed_dag_idx:
+                self.assertEqual(tis[i].state, State.SUCCESS)
+                self.assertEqual(tis[i].try_number, 2)
+                self.assertEqual(tis[i].max_tries, 1)
+            else:
+                self.assertEqual(tis[i].state, State.NONE)
+                self.assertEqual(tis[i].try_number, 2)
+                self.assertEqual(tis[i].max_tries, 2)
+
     def test_operator_clear(self):
         dag = DAG('test_operator_clear', start_date=DEFAULT_DATE,
                   end_date=DEFAULT_DATE + datetime.timedelta(days=10))


Mime
View raw message