airflow-commits mailing list archives

From bo...@apache.org
Subject incubator-airflow git commit: [AIRFLOW-2027] Only trigger sleep in scheduler after all files have parsed
Date Mon, 09 Apr 2018 08:22:19 GMT
Repository: incubator-airflow
Updated Branches:
  refs/heads/master 3ece6f6dc -> 3c4f1fd9e


[AIRFLOW-2027] Only trigger sleep in scheduler after all files have parsed

Closes #2986 from aoen/ddavydov--open_source_disable_unecessary_sleep_in_scheduler_loop


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/3c4f1fd9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/3c4f1fd9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/3c4f1fd9

Branch: refs/heads/master
Commit: 3c4f1fd9e668a71531ec07d1e7bd60523cb31d32
Parents: 3ece6f6
Author: Dan Davydov <dan.davydov@airbnb.com>
Authored: Mon Apr 9 10:22:11 2018 +0200
Committer: Bolke de Bruin <bolke@xs4all.nl>
Committed: Mon Apr 9 10:22:11 2018 +0200

----------------------------------------------------------------------
 UPDATING.md                                  |   3 +
 airflow/config_templates/default_airflow.cfg |   3 +
 airflow/jobs.py                              |  30 +++---
 airflow/utils/dag_processing.py              |  25 ++++-
 tests/core.py                                |   5 +-
 tests/jobs.py                                | 112 ++++++++++------------
 tests/utils/test_dag_processing.py           |  22 +++--
 7 files changed, 115 insertions(+), 85 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/3c4f1fd9/UPDATING.md
----------------------------------------------------------------------
diff --git a/UPDATING.md b/UPDATING.md
index 7d4ef0b..0abccff 100644
--- a/UPDATING.md
+++ b/UPDATING.md
@@ -224,6 +224,9 @@ indefinitely. This is only available on the command line.
 #### min_file_process_interval
 After how much time should an updated DAG be picked up from the filesystem.
 
+#### min_file_parsing_loop_time
+How many seconds to wait between file-parsing loops to prevent the logs from being spammed.
+
 #### dag_dir_list_interval
 How often the scheduler should relist the contents of the DAG directory. If you experience that while developing your
 dags are not being picked up, have a look at this number and decrease it when necessary.
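
For reference, a minimal [scheduler] block exercising the options described above. The values are illustrative only, not recommendations; per the config template change below, min_file_parsing_loop_time defaults to 1 as of this commit:

    [scheduler]
    # parse an updated file as soon as it changes
    min_file_process_interval = 0
    # but never start a full parsing loop more than once per second
    min_file_parsing_loop_time = 1
    # relist the DAG directory every 5 minutes
    dag_dir_list_interval = 300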

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/3c4f1fd9/airflow/config_templates/default_airflow.cfg
----------------------------------------------------------------------
diff --git a/airflow/config_templates/default_airflow.cfg b/airflow/config_templates/default_airflow.cfg
index eb2981f..e8da82f 100644
--- a/airflow/config_templates/default_airflow.cfg
+++ b/airflow/config_templates/default_airflow.cfg
@@ -379,6 +379,9 @@ run_duration = -1
 # after how much time a new DAGs should be picked up from the filesystem
 min_file_process_interval = 0
 
+# How many seconds to wait between file-parsing loops to prevent the logs from being spammed.
+min_file_parsing_loop_time = 1
+
 dag_dir_list_interval = 300
 
 # How often should stats be printed to the logs

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/3c4f1fd9/airflow/jobs.py
----------------------------------------------------------------------
diff --git a/airflow/jobs.py b/airflow/jobs.py
index 6241717..bcff868 100644
--- a/airflow/jobs.py
+++ b/airflow/jobs.py
@@ -510,7 +510,8 @@ class SchedulerJob(BaseJob):
             num_runs=-1,
             file_process_interval=conf.getint('scheduler',
                                               'min_file_process_interval'),
-            processor_poll_interval=1.0,
+            min_file_parsing_loop_time=conf.getint('scheduler',
+                                                   'min_file_parsing_loop_time'),
             run_duration=None,
             do_pickle=False,
             log=None,
@@ -525,8 +526,6 @@ class SchedulerJob(BaseJob):
         :type subdir: unicode
         :param num_runs: The number of times to try to schedule each DAG file.
         -1 for unlimited within the run_duration.
-        :param processor_poll_interval: The number of seconds to wait between
-        polls of running processors
         :param run_duration: how long to run (in seconds) before exiting
         :type run_duration: int
         :param do_pickle: once a DAG object is obtained by executing the Python
@@ -543,7 +542,6 @@ class SchedulerJob(BaseJob):
 
         self.num_runs = num_runs
         self.run_duration = run_duration
-        self._processor_poll_interval = processor_poll_interval
 
         self.do_pickle = do_pickle
         super(SchedulerJob, self).__init__(*args, **kwargs)
@@ -572,6 +570,10 @@ class SchedulerJob(BaseJob):
         # to 3 minutes.
         self.file_process_interval = file_process_interval
 
+        # Wait until at least this many seconds have passed before parsing files once all
+        # files have finished parsing.
+        self.min_file_parsing_loop_time = min_file_parsing_loop_time
+
         self.max_tis_per_query = conf.getint('scheduler', 'max_tis_per_query')
         if run_duration is None:
             self.run_duration = conf.getint('scheduler',
@@ -1528,11 +1530,16 @@ class SchedulerJob(BaseJob):
         # DAGs in parallel. By processing them in separate processes,
         # we can get parallelism and isolation from potentially harmful
         # user code.
-        self.log.info("Processing files using up to %s processes at a time", self.max_threads)
+        self.log.info("Processing files using up to %s processes at a time",
+                      self.max_threads)
         self.log.info("Running execute loop for %s seconds", self.run_duration)
         self.log.info("Processing each file at most %s times", self.num_runs)
-        self.log.info("Process each file at most once every %s seconds", self.file_process_interval)
-        self.log.info("Checking for new files in %s every %s seconds", self.subdir, self.dag_dir_list_interval)
+        self.log.info("Process each file at most once every %s seconds",
+                      self.file_process_interval)
+        self.log.info("Wait until at least %s seconds have passed between file parsing "
+                      "loops", self.min_file_parsing_loop_time)
+        self.log.info("Checking for new files in %s every %s seconds",
+                      self.subdir, self.dag_dir_list_interval)
 
         # Build up a list of Python files that could contain DAGs
         self.log.info("Searching for files in %s", self.subdir)
@@ -1548,6 +1555,7 @@ class SchedulerJob(BaseJob):
                                                     known_file_paths,
                                                     self.max_threads,
                                                     self.file_process_interval,
+                                                    self.min_file_parsing_loop_time,
                                                     self.num_runs,
                                                     processor_factory)
 
@@ -1691,13 +1699,13 @@ class SchedulerJob(BaseJob):
                 last_stat_print_time = timezone.utcnow()
 
             loop_end_time = time.time()
-            self.log.debug("Ran scheduling loop in %.2f seconds", loop_end_time - loop_start_time)
-            self.log.debug("Sleeping for %.2f seconds", self._processor_poll_interval)
-            time.sleep(self._processor_poll_interval)
+            self.log.debug("Ran scheduling loop in %.2f seconds",
+                           loop_end_time - loop_start_time)
 
             # Exit early for a test mode
             if processor_manager.max_runs_reached():
-                self.log.info("Exiting loop as all files have been processed %s times", self.num_runs)
+                self.log.info("Exiting loop as all files have been processed %s times",
+                              self.num_runs)
                 break
 
         # Stop any processors
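
A minimal sketch of what this means for callers, assuming only what the diff shows: the processor_poll_interval keyword is removed from the SchedulerJob constructor, so passing it would now raise a TypeError, and loop pacing is instead read from [scheduler] min_file_parsing_loop_time:

    from airflow.jobs import SchedulerJob

    # min_file_parsing_loop_time is picked up from airflow.cfg by default;
    # there is no longer a processor_poll_interval argument to override.
    job = SchedulerJob(num_runs=1)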

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/3c4f1fd9/airflow/utils/dag_processing.py
----------------------------------------------------------------------
diff --git a/airflow/utils/dag_processing.py b/airflow/utils/dag_processing.py
index b4abd34..ed2bafc 100644
--- a/airflow/utils/dag_processing.py
+++ b/airflow/utils/dag_processing.py
@@ -304,6 +304,7 @@ class DagFileProcessorManager(LoggingMixin):
                  file_paths,
                  parallelism,
                  process_file_interval,
+                 min_file_parsing_loop_time,
                  max_runs,
                  processor_factory):
         """
@@ -317,6 +318,9 @@ class DagFileProcessorManager(LoggingMixin):
         :param process_file_interval: process a file at most once every this
         many seconds
         :type process_file_interval: float
+        :param min_file_parsing_loop_time: wait until at least this many seconds have
+        passed before parsing files once all files have finished parsing.
+        :type min_file_parsing_loop_time: float
         :param max_runs: The number of times to parse and schedule each file. -1
         for unlimited.
         :type max_runs: int
@@ -332,6 +336,7 @@ class DagFileProcessorManager(LoggingMixin):
         self._dag_directory = dag_directory
         self._max_runs = max_runs
         self._process_file_interval = process_file_interval
+        self._min_file_parsing_loop_time = min_file_parsing_loop_time
         self._processor_factory = processor_factory
         # Map from file path to the processor
         self._processors = {}
@@ -502,12 +507,24 @@ class DagFileProcessorManager(LoggingMixin):
             file_paths_in_progress = self._processors.keys()
             now = timezone.utcnow()
             file_paths_recently_processed = []
+
+            longest_parse_duration = 0
             for file_path in self._file_paths:
                 last_finish_time = self.get_last_finish_time(file_path)
-                if (last_finish_time is not None and
-                    (now - last_finish_time).total_seconds() <
-                        self._process_file_interval):
-                    file_paths_recently_processed.append(file_path)
+                if last_finish_time is not None:
+                    duration = now - last_finish_time
+                    longest_parse_duration = max(duration.total_seconds(),
+                                                 longest_parse_duration)
+                    if duration.total_seconds() < self._process_file_interval:
+                        file_paths_recently_processed.append(file_path)
+
+            sleep_length = max(self._min_file_parsing_loop_time - longest_parse_duration,
+                               0)
+            if sleep_length > 0:
+                self.log.debug("Sleeping for %.2f seconds to prevent excessive "
+                               "logging",
+                               sleep_length)
+                time.sleep(sleep_length)
 
             files_paths_at_run_limit = [file_path
                                         for file_path, num_runs in self._run_count.items()
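
A standalone sketch of the pacing rule above (seconds_to_sleep is a hypothetical helper, not part of the commit). Note that despite its name, longest_parse_duration in the diff measures how long ago each file last finished parsing; the manager then sleeps only for whatever remains of the configured floor:

    from datetime import datetime, timedelta

    def seconds_to_sleep(min_loop_time, last_finish_times, now):
        # Largest "age" among files that have finished parsing at least once.
        longest_age = 0.0
        for finish_time in last_finish_times:
            if finish_time is not None:
                longest_age = max((now - finish_time).total_seconds(), longest_age)
        # Sleep only the remainder of the floor, clamped at zero.
        return max(min_loop_time - longest_age, 0)

    # With a 1s floor and the stalest file finishing 0.25s ago -> ~0.75s.
    now = datetime.utcnow()
    print(seconds_to_sleep(1.0, [now - timedelta(seconds=0.25)], now))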

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/3c4f1fd9/tests/core.py
----------------------------------------------------------------------
diff --git a/tests/core.py b/tests/core.py
index 7edb714..b154bc0 100644
--- a/tests/core.py
+++ b/tests/core.py
@@ -108,10 +108,7 @@ class OperatorSubclass(BaseOperator):
 
 
 class CoreTest(unittest.TestCase):
-    # These defaults make the test faster to run
-    default_scheduler_args = {"file_process_interval": 0,
-                              "processor_poll_interval": 0.5,
-                              "num_runs": 1}
+    default_scheduler_args = {"num_runs": 1}
 
     def setUp(self):
         configuration.load_test_config()

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/3c4f1fd9/tests/jobs.py
----------------------------------------------------------------------
diff --git a/tests/jobs.py b/tests/jobs.py
index 1e411e2..7c278b7 100644
--- a/tests/jobs.py
+++ b/tests/jobs.py
@@ -867,11 +867,11 @@ class LocalTaskJobTest(unittest.TestCase):
         session = settings.Session()
 
         dag.clear()
-        dr = dag.create_dagrun(run_id="test",
-                               state=State.RUNNING,
-                               execution_date=DEFAULT_DATE,
-                               start_date=DEFAULT_DATE,
-                               session=session)
+        dag.create_dagrun(run_id="test",
+                          state=State.RUNNING,
+                          execution_date=DEFAULT_DATE,
+                          start_date=DEFAULT_DATE,
+                          session=session)
         ti = TI(task=task, execution_date=DEFAULT_DATE)
         ti.refresh_from_db()
         job1 = LocalTaskJob(task_instance=ti, ignore_ti_state=True)
@@ -931,9 +931,6 @@ class LocalTaskJobTest(unittest.TestCase):
 
 
 class SchedulerJobTest(unittest.TestCase):
-    # These defaults make the test faster to run
-    default_scheduler_args = {"file_process_interval": 0,
-                              "processor_poll_interval": 0.5}
 
     def setUp(self):
         self.dagbag = DagBag()
@@ -976,7 +973,7 @@ class SchedulerJobTest(unittest.TestCase):
         dagbag1 = self._make_simple_dag_bag([dag])
         dagbag2 = self._make_simple_dag_bag([dag2])
 
-        scheduler = SchedulerJob(**self.default_scheduler_args)
+        scheduler = SchedulerJob()
         session = settings.Session()
 
         ti1 = TI(task1, DEFAULT_DATE)
@@ -1016,7 +1013,7 @@ class SchedulerJobTest(unittest.TestCase):
         task1 = DummyOperator(dag=dag, task_id=task_id_1)
         dagbag = self._make_simple_dag_bag([dag])
 
-        scheduler = SchedulerJob(**self.default_scheduler_args)
+        scheduler = SchedulerJob()
         session = settings.Session()
 
         dr1 = scheduler.create_dag_run(dag)
@@ -1046,10 +1043,10 @@ class SchedulerJobTest(unittest.TestCase):
         task1 = DummyOperator(dag=dag, task_id=task_id_1)
         dagbag = self._make_simple_dag_bag([dag])
 
-        scheduler = SchedulerJob(**self.default_scheduler_args)
+        scheduler = SchedulerJob()
         session = settings.Session()
 
-        dr1 = scheduler.create_dag_run(dag)
+        scheduler.create_dag_run(dag)
         ti1 = TI(task1, DEFAULT_DATE)
         ti1.state = State.SCHEDULED
         ti1.execution_date = ti1.execution_date + datetime.timedelta(days=1)
@@ -1071,7 +1068,7 @@ class SchedulerJobTest(unittest.TestCase):
         task1 = DummyOperator(dag=dag, task_id=task_id_1)
         dagbag = self._make_simple_dag_bag([dag])
 
-        scheduler = SchedulerJob(**self.default_scheduler_args)
+        scheduler = SchedulerJob()
         session = settings.Session()
 
         dr1 = scheduler.create_dag_run(dag)
@@ -1096,7 +1093,7 @@ class SchedulerJobTest(unittest.TestCase):
         task1 = DummyOperator(dag=dag, task_id=task_id_1)
         dagbag = self._make_simple_dag_bag([dag])
 
-        scheduler = SchedulerJob(**self.default_scheduler_args)
+        scheduler = SchedulerJob()
         session = settings.Session()
 
         dr1 = scheduler.create_dag_run(dag)
@@ -1136,7 +1133,7 @@ class SchedulerJobTest(unittest.TestCase):
         task2 = DummyOperator(dag=dag, task_id=task_id_2, pool='b')
         dagbag = self._make_simple_dag_bag([dag])
 
-        scheduler = SchedulerJob(**self.default_scheduler_args)
+        scheduler = SchedulerJob()
         session = settings.Session()
 
         dr1 = scheduler.create_dag_run(dag)
@@ -1177,7 +1174,7 @@ class SchedulerJobTest(unittest.TestCase):
         task = DummyOperator(dag=dag, task_id=task_id, pool="this_pool_doesnt_exist")
         dagbag = self._make_simple_dag_bag([dag])
 
-        scheduler = SchedulerJob(**self.default_scheduler_args)
+        scheduler = SchedulerJob()
         session = settings.Session()
 
         dr = scheduler.create_dag_run(dag)
@@ -1198,13 +1195,13 @@ class SchedulerJobTest(unittest.TestCase):
         dag_id = 'SchedulerJobTest.test_find_executable_task_instances_none'
         task_id_1 = 'dummy'
         dag = DAG(dag_id=dag_id, start_date=DEFAULT_DATE, concurrency=16)
-        task1 = DummyOperator(dag=dag, task_id=task_id_1)
+        DummyOperator(dag=dag, task_id=task_id_1)
         dagbag = self._make_simple_dag_bag([dag])
 
-        scheduler = SchedulerJob(**self.default_scheduler_args)
+        scheduler = SchedulerJob()
         session = settings.Session()
 
-        dr1 = scheduler.create_dag_run(dag)
+        scheduler.create_dag_run(dag)
         session.commit()
 
         self.assertEqual(0, len(scheduler._find_executable_task_instances(
@@ -1219,7 +1216,7 @@ class SchedulerJobTest(unittest.TestCase):
         task1 = DummyOperator(dag=dag, task_id=task_id_1)
         dagbag = self._make_simple_dag_bag([dag])
 
-        scheduler = SchedulerJob(**self.default_scheduler_args)
+        scheduler = SchedulerJob()
         session = settings.Session()
 
         dr1 = scheduler.create_dag_run(dag)
@@ -1267,7 +1264,7 @@ class SchedulerJobTest(unittest.TestCase):
         task2 = DummyOperator(dag=dag, task_id=task_id_2)
         dagbag = self._make_simple_dag_bag([dag])
 
-        scheduler = SchedulerJob(**self.default_scheduler_args)
+        scheduler = SchedulerJob()
         session = settings.Session()
 
         dr1 = scheduler.create_dag_run(dag)
@@ -1351,9 +1348,10 @@ class SchedulerJobTest(unittest.TestCase):
         self.assertEqual(1, len(res))
 
     def test_change_state_for_executable_task_instances_no_tis(self):
-        scheduler = SchedulerJob(**self.default_scheduler_args)
+        scheduler = SchedulerJob()
         session = settings.Session()
-        res = scheduler._change_state_for_executable_task_instances([], [State.NONE], session)
+        res = scheduler._change_state_for_executable_task_instances(
+            [], [State.NONE], session)
         self.assertEqual(0, len(res))
 
     def test_change_state_for_executable_task_instances_no_tis_with_state(self):
@@ -1361,9 +1359,9 @@ class SchedulerJobTest(unittest.TestCase):
         task_id_1 = 'dummy'
         dag = DAG(dag_id=dag_id, start_date=DEFAULT_DATE, concurrency=2)
         task1 = DummyOperator(dag=dag, task_id=task_id_1)
-        dagbag = self._make_simple_dag_bag([dag])
+        self._make_simple_dag_bag([dag])
 
-        scheduler = SchedulerJob(**self.default_scheduler_args)
+        scheduler = SchedulerJob()
         session = settings.Session()
 
         dr1 = scheduler.create_dag_run(dag)
@@ -1393,9 +1391,9 @@ class SchedulerJobTest(unittest.TestCase):
         task_id_1 = 'dummy'
         dag = DAG(dag_id=dag_id, start_date=DEFAULT_DATE, concurrency=2)
         task1 = DummyOperator(dag=dag, task_id=task_id_1)
-        dagbag = self._make_simple_dag_bag([dag])
+        self._make_simple_dag_bag([dag])
 
-        scheduler = SchedulerJob(**self.default_scheduler_args)
+        scheduler = SchedulerJob()
         session = settings.Session()
 
         dr1 = scheduler.create_dag_run(dag)
@@ -1431,7 +1429,7 @@ class SchedulerJobTest(unittest.TestCase):
         task1 = DummyOperator(dag=dag, task_id=task_id_1)
         dagbag = self._make_simple_dag_bag([dag])
 
-        scheduler = SchedulerJob(**self.default_scheduler_args)
+        scheduler = SchedulerJob()
         session = settings.Session()
 
         dr1 = scheduler.create_dag_run(dag)
@@ -1452,7 +1450,7 @@ class SchedulerJobTest(unittest.TestCase):
         task1 = DummyOperator(dag=dag, task_id=task_id_1)
         dagbag = SimpleDagBag([])
 
-        scheduler = SchedulerJob(**self.default_scheduler_args)
+        scheduler = SchedulerJob()
         session = settings.Session()
 
         dr1 = scheduler.create_dag_run(dag)
@@ -1476,7 +1474,7 @@ class SchedulerJobTest(unittest.TestCase):
         task2 = DummyOperator(dag=dag, task_id=task_id_2)
         dagbag = self._make_simple_dag_bag([dag])
 
-        scheduler = SchedulerJob(**self.default_scheduler_args)
+        scheduler = SchedulerJob()
         session = settings.Session()
 
         # create first dag run with 1 running and 1 queued
@@ -1537,7 +1535,7 @@ class SchedulerJobTest(unittest.TestCase):
         task2 = DummyOperator(dag=dag, task_id=task_id_2)
         dagbag = self._make_simple_dag_bag([dag])
 
-        scheduler = SchedulerJob(**self.default_scheduler_args)
+        scheduler = SchedulerJob()
         scheduler.max_tis_per_query = 3
         session = settings.Session()
 
@@ -1713,7 +1711,7 @@ class SchedulerJobTest(unittest.TestCase):
         if run_kwargs is None:
             run_kwargs = {}
 
-        scheduler = SchedulerJob(**self.default_scheduler_args)
+        scheduler = SchedulerJob()
         dag = self.dagbag.get_dag(dag_id)
         dag.clear()
         dr = scheduler.create_dag_run(dag)
@@ -1783,7 +1781,7 @@ class SchedulerJobTest(unittest.TestCase):
         DagRuns with one unfinished and one failed root task -> RUNNING
         """
         # Run both the failed and successful tasks
-        scheduler = SchedulerJob(**self.default_scheduler_args)
+        scheduler = SchedulerJob()
         dag_id = 'test_dagrun_states_root_fail_unfinished'
         dag = self.dagbag.get_dag(dag_id)
         dag.clear()
@@ -1847,8 +1845,7 @@ class SchedulerJobTest(unittest.TestCase):
         self.assertTrue(dag.start_date > DEFAULT_DATE)
 
         scheduler = SchedulerJob(dag_id,
-                                 num_runs=2,
-                                 **self.default_scheduler_args)
+                                 num_runs=2)
         scheduler.run()
 
         # zero tasks ran
@@ -1872,8 +1869,7 @@ class SchedulerJobTest(unittest.TestCase):
             len(session.query(TI).filter(TI.dag_id == dag_id).all()), 1)
 
         scheduler = SchedulerJob(dag_id,
-                                 num_runs=2,
-                                 **self.default_scheduler_args)
+                                 num_runs=2)
         scheduler.run()
 
         # still one task
@@ -1891,8 +1887,6 @@ class SchedulerJobTest(unittest.TestCase):
             dag.clear()
 
         scheduler = SchedulerJob(dag_ids=dag_ids,
-                                 file_process_interval=0,
-                                 processor_poll_interval=0.5,
                                  num_runs=2)
         scheduler.run()
 
@@ -2410,8 +2404,7 @@ class SchedulerJobTest(unittest.TestCase):
 
         # Now call manage_slas and see if the sla_miss callback gets called
         scheduler = SchedulerJob(dag_id='test_sla_miss',
-                                 num_runs=1,
-                                 **self.default_scheduler_args)
+                                 num_runs=1)
         scheduler.manage_slas(dag=dag, session=session)
 
         sla_callback.assert_not_called()
@@ -2614,8 +2607,7 @@ class SchedulerJobTest(unittest.TestCase):
         expected_run_duration = 5
         start_time = timezone.utcnow()
         scheduler = SchedulerJob(dag_id,
-                                 run_duration=expected_run_duration,
-                                 **self.default_scheduler_args)
+                                 run_duration=expected_run_duration)
         scheduler.run()
         end_time = timezone.utcnow()
 
@@ -2644,9 +2636,8 @@ class SchedulerJobTest(unittest.TestCase):
             dag.clear()
 
         scheduler = SchedulerJob(dag_ids=dag_ids,
-                                 subdir= dag_directory,
-                                 num_runs=1,
-                                 **self.default_scheduler_args)
+                                 subdir=dag_directory,
+                                 num_runs=1)
         scheduler.run()
         session = settings.Session()
         self.assertEqual(
@@ -2988,17 +2979,18 @@ class SchedulerJobTest(unittest.TestCase):
 
     def test_reset_orphaned_tasks_nothing(self):
         """Try with nothing. """
-        scheduler = SchedulerJob(**self.default_scheduler_args)
+        scheduler = SchedulerJob()
         session = settings.Session()
-        self.assertEqual(0, len(scheduler.reset_state_for_orphaned_tasks(session=session)))
+        self.assertEqual(
+            0, len(scheduler.reset_state_for_orphaned_tasks(session=session)))
 
     def test_reset_orphaned_tasks_external_triggered_dag(self):
         dag_id = 'test_reset_orphaned_tasks_external_triggered_dag'
         dag = DAG(dag_id=dag_id, start_date=DEFAULT_DATE, schedule_interval='@daily')
         task_id = dag_id + '_task'
-        task = DummyOperator(task_id=task_id, dag=dag)
+        DummyOperator(task_id=task_id, dag=dag)
 
-        scheduler = SchedulerJob(**self.default_scheduler_args)
+        scheduler = SchedulerJob()
         session = settings.Session()
 
         dr1 = scheduler.create_dag_run(dag, session=session)
@@ -3017,9 +3009,9 @@ class SchedulerJobTest(unittest.TestCase):
         dag_id = 'test_reset_orphaned_tasks_backfill_dag'
         dag = DAG(dag_id=dag_id, start_date=DEFAULT_DATE, schedule_interval='@daily')
         task_id = dag_id + '_task'
-        task = DummyOperator(task_id=task_id, dag=dag)
+        DummyOperator(task_id=task_id, dag=dag)
 
-        scheduler = SchedulerJob(**self.default_scheduler_args)
+        scheduler = SchedulerJob()
         session = settings.Session()
 
         dr1 = scheduler.create_dag_run(dag, session=session)
@@ -3039,9 +3031,9 @@ class SchedulerJobTest(unittest.TestCase):
         dag_id = 'test_reset_orphaned_tasks_specified_dagrun'
         dag = DAG(dag_id=dag_id, start_date=DEFAULT_DATE, schedule_interval='@daily')
         task_id = dag_id + '_task'
-        task = DummyOperator(task_id=task_id, dag=dag)
+        DummyOperator(task_id=task_id, dag=dag)
 
-        scheduler = SchedulerJob(**self.default_scheduler_args)
+        scheduler = SchedulerJob()
         session = settings.Session()
         # make two dagruns, only reset for one
         dr1 = scheduler.create_dag_run(dag)
@@ -3073,7 +3065,7 @@ class SchedulerJobTest(unittest.TestCase):
         task_id = dag_id + '_task'
         task = DummyOperator(task_id=task_id, dag=dag)
 
-        scheduler = SchedulerJob(**self.default_scheduler_args)
+        scheduler = SchedulerJob()
         session = settings.Session()
 
         ti = models.TaskInstance(task=task, execution_date=DEFAULT_DATE)
@@ -3091,9 +3083,9 @@ class SchedulerJobTest(unittest.TestCase):
         dag_id = 'test_reset_orphaned_tasks_no_orphans'
         dag = DAG(dag_id=dag_id, start_date=DEFAULT_DATE, schedule_interval='@daily')
         task_id = dag_id + '_task'
-        task = DummyOperator(task_id=task_id, dag=dag)
+        DummyOperator(task_id=task_id, dag=dag)
 
-        scheduler = SchedulerJob(**self.default_scheduler_args)
+        scheduler = SchedulerJob()
         session = settings.Session()
 
         dr1 = scheduler.create_dag_run(dag)
@@ -3113,9 +3105,9 @@ class SchedulerJobTest(unittest.TestCase):
         dag_id = 'test_reset_orphaned_tasks_non_running_dagruns'
         dag = DAG(dag_id=dag_id, start_date=DEFAULT_DATE, schedule_interval='@daily')
         task_id = dag_id + '_task'
-        task = DummyOperator(task_id=task_id, dag=dag)
+        DummyOperator(task_id=task_id, dag=dag)
 
-        scheduler = SchedulerJob(**self.default_scheduler_args)
+        scheduler = SchedulerJob()
         session = settings.Session()
 
         dr1 = scheduler.create_dag_run(dag)
@@ -3144,7 +3136,7 @@ class SchedulerJobTest(unittest.TestCase):
             task = DummyOperator(task_id=task_id, dag=dag)
             tasks.append(task)
 
-        scheduler = SchedulerJob(**self.default_scheduler_args)
+        scheduler = SchedulerJob()
 
         session = settings.Session()
 

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/3c4f1fd9/tests/utils/test_dag_processing.py
----------------------------------------------------------------------
diff --git a/tests/utils/test_dag_processing.py b/tests/utils/test_dag_processing.py
index 2b60cd0..f784f04 100644
--- a/tests/utils/test_dag_processing.py
+++ b/tests/utils/test_dag_processing.py
@@ -21,9 +21,14 @@ from airflow.utils.dag_processing import DagFileProcessorManager
 
 class TestDagFileProcessorManager(unittest.TestCase):
     def test_set_file_paths_when_processor_file_path_not_in_new_file_paths(self):
-        manager = DagFileProcessorManager(dag_directory='directory', file_paths=['abc.txt'],
-                                          parallelism=1, process_file_interval=1,
-                                          max_runs=1, processor_factory=MagicMock().return_value)
+        manager = DagFileProcessorManager(
+            dag_directory='directory',
+            file_paths=['abc.txt'],
+            parallelism=1,
+            process_file_interval=1,
+            max_runs=1,
+            min_file_parsing_loop_time=0,
+            processor_factory=MagicMock().return_value)
 
         mock_processor = MagicMock()
         mock_processor.stop.side_effect = AttributeError(
@@ -36,9 +41,14 @@ class TestDagFileProcessorManager(unittest.TestCase):
         self.assertDictEqual(manager._processors, {})
 
     def test_set_file_paths_when_processor_file_path_is_in_new_file_paths(self):
-        manager = DagFileProcessorManager(dag_directory='directory', file_paths=['abc.txt'],
-                                          parallelism=1, process_file_interval=1,
-                                          max_runs=1, processor_factory=MagicMock().return_value)
+        manager = DagFileProcessorManager(
+            dag_directory='directory',
+            file_paths=['abc.txt'],
+            parallelism=1,
+            process_file_interval=1,
+            max_runs=1,
+            min_file_parsing_loop_time=0,
+            processor_factory=MagicMock().return_value)
 
         mock_processor = MagicMock()
         mock_processor.stop.side_effect = AttributeError(

