airflow-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Paul Elliot <p...@mymusictaste.com>
Subject Re: TriggerDagRunOperator : configurable execution date?
Date Tue, 28 Nov 2017 09:13:25 GMT
Btw I ended up patching this for now, I'm sure it could be better but maybe
it's helpful:

class MMTTriggerDagRunOperator(TriggerDagRunOperator):
    """
    MMT-patched for passing explicit execution date (otherwise it's hard to
hook the datetime.now() date)

    :param execution_date: the custom execution date (jinja'd)
    :type execution_date: str
    remaining args for TriggerDagRunOperator

    ...

    """

    template_fields = ('execution_date',)

    def __init__(self, trigger_dag_id, python_callable,
execution_date=None, *args, **kwargs):
        self.execution_date = execution_date
        super(MMTTriggerDagRunOperator,
self).__init__(trigger_dag_id=trigger_dag_id,

 python_callable=python_callable,
                                                       *args, **kwargs)

    def execute(self, context):
        run_id_dt = dt.strptime(self.execution_date, '%Y-%m-%d') if
self.execution_date else dt.now()
        dro = DagRunOrder(run_id='trig__' + run_id_dt.isoformat())
        dro = self.python_callable(context, dro)
        if dro:
            session = settings.Session()
            dbag = DagBag(settings.DAGS_FOLDER)
            trigger_dag = dbag.get_dag(self.trigger_dag_id)
            dr = trigger_dag.create_dagrun(
                run_id=dro.run_id,
                state=State.RUNNING,
                execution_date=self.execution_date,
                conf=dro.payload,
                external_trigger=True)
            logging.info("Creating DagRun {}".format(dr))
            session.add(dr)
            session.commit()
            session.close()
        else:
            logging.info("Criteria not met, moving on")

On Mon, Nov 27, 2017 at 7:48 PM, Paul Elliot <paul@mymusictaste.com> wrote:

> Hi all,
>
> I'm using TriggerDagRunOperator to start off some DAGs. Works fine but,
> doesn't seem to be a way to set the execution date in the same way as
> `airflow trigger_dag -e <EXECUTION DATE> ..`.
>
> This seems to be a valid use case..for me it's a pain when I need to
> combine with ExternalTaskSensor (which cannot easily guess the execution
> date timestamp).
>
> Maybe I'm missing something, but I did dig through the mailing list and
> checked out dagrun_operator.py.
>
> Any help much appreciated :)
>
> P
>
> --
>
> *Paul Elliott | 엘리엇폴*
> Developer / Platform Dev. team
> paul@mymusictaste.com
> +82-10-2990-8642
>



-- 

*Paul Elliott | 엘리엇폴*
Developer / Platform Dev. team
paul@mymusictaste.com
+82-10-2990-8642

Mime
  • Unnamed multipart/alternative (inline, None, 0 bytes)
View raw message