airflow-dev mailing list archives

From Michael Gong <go...@hotmail.com>
Subject The best place to initialize a db table whenever airflow starts ?
Date Tue, 01 Nov 2016 14:09:24 GMT
Hi,

I have a MySQL table that will store some static information. This information may differ
between Airflow runs, so I would like to initialize the table with Python code whenever
Airflow starts.
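For illustration only: whichever hook ends up running this initialization, it may be invoked more than once (for example, if it lives in code that several Airflow processes execute when they start), so it helps if the initialization itself is idempotent. Below is a minimal sketch that assumes Python's built-in sqlite3 as a stand-in for MySQL and a hypothetical key/value table named `static_info`. `init_static_table` is also a hypothetical name and is not part of Airflow.

```python
import sqlite3


def init_static_table(conn, rows):
    """Populate a hypothetical `static_info` table with `rows`.

    Safe to call repeatedly: the table is created only if it is missing,
    and its previous contents are replaced within a single transaction.
    """
    with conn:  # sqlite3: commits on success, rolls back on exception
        conn.execute(
            "CREATE TABLE IF NOT EXISTS static_info ("
            " key TEXT PRIMARY KEY,"
            " value TEXT NOT NULL)"
        )
        # Clear old values so repeated runs never accumulate duplicates.
        conn.execute("DELETE FROM static_info")
        conn.executemany(
            "INSERT INTO static_info (key, value) VALUES (?, ?)", rows
        )


if __name__ == "__main__":
    conn = sqlite3.connect(":memory:")
    init_static_table(conn, [("env", "dev"), ("region", "eu")])
    init_static_table(conn, [("env", "prod")])  # replaces, does not append
    print(conn.execute("SELECT key, value FROM static_info ORDER BY key").fetchall())
    # → [('env', 'prod')]
```

With a real MySQL driver, the connection call and the placeholder style would differ (for example, MySQLdb uses `%s` instead of `?`). Also, unlike SQLite, MySQL commits implicitly around DDL such as `CREATE TABLE`.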


Where is the best place to put such code?


Is the DagBag class's __init__() a good candidate?


Please advise.


Thanks.


#############################################

class DagBag(LoggingMixin):
    """
    A dagbag is a collection of dags, parsed out of a folder tree and has high
    level configuration settings, like what database to use as a backend and
    what executor to use to fire off tasks. This makes it easier to run
    distinct environments for say production and development, tests, or for
    different teams or security profiles. What would have been system level
    settings are now dagbag level so that one system can run multiple,
    independent settings sets.

    :param dag_folder: the folder to scan to find DAGs
    :type dag_folder: str
    :param executor: the executor to use when executing task instances
        in this DagBag
    :param include_examples: whether to include the examples that ship
        with airflow or not
    :type include_examples: bool
    :param sync_to_db: whether to sync the properties of the DAGs to
        the metadata DB while finding them, typically should be done
        by the scheduler job only
    :type sync_to_db: bool
    """
    def __init__(
            self,
            dag_folder=None,
            executor=DEFAULT_EXECUTOR,
            include_examples=configuration.getboolean('core', 'LOAD_EXAMPLES'),
            sync_to_db=False):

        dag_folder = dag_folder or DAGS_FOLDER
        self.logger.info("Filling up the DagBag from {}".format(dag_folder))
        self.dag_folder = dag_folder
        self.dags = {}
        self.sync_to_db = sync_to_db
        self.file_last_changed = {}
        self.executor = executor
        self.import_errors = {}
        if include_examples:
            example_dag_folder = os.path.join(

...

#############################

