airflow-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From criccom...@apache.org
Subject [3/4] incubator-airflow git commit: [AIRFLOW-1004][AIRFLOW-276] Fix `airflow webserver -D` to run in background
Date Mon, 10 Apr 2017 21:28:18 GMT
[AIRFLOW-1004][AIRFLOW-276] Fix `airflow webserver -D` to run in background

AIRFLOW-276 introduced a monitor process for gunicorn
to pick up new files in the DAG folder, but it also
changed `airflow webserver -D` so that it ran in the
foreground. This PR fixes that by running the monitor
as a daemon process.

Closes #2208 from sekikn/AIRFLOW-1004

(cherry picked from commit a9b20a04b052e9479dbb79fd46124293085610e9)


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/0a5fb785
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/0a5fb785
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/0a5fb785

Branch: refs/heads/v1-8-test
Commit: 0a5fb7856b545073516210fcfc369d2072823ae9
Parents: c94b3a0
Author: Kengo Seki <sekikn@apache.org>
Authored: Tue Apr 4 08:32:44 2017 +0200
Committer: Chris Riccomini <criccomini@apache.org>
Committed: Mon Apr 10 14:24:31 2017 -0700

----------------------------------------------------------------------
 airflow/bin/cli.py | 64 ++++++++++++++++++++++++++++++++++++++++---------
 tests/core.py      | 54 +++++++++++++++++++++++++++++++++++++++++
 2 files changed, 107 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/0a5fb785/airflow/bin/cli.py
----------------------------------------------------------------------
diff --git a/airflow/bin/cli.py b/airflow/bin/cli.py
index e9c54e6..e4755c7 100755
--- a/airflow/bin/cli.py
+++ b/airflow/bin/cli.py
@@ -753,7 +753,12 @@ def webserver(args):
         app.run(debug=True, port=args.port, host=args.hostname,
                 ssl_context=(ssl_cert, ssl_key) if ssl_cert and ssl_key else None)
     else:
