airflow-dev mailing list archives

From Maxime Beauchemin <maximebeauche...@gmail.com>
Subject Re: Dynamic creation of DAG
Date Tue, 20 Dec 2016 20:24:39 GMT
From looking at the code, it doesn't seem like you're going in the right
direction. All you need to do is create DAG objects and stamp them into the
module's global namespace. This is configuration as code: you shouldn't be
using `session` or calling any of the internal methods like `bag_dag`. The
scheduler (and web server) will walk through your DAGS_FOLDER, find the
DAG objects in your modules, and schedule them based on their configuration.
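The discovery step described above can be modelled in plain Python, so the sketch runs without Airflow installed. It assumes, as an approximation rather than Airflow's actual implementation, that parsing a file means executing it as a module and keeping every module-level value that is a DAG instance. `FakeDAG`, `DAG_MODULE_SOURCE`, `make_dag`, `collect_dags` and the two DAG ids are hypothetical.

```python
import types


class FakeDAG:
    """Hypothetical stand-in for airflow.DAG; it only carries a dag_id."""

    def __init__(self, dag_id):
        self.dag_id = dag_id


# Source of a hypothetical module in the DAGs folder. The loop runs at
# import time and stamps one DAG object per id into the module's globals.
DAG_MODULE_SOURCE = """
def make_dag(dag_id):
    return FakeDAG(dag_id)

for dag_id in ["etl_customers", "etl_orders"]:
    globals()[dag_id] = make_dag(dag_id)
"""


def collect_dags(source):
    """Execute module source in a fresh namespace and collect DAG objects.

    A simplified imitation of a scheduler or web server parsing one file
    from the DAGs folder.
    """
    module = types.ModuleType("dynamic_dags")
    module.FakeDAG = FakeDAG  # make the stand-in class visible to the module
    exec(source, module.__dict__)
    # Keep only module-level values that are DAG instances; the leftover
    # loop variable `dag_id` (a str) and `make_dag` are filtered out.
    return {
        obj.dag_id: obj
        for obj in vars(module).values()
        if isinstance(obj, FakeDAG)
    }


print(sorted(collect_dags(DAG_MODULE_SOURCE)))  # ['etl_customers', 'etl_orders']
```

Under this model a DAG only exists in a process that has executed the module, so the factory loop has to run every time the file in DAGS_FOLDER is parsed by the scheduler and web server, rather than, say, once in a single worker process.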

Max

On Tue, Dec 20, 2016 at 6:48 AM, Georg Walther <georg.walther@markovian.com>
wrote:

> Hi Maxime,
>
>
> thanks for the link you posted.
>
> The example provided in the documentation on dynamically adding DAGs does
> not work for us, even taking into account the addendum you pointed to.
>
> Our setup is as follows:
>
> We deploy the webserver, scheduler and four workers (using the celery
> executor) in one separate docker container each (all containers are
> connected via the same Docker network).
>
> The dag bag directory pointed to by `dags_folder` in airflow.cfg contains
> one Python module that implements a DAG factory:
>
> The DAG factory fetches DAG parameters from a database and instantiates the
> appropriate DAG object (complete with all tasks).
> Doing `globals()[dag_id] = dag_object` in the DAG factory code has no
> effect, i.e. the `dag_object` does not appear in our webserver and is not
> executed.
>
> The DAG factory code is executed in one of the four aforementioned workers.
>
> I have attempted DAG factory code that adds the DAG object to the
> Airflow database; however, all I end up with are the corresponding `dag_id`s
> listed in the webserver as "unavailable":
>
> (... DAG factory class code ...)
>     @provide_session
>     def _add_dag_to_airflow(self, dag, session=None):
>         # `dag` is the DAG object instantiated by the DAG factory
>         for attribute, default in [('is_subdag', False)]:
>             dag = self._add_dag_attribute(dag, attribute, default)
>
>         DAG.sync_to_db(dag, dag.owner, datetime.utcnow())
>         dag_pickle = dag.pickle()
>
>         dag_model = session.\
>             query(DagModel).\
>             filter(DagModel.dag_id == dag.dag_id).\
>             first()
>         dag_model.pickle_id = dag_pickle.id
>         dag_model.is_paused = False
>
>         session.merge(dag_model)
>         session.commit()
>
>         bag = DagBag('', include_examples=False)
>         bag.bag_dag(dag, None, None)
>
>         queue = []
>         job = SchedulerJob(dag_id=dag.dag_id)
>         job._process_dags(bag, [dag], queue)
>
>         for dag_id, task_id, execution_date in queue:
>             task = dag.get_task(task_id)
>
>             task_instance = TaskInstance(task, execution_date)
>             task_instance.state = State.SCHEDULED
>
>             session.merge(task_instance)
>             session.commit()
>
>
>
> Any help on this workflow would be greatly appreciated!
>
>
> Best,
>
> Georg
>
>
> On Tue, Nov 22, 2016 at 10:46 PM, Maxime Beauchemin <
> maximebeauchemin@gmail.com> wrote:
>
> > I added a paragraph to the FAQ entry: "How can I create DAGs
> > dynamically?"
> >
> > The GH link above points straight to that paragraph. It's related to an
> > issue that was discussed on this mailing list last week.
> >
> > I agree that working examples are ideal, and they can help with test
> > coverage as well.
> >
> > Max
> >
> > On Tue, Nov 22, 2016 at 1:10 PM, siddharth anand <sanand@apache.org>
> > wrote:
> >
> > > Hi Max,
> > > Which part in the above PR is related to dynamic dags?
> > >
> > > When thinking about adding documentation about functionality, I propose
> > > the community bias towards adding working examples and test coverage. We
> > > offer a quick start (which, by the way, needs some updates; for example,
> > > why does it not start airflow-scheduler after starting the webserver?),
> > > but then folks get stuck on how to write DAGs and use the full range of
> > > Airflow capabilities. This is where examples and better test coverage
> > > help keep newbies productive.
> > >
> > > Perhaps the examples and tests can be upgraded to show a fuller set of
> > > dynamic dag capabilities?
> > >
> > > -s
> > >
> > > On Mon, Nov 21, 2016 at 7:55 PM, Maxime Beauchemin <
> > > maximebeauchemin@gmail.com> wrote:
> > >
> > > > I just added a bit of information about dynamic DAG creation here:
> > > > https://github.com/apache/incubator-airflow/pull/1889/files#diff-c6f0a0722c6a2f86277535d7bcec7f8cR162
> > > >
> > > > Let me know if it helps.
> > > >
> > > > Max
> > > >
> > > > On Mon, Nov 21, 2016 at 2:58 AM, Deepak Kumar Malladi <
> > > > kapeed2091@gmail.com>
> > > > wrote:
> > > >
> > > > > Hi,
> > > > >
> > > > > I want to dynamically create DAGs at run time. I tried the snippet
> > > > > given in the documentation, but it didn't work for me.
> > > > >
> > > > > Any pointers on how to trigger DAGs which aren't actually present in
> > > > > the DAG folder but are created through code execution (dynamically
> > > > > created)?
> > > > >
> > > > >
> > > > > Thanks & Regards,
> > > > > Deepak
> > > > >
> > > >
> > >
> >
>
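Georg's setup has a factory that reads DAG parameters from a database. Under a simplified model of DAG discovery (the scheduler executes each file in the DAGs folder and keeps the module-level DAG objects), that fits the globals() pattern as long as the query runs at import time and each row is stamped into the module's namespace. The sketch below uses an in-memory SQLite database; the `dag_params` table, its columns, `FakeDAG`, `load_dag_params` and `register_dags` are hypothetical stand-ins, not Airflow APIs.

```python
import sqlite3


class FakeDAG:
    """Hypothetical stand-in for airflow.DAG holding a couple of parameters."""

    def __init__(self, dag_id, schedule_interval):
        self.dag_id = dag_id
        self.schedule_interval = schedule_interval


def load_dag_params(conn):
    """Fetch (dag_id, schedule_interval) rows from a hypothetical table."""
    cursor = conn.execute(
        "SELECT dag_id, schedule_interval FROM dag_params ORDER BY dag_id"
    )
    return cursor.fetchall()


def register_dags(conn, namespace):
    """Build one DAG per row and stamp it into `namespace` under its dag_id.

    In a real DAG file, `namespace` would be the module's globals().
    """
    for dag_id, schedule_interval in load_dag_params(conn):
        namespace[dag_id] = FakeDAG(dag_id, schedule_interval)


# Hypothetical parameter store, created in memory for the example.
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE dag_params (dag_id TEXT PRIMARY KEY, schedule_interval TEXT)"
)
conn.executemany(
    "INSERT INTO dag_params VALUES (?, ?)",
    [("report_daily", "@daily"), ("sync_hourly", "@hourly")],
)

# Runs at import time, as the top level of a DAG file would.
register_dags(conn, globals())
print(globals()["sync_hourly"].schedule_interval)  # @hourly
```

In a real deployment, every process that parses the DAGs folder would presumably need to reach that database, since each one rebuilds the DAG objects independently when it executes the file.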
