From: bolke@apache.org
To: commits@airflow.incubator.apache.org
Date: Mon, 13 Mar 2017 04:45:17 -0000
Message-Id: <47554b52f9244bf0bccbf28ebf49dce5@git.apache.org>
Subject: [19/45] incubator-airflow git commit: [AIRFLOW-794] Access DAGS_FOLDER and SQL_ALCHEMY_CONN exclusively from settings

[AIRFLOW-794] Access DAGS_FOLDER and SQL_ALCHEMY_CONN exclusively from settings

Closes #2013 from gsakkis/settings

Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/25920242
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/25920242
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/25920242

Branch: refs/heads/v1-8-stable
Commit: 2592024230a25820d368ecc3bd43fbf7b52e46d9
Parents: 5405f5f
Author: George Sakkis
Authored: Thu Feb 2 14:45:48 2017 +0100
Committer: Bolke de Bruin
Committed: Sun Mar 12 07:57:04 2017 -0700
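The effect of the commit is easiest to see from the module everything now converges on. airflow/settings.py itself is not part of this diff, so the following is only a minimal sketch of the two constants the changed call sites assume it exposes, reconstructed from the expressions the commit deletes below (in airflow/__init__.py and airflow/models.py):

import os

from airflow import configuration as conf

# Hypothetical sketch of the relevant part of airflow/settings.py; the module
# is not shown in this commit. Both expressions mirror the removed lines.

# Expanded once at import time; every consumer now shares this one value.
DAGS_FOLDER = os.path.expanduser(conf.get('core', 'DAGS_FOLDER'))

# Single source of truth for the database connection string.
SQL_ALCHEMY_CONN = conf.get('core', 'SQL_ALCHEMY_CONN')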
----------------------------------------------------------------------
 airflow/__init__.py                  | 8 +++-----
 airflow/bin/cli.py                   | 9 ++-------
 airflow/configuration.py             | 6 ------
 airflow/jobs.py                      | 2 +-
 airflow/migrations/env.py            | 6 ++----
 airflow/models.py                    | 8 +++-----
 airflow/operators/dagrun_operator.py | 3 +--
 airflow/utils/db.py                  | 4 +---
 airflow/www/utils.py                 | 4 +---
 airflow/www/views.py                 | 2 +-
 tests/core.py                        | 3 +--
 tests/jobs.py                        | 9 ++++-----
 12 files changed, 20 insertions(+), 44 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/25920242/airflow/__init__.py
----------------------------------------------------------------------
diff --git a/airflow/__init__.py b/airflow/__init__.py
index 1e40fe9..3daa6e2 100644
--- a/airflow/__init__.py
+++ b/airflow/__init__.py
@@ -24,19 +24,17 @@ from airflow import version
 __version__ = version.version

 import logging
-import os
 import sys

 from airflow import configuration as conf
-
+from airflow import settings
 from airflow.models import DAG
 from flask_admin import BaseView
 from importlib import import_module
 from airflow.exceptions import AirflowException

-DAGS_FOLDER = os.path.expanduser(conf.get('core', 'DAGS_FOLDER'))
-if DAGS_FOLDER not in sys.path:
-    sys.path.append(DAGS_FOLDER)
+if settings.DAGS_FOLDER not in sys.path:
+    sys.path.append(settings.DAGS_FOLDER)

 login = None


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/25920242/airflow/bin/cli.py
----------------------------------------------------------------------
diff --git a/airflow/bin/cli.py b/airflow/bin/cli.py
index fbd86db..61d8707 100755
--- a/airflow/bin/cli.py
+++ b/airflow/bin/cli.py
@@ -58,10 +58,8 @@ from airflow.www.app import cached_app
 from sqlalchemy import func
 from sqlalchemy.orm import exc

-DAGS_FOLDER = os.path.expanduser(conf.get('core', 'DAGS_FOLDER'))
 api.load_auth()
-
 api_module = import_module(conf.get('cli', 'api_client'))
 api_client = api_module.Client(api_base_url=conf.get('cli', 'endpoint_url'),
                                auth=api.api_auth.client_auth)
@@ -114,11 +112,8 @@ def setup_locations(process, pid=None, stdout=None, stderr=None, log=None):


 def process_subdir(subdir):
-    dags_folder = conf.get("core", "DAGS_FOLDER")
-    dags_folder = os.path.expanduser(dags_folder)
     if subdir:
-        if "DAGS_FOLDER" in subdir:
-            subdir = subdir.replace("DAGS_FOLDER", dags_folder)
+        subdir = subdir.replace('DAGS_FOLDER', settings.DAGS_FOLDER)
         subdir = os.path.abspath(os.path.expanduser(subdir))
     return subdir
@@ -1128,7 +1123,7 @@ class CLIFactory(object):
         'subdir': Arg(
             ("-sd", "--subdir"),
             "File location or directory from which to look for the dag",
-            default=DAGS_FOLDER),
+            default=settings.DAGS_FOLDER),
         'start_date': Arg(
             ("-s", "--start_date"), "Override start_date YYYY-MM-DD",
             type=parsedate),
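The process_subdir() hunk above collapses the placeholder handling into a single unconditional str.replace, which is safe because replace is a no-op when the substring is absent. A standalone replica of the new logic, with a hard-coded stand-in for settings.DAGS_FOLDER:

import os

DAGS_FOLDER = '/home/airflow/dags'  # stand-in for settings.DAGS_FOLDER


def process_subdir(subdir):
    if subdir:
        # Expand the literal DAGS_FOLDER placeholder, then normalize the path.
        subdir = subdir.replace('DAGS_FOLDER', DAGS_FOLDER)
        subdir = os.path.abspath(os.path.expanduser(subdir))
    return subdir


print(process_subdir('DAGS_FOLDER/abc'))  # /home/airflow/dags/abc
print(process_subdir(None))               # None: falsy input passes through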
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/25920242/airflow/configuration.py
----------------------------------------------------------------------
diff --git a/airflow/configuration.py b/airflow/configuration.py
index 404808b..6752bdb 100644
--- a/airflow/configuration.py
+++ b/airflow/configuration.py
@@ -828,9 +828,3 @@ as_dict.__doc__ = conf.as_dict.__doc__

 def set(section, option, value):  # noqa
     return conf.set(section, option, value)
-
-########################
-# convenience method to access config entries
-
-def get_dags_folder():
-    return os.path.expanduser(get('core', 'DAGS_FOLDER'))


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/25920242/airflow/jobs.py
----------------------------------------------------------------------
diff --git a/airflow/jobs.py b/airflow/jobs.py
index 3ca0070..fedad55 100644
--- a/airflow/jobs.py
+++ b/airflow/jobs.py
@@ -463,7 +463,7 @@ class SchedulerJob(BaseJob):
             self,
             dag_id=None,
             dag_ids=None,
-            subdir=models.DAGS_FOLDER,
+            subdir=settings.DAGS_FOLDER,
             num_runs=-1,
             file_process_interval=conf.getint('scheduler',
                                               'min_file_process_interval'),


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/25920242/airflow/migrations/env.py
----------------------------------------------------------------------
diff --git a/airflow/migrations/env.py b/airflow/migrations/env.py
index a107d6c..8d5e55e 100644
--- a/airflow/migrations/env.py
+++ b/airflow/migrations/env.py
@@ -17,7 +17,6 @@ from alembic import context
 from logging.config import fileConfig

 from airflow import settings
-from airflow import configuration
 from airflow.jobs import models

 # this is the Alembic Config object, which provides
@@ -54,10 +53,9 @@ def run_migrations_offline():
     script output.

     """
-    url = configuration.get('core', 'SQL_ALCHEMY_CONN')
     context.configure(
-        url=url, target_metadata=target_metadata, literal_binds=True,
-        compare_type=COMPARE_TYPE)
+        url=settings.SQL_ALCHEMY_CONN, target_metadata=target_metadata,
+        literal_binds=True, compare_type=COMPARE_TYPE)

     with context.begin_transaction():
         context.run_migrations()


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/25920242/airflow/models.py
----------------------------------------------------------------------
diff --git a/airflow/models.py b/airflow/models.py
index d6ab5b8..1829ff3 100755
--- a/airflow/models.py
+++ b/airflow/models.py
@@ -81,8 +81,6 @@ from airflow.utils.trigger_rule import TriggerRule

 Base = declarative_base()
 ID_LEN = 250
-SQL_ALCHEMY_CONN = configuration.get('core', 'SQL_ALCHEMY_CONN')
-DAGS_FOLDER = os.path.expanduser(configuration.get('core', 'DAGS_FOLDER'))
 XCOM_RETURN_KEY = 'return_value'

 Stats = settings.Stats
@@ -95,7 +93,7 @@ try:
 except:
     pass

-if 'mysql' in SQL_ALCHEMY_CONN:
+if 'mysql' in settings.SQL_ALCHEMY_CONN:
     LongText = LONGTEXT
 else:
     LongText = Text
@@ -165,7 +163,7 @@ class DagBag(BaseDagBag, LoggingMixin):
             executor=DEFAULT_EXECUTOR,
             include_examples=configuration.getboolean('core',
                                                       'LOAD_EXAMPLES')):
-        dag_folder = dag_folder or DAGS_FOLDER
+        dag_folder = dag_folder or settings.DAGS_FOLDER
         self.logger.info("Filling up the DagBag from {}".format(dag_folder))
         self.dag_folder = dag_folder
         self.dags = {}
@@ -2858,7 +2856,7 @@ class DAG(BaseDag, LoggingMixin):
         """
         File location of where the dag object is instantiated
         """
-        fn = self.full_filepath.replace(DAGS_FOLDER + '/', '')
+        fn = self.full_filepath.replace(settings.DAGS_FOLDER + '/', '')
         fn = fn.replace(os.path.dirname(__file__) + '/', '')
         return fn
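The final models.py hunk keeps DAG.filepath relative to the DAGs folder. A minimal standalone replica of that string manipulation, using hypothetical paths and a stand-in constant:

import os

DAGS_FOLDER = '/home/airflow/dags'  # stand-in for settings.DAGS_FOLDER


def relative_filepath(full_filepath):
    # Strip the DAGS_FOLDER prefix, then (as in models.py) this module's own
    # directory, so the UI can display a short path relative to the folder.
    fn = full_filepath.replace(DAGS_FOLDER + '/', '')
    fn = fn.replace(os.path.dirname(__file__) + '/', '')
    return fn


print(relative_filepath('/home/airflow/dags/etl/daily.py'))  # etl/daily.py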
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/25920242/airflow/operators/dagrun_operator.py
----------------------------------------------------------------------
diff --git a/airflow/operators/dagrun_operator.py b/airflow/operators/dagrun_operator.py
index 239ebb4..c3ffa1a 100644
--- a/airflow/operators/dagrun_operator.py
+++ b/airflow/operators/dagrun_operator.py
@@ -14,7 +14,6 @@

 from datetime import datetime
 import logging
-import os

 from airflow.models import BaseOperator, DagBag
 from airflow.utils.decorators import apply_defaults
@@ -65,7 +64,7 @@ class TriggerDagRunOperator(BaseOperator):
             dro = self.python_callable(context, dro)
         if dro:
             session = settings.Session()
-            dbag = DagBag(os.path.expanduser(conf.get('core', 'DAGS_FOLDER')))
+            dbag = DagBag(settings.DAGS_FOLDER)
             trigger_dag = dbag.get_dag(self.trigger_dag_id)
             dr = trigger_dag.create_dagrun(
                 run_id=dro.run_id,


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/25920242/airflow/utils/db.py
----------------------------------------------------------------------
diff --git a/airflow/utils/db.py b/airflow/utils/db.py
index 9c7b4b3..2502219 100644
--- a/airflow/utils/db.py
+++ b/airflow/utils/db.py
@@ -30,7 +30,6 @@ from sqlalchemy import event, exc
 from sqlalchemy.pool import Pool

 from airflow import settings
-from airflow import configuration


 def provide_session(func):
@@ -287,8 +286,7 @@ def upgradedb():
     directory = os.path.join(package_dir, 'migrations')
     config = Config(os.path.join(package_dir, 'alembic.ini'))
     config.set_main_option('script_location', directory)
-    config.set_main_option('sqlalchemy.url',
-                           configuration.get('core', 'SQL_ALCHEMY_CONN'))
+    config.set_main_option('sqlalchemy.url', settings.SQL_ALCHEMY_CONN)
     command.upgrade(config, 'heads')


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/25920242/airflow/www/utils.py
----------------------------------------------------------------------
diff --git a/airflow/www/utils.py b/airflow/www/utils.py
index 1a1229b..d2218de 100644
--- a/airflow/www/utils.py
+++ b/airflow/www/utils.py
@@ -137,9 +137,7 @@ def notify_owner(f):
         if request.args.get('confirmed') == "true":
             dag_id = request.args.get('dag_id')
             task_id = request.args.get('task_id')
-            dagbag = models.DagBag(
-                os.path.expanduser(configuration.get('core', 'DAGS_FOLDER')))
-
+            dagbag = models.DagBag(settings.DAGS_FOLDER)
             dag = dagbag.get_dag(dag_id)
             task = dag.get_task(task_id)


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/25920242/airflow/www/views.py
----------------------------------------------------------------------
diff --git a/airflow/www/views.py b/airflow/www/views.py
index 9e68079..0391775 100644
--- a/airflow/www/views.py
+++ b/airflow/www/views.py
@@ -77,7 +77,7 @@ from airflow.configuration import AirflowConfigException
 QUERY_LIMIT = 100000
 CHART_LIMIT = 200000

-dagbag = models.DagBag(os.path.expanduser(conf.get('core', 'DAGS_FOLDER')))
+dagbag = models.DagBag(settings.DAGS_FOLDER)

 login_required = airflow.login.login_required
 current_user = airflow.login.current_user


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/25920242/tests/core.py
----------------------------------------------------------------------
diff --git a/tests/core.py b/tests/core.py
index 3e76e81..ee7a738 100644
--- a/tests/core.py
+++ b/tests/core.py
@@ -15,7 +15,6 @@
 from __future__ import print_function

 import doctest
-import json
 import os
 import re
 import unittest
@@ -1315,7 +1314,7 @@ class CliTests(unittest.TestCase):
             '-s', DEFAULT_DATE.isoformat()]))

     def test_process_subdir_path_with_placeholder(self):
-        assert cli.process_subdir('DAGS_FOLDER/abc') == os.path.join(configuration.get_dags_folder(), 'abc')
+        assert cli.process_subdir('DAGS_FOLDER/abc') == os.path.join(settings.DAGS_FOLDER, 'abc')

     def test_trigger_dag(self):
         cli.trigger_dag(self.parser.parse_args([
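With get_dags_folder() removed from configuration.py, the test above asserts directly against settings.DAGS_FOLDER. On the SQL side, upgradedb() in utils/db.py now hands Alembic the same settings.SQL_ALCHEMY_CONN that migrations/env.py reads, so the two migration entry points cannot disagree about the database URL. A minimal sketch of that wiring, assuming a local alembic.ini and migrations/ directory exist and using a stand-in constant:

import os

from alembic import command
from alembic.config import Config

SQL_ALCHEMY_CONN = 'sqlite:////tmp/airflow.db'  # stand-in for settings.SQL_ALCHEMY_CONN

package_dir = os.path.dirname(__file__)  # hypothetical package root
config = Config(os.path.join(package_dir, 'alembic.ini'))
config.set_main_option('script_location', os.path.join(package_dir, 'migrations'))
# Point Alembic at the shared connection string instead of re-reading the
# [core] section of airflow.cfg at this call site.
config.set_main_option('sqlalchemy.url', SQL_ALCHEMY_CONN)
command.upgrade(config, 'heads')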
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/25920242/tests/jobs.py
----------------------------------------------------------------------
diff --git a/tests/jobs.py b/tests/jobs.py
index ee4c8a7..44087e1 100644
--- a/tests/jobs.py
+++ b/tests/jobs.py
@@ -27,7 +27,6 @@ import sys
 from tempfile import mkdtemp

 from airflow import AirflowException, settings
-from airflow import models
 from airflow.bin import cli
 from airflow.jobs import BackfillJob, SchedulerJob
 from airflow.models import DAG, DagModel, DagBag, DagRun, Pool, TaskInstance as TI
@@ -817,7 +816,7 @@ class SchedulerJobTest(unittest.TestCase):
         # Recreated part of the scheduler here, to kick off tasks -> executor
         for ti_key in queue:
             task = dag.get_task(ti_key[1])
-            ti = models.TaskInstance(task, ti_key[2])
+            ti = TI(task, ti_key[2])
             # Task starts out in the scheduled state. All tasks in the
             # scheduled state will be sent to the executor
             ti.state = State.SCHEDULED
@@ -921,7 +920,7 @@ class SchedulerJobTest(unittest.TestCase):
         # try to schedule the above DAG repeatedly.
         scheduler = SchedulerJob(num_runs=1,
                                  executor=executor,
-                                 subdir=os.path.join(models.DAGS_FOLDER,
+                                 subdir=os.path.join(settings.DAGS_FOLDER,
                                                      "no_dags.py"))
         scheduler.heartrate = 0
         scheduler.run()
@@ -973,7 +972,7 @@ class SchedulerJobTest(unittest.TestCase):
         # try to schedule the above DAG repeatedly.
         scheduler = SchedulerJob(num_runs=1,
                                  executor=executor,
-                                 subdir=os.path.join(models.DAGS_FOLDER,
+                                 subdir=os.path.join(settings.DAGS_FOLDER,
                                                      "no_dags.py"))
         scheduler.heartrate = 0
         scheduler.run()
@@ -1066,7 +1065,7 @@ class SchedulerJobTest(unittest.TestCase):
         dag_id = 'exit_test_dag'
         dag_ids = [dag_id]
-        dag_directory = os.path.join(models.DAGS_FOLDER,
+        dag_directory = os.path.join(settings.DAGS_FOLDER,
                                      "..",
                                      "dags_with_system_exit")
         dag_file = os.path.join(dag_directory,