-        pid, stdout, stderr, log_file = setup_locations("webserver", pid=args.pid)
+        pid, stdout, stderr, log_file = setup_locations("webserver", args.pid, args.stdout, args.stderr, args.log_file)
+        if args.daemon:
+            handle = setup_logging(log_file)
+            stdout = open(stdout, 'w+')
+            stderr = open(stderr, 'w+')
+
         print(
             textwrap.dedent('''\
                 Running the Gunicorn Server with:
@@ -771,7 +776,6 @@ def webserver(args):
             '-t', str(worker_timeout),
             '-b', args.hostname + ':' + str(args.port),
             '-n', 'airflow-webserver',
-            '-p', str(pid),
             '-c', 'airflow.www.gunicorn_config'
         ]
 
@@ -782,28 +786,66 @@ def webserver(args):
             run_args += ['--error-logfile', str(args.error_logfile)]
 
         if args.daemon:
-            run_args += ["-D"]
+            run_args += ['-D', '-p', str(pid)]
+
         if ssl_cert:
             run_args += ['--certfile', ssl_cert, '--keyfile', ssl_key]
 
         run_args += ["airflow.www.app:cached_app()"]
 
-        gunicorn_master_proc = subprocess.Popen(run_args)
+        gunicorn_master_proc = None
 
         def kill_proc(dummy_signum, dummy_frame):
             gunicorn_master_proc.terminate()
             gunicorn_master_proc.wait()
             sys.exit(0)
 
-        signal.signal(signal.SIGINT, kill_proc)
-        signal.signal(signal.SIGTERM, kill_proc)
+        def monitor_gunicorn(gunicorn_master_proc):
+            # These run forever until SIG{INT, TERM, KILL, ...} signal is sent
+            if conf.getint('webserver', 'worker_refresh_interval') > 0:
+                restart_workers(gunicorn_master_proc, num_workers)
+            else:
+                while True:
+                    time.sleep(1)
 
-        # These run forever until SIG{INT, TERM, KILL, ...} signal is sent
-        if conf.getint('webserver', 'worker_refresh_interval') > 0:
-            restart_workers(gunicorn_master_proc, num_workers)
+        if args.daemon:
+            base, ext = os.path.splitext(pid)
+            ctx = daemon.DaemonContext(
+                pidfile=TimeoutPIDLockFile(base + "-monitor" + ext, -1),
+                files_preserve=[handle],
+                stdout=stdout,
+                stderr=stderr,
+                signal_map={
+                    signal.SIGINT: kill_proc,
+                    signal.SIGTERM: kill_proc
+                },
+            )
+            with ctx:
+                subprocess.Popen(run_args)
+
+                # Reading pid file directly, since Popen#pid doesn't
+                # seem to return the right value with DaemonContext.
+                while True:
+                    try:
+                        with open(pid) as f:
+                            gunicorn_master_proc_pid = int(f.read())
+                            break
+                    except IOError:
+                        logging.debug("Waiting for gunicorn's pid file to be created.")
+                        time.sleep(0.1)
+
+                gunicorn_master_proc = psutil.Process(gunicorn_master_proc_pid)
+                monitor_gunicorn(gunicorn_master_proc)
+
+            stdout.close()
+            stderr.close()
         else:
-            while True:
-                time.sleep(1)
+            gunicorn_master_proc = subprocess.Popen(run_args)
+
+            signal.signal(signal.SIGINT, kill_proc)
+            signal.signal(signal.SIGTERM, kill_proc)
+
+            monitor_gunicorn(gunicorn_master_proc)
 
 
 def scheduler(args):

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/0a5fb785/tests/core.py
----------------------------------------------------------------------
diff --git a/tests/core.py b/tests/core.py
index c55b1e2..4fd2f08 100644
--- a/tests/core.py
+++ b/tests/core.py
@@ -1398,6 +1398,60 @@ class CliTests(unittest.TestCase):
         os.remove('variables1.json')
         os.remove('variables2.json')
 
+    def test_cli_webserver_foreground(self):
+        import subprocess
+
+        # Confirm that webserver hasn't been launched.
+        # pgrep returns exit status 1 if no process matched.
+        self.assertEqual(1, subprocess.Popen(["pgrep", "-c", "airflow"]).wait())
+        self.assertEqual(1, subprocess.Popen(["pgrep", "-c", "gunicorn"]).wait())
+
+        # Run webserver in foreground and terminate it.
+        p = subprocess.Popen(["airflow", "webserver"])
+        p.terminate()
+        p.wait()
+
+        # Assert that no process remains.
+        self.assertEqual(1, subprocess.Popen(["pgrep", "-c", "airflow"]).wait())
+        self.assertEqual(1, subprocess.Popen(["pgrep", "-c", "gunicorn"]).wait())
+
+    @unittest.skipIf("TRAVIS" in os.environ and bool(os.environ["TRAVIS"]),
+                     "Skipping test due to lack of required file permission")
+    def test_cli_webserver_background(self):
+        import subprocess
+        import psutil
+
+        def wait_pidfile(pidfile):
+            while True:
+                try:
+                    with open(pidfile) as f:
+                        return int(f.read())
+                except IOError:
+                    sleep(1)
+
+        # Confirm that webserver hasn't been launched.
+        self.assertEqual(1, subprocess.Popen(["pgrep", "-c", "airflow"]).wait())
+        self.assertEqual(1, subprocess.Popen(["pgrep", "-c", "gunicorn"]).wait())
+
+        # Run webserver in background.
+        subprocess.Popen(["airflow", "webserver", "-D"])
+        pidfile = cli.setup_locations("webserver")[0]
+        wait_pidfile(pidfile)
+
+        # Assert that gunicorn and its monitor are launched.
+        self.assertEqual(0, subprocess.Popen(["pgrep", "-c", "airflow"]).wait())
+        self.assertEqual(0, subprocess.Popen(["pgrep", "-c", "gunicorn"]).wait())
+
+        # Terminate monitor process.
+        pidfile = cli.setup_locations("webserver-monitor")[0]
+        pid = wait_pidfile(pidfile)
+        p = psutil.Process(pid)
+        p.terminate()
+        p.wait()
+
+        # Assert that no process remains.
+        self.assertEqual(1, subprocess.Popen(["pgrep", "-c", "airflow"]).wait())
+        self.assertEqual(1, subprocess.Popen(["pgrep", "-c", "gunicorn"]).wait())
 
 class WebUiTests(unittest.TestCase):
     def setUp(self):


Mime
View raw message