airflow-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From bo...@apache.org
Subject incubator-airflow git commit: [AIRFLOW-1729][AIRFLOW-2797][AIRFLOW-2729] Ignore whole directories in .airflowignore
Date Fri, 13 Jul 2018 11:19:01 GMT
Repository: incubator-airflow
Updated Branches:
  refs/heads/master 5290688ee -> 6b2fdbef0


[AIRFLOW-1729][AIRFLOW-2797][AIRFLOW-2729] Ignore whole directories in .airflowignore

We can ignore whole directories by removing them
from the `dirs` array
that `os.walk()` returns. Doing this means that we
do fewer disk ops if
someone has a set of modules in their dag folder
that they want to
ignore.

Also fixes [AIRFLOW-2797] - we weren't honoring
.airflowignore from a
parent dir as of #3717 -- that (expected)
behaviour is now back again.

De-duplicate the walking code as well - we had two
versions that had
gotten out of sync as of #3171. So that doesn't
happen again we now only
have one version.

Closes #3602 from ashb/ignore-whole-dirs-airflowignore


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/6b2fdbef
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/6b2fdbef
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/6b2fdbef

Branch: refs/heads/master
Commit: 6b2fdbef0ab4bd1ed91e6338bcf6440e782b7035
Parents: 5290688
Author: Ash Berlin-Taylor <ash_github@firemirror.com>
Authored: Fri Jul 13 13:18:47 2018 +0200
Committer: Bolke de Bruin <bolke@xs4all.nl>
Committed: Fri Jul 13 13:18:47 2018 +0200

----------------------------------------------------------------------
 airflow/models.py               | 58 +++++++++++++-----------------------
 airflow/utils/dag_processing.py | 29 ++++++++++++++----
 2 files changed, 44 insertions(+), 43 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/6b2fdbef/airflow/models.py
----------------------------------------------------------------------
diff --git a/airflow/models.py b/airflow/models.py
index 089befe..21df933 100755
--- a/airflow/models.py
+++ b/airflow/models.py
@@ -510,7 +510,8 @@ class DagBag(BaseDagBag, LoggingMixin):
         Note that if a .airflowignore file is found while processing,
         the directory, it will behaves much like a .gitignore does,
         ignoring files that match any of the regex patterns specified
-        in the file.
+        in the file. **Note**: The patterns in .airflowignore are treated as
+        un-anchored regexes, not shell-like glob patterns.
         """
         start_dttm = timezone.utcnow()
         dag_folder = dag_folder or self.dag_folder
@@ -519,42 +520,25 @@ class DagBag(BaseDagBag, LoggingMixin):
         stats = []
         FileLoadStat = namedtuple(
             'FileLoadStat', "file duration dag_num task_num dags")
-        if os.path.isfile(dag_folder):
-            self.process_file(dag_folder, only_if_updated=only_if_updated)
-        elif os.path.isdir(dag_folder):
-            for root, dirs, files in os.walk(dag_folder, followlinks=True):
-                patterns = []
-                ignore_file = os.path.join(root, '.airflowignore')
-                if os.path.isfile(ignore_file):
-                    with open(ignore_file, 'r') as f:
-                        patterns += [p for p in f.read().split('\n') if p]
-                for f in files:
-                    try:
-                        filepath = os.path.join(root, f)
-                        if not os.path.isfile(filepath):
-                            continue
-                        mod_name, file_ext = os.path.splitext(
-                            os.path.split(filepath)[-1])
-                        if file_ext != '.py' and not zipfile.is_zipfile(filepath):
-                            continue
-                        if not any(
-                                [re.findall(p, filepath) for p in patterns]):
-                            ts = timezone.utcnow()
-                            found_dags = self.process_file(
-                                filepath, only_if_updated=only_if_updated)
-
-                            td = timezone.utcnow() - ts
-                            td = td.total_seconds() + (
-                                float(td.microseconds) / 1000000)
-                            stats.append(FileLoadStat(
-                                filepath.replace(dag_folder, ''),
-                                td,
-                                len(found_dags),
-                                sum([len(dag.tasks) for dag in found_dags]),
-                                str([dag.dag_id for dag in found_dags]),
-                            ))
-                    except Exception as e:
-                        self.log.exception(e)
+        for filepath in utils.dag_processing.list_py_file_paths(dag_folder):
+            self.log.info(filepath)
+            try:
+                ts = timezone.utcnow()
+                found_dags = self.process_file(
+                    filepath, only_if_updated=only_if_updated)
+
+                td = timezone.utcnow() - ts
+                td = td.total_seconds() + (
+                    float(td.microseconds) / 1000000)
+                stats.append(FileLoadStat(
+                    filepath.replace(dag_folder, ''),
+                    td,
+                    len(found_dags),
+                    sum([len(dag.tasks) for dag in found_dags]),
+                    str([dag.dag_id for dag in found_dags]),
+                ))
+            except Exception as e:
+                self.log.exception(e)
         Stats.gauge(
             'collect_dags', (timezone.utcnow() - start_dttm).total_seconds(), 1)
         Stats.gauge(

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/6b2fdbef/airflow/utils/dag_processing.py
----------------------------------------------------------------------
diff --git a/airflow/utils/dag_processing.py b/airflow/utils/dag_processing.py
index 543eb41..e236397 100644
--- a/airflow/utils/dag_processing.py
+++ b/airflow/utils/dag_processing.py
@@ -177,13 +177,30 @@ def list_py_file_paths(directory, safe_mode=True):
     elif os.path.isfile(directory):
         return [directory]
     elif os.path.isdir(directory):
-        patterns = []
+        patterns_by_dir = {}
         for root, dirs, files in os.walk(directory, followlinks=True):
-            ignore_file = [f for f in files if f == '.airflowignore']
-            if ignore_file:
-                f = open(os.path.join(root, ignore_file[0]), 'r')
-                patterns += [p for p in f.read().split('\n') if p]
-                f.close()
+            patterns = patterns_by_dir.get(root, [])
+            ignore_file = os.path.join(root, '.airflowignore')
+            if os.path.isfile(ignore_file):
+                with open(ignore_file, 'r') as f:
+                    # If we have new patterns create a copy so we don't change
+                    # the previous list (which would affect other subdirs)
+                    patterns = patterns + [p for p in f.read().split('\n') if p]
+
+            # If we can ignore any subdirs entirely we should - fewer paths
+            # to walk is better. We have to modify the ``dirs`` array in
+            # place for this to affect os.walk
+            dirs[:] = [
+                d
+                for d in dirs
+                if not any(re.search(p, os.path.join(root, d)) for p in patterns)
+            ]
+
+            # We want patterns defined in a parent folder's .airflowignore to
+            # apply to subdirs too
+            for d in dirs:
+                patterns_by_dir[os.path.join(root, d)] = patterns
+
             for f in files:
                 try:
                     file_path = os.path.join(root, f)


Mime
View raw message