airflow-dev mailing list archives

From "F. Hakan Koklu" <fhakan.ko...@gmail.com>
Subject Re: Determine if a DAG is running
Date Thu, 16 Feb 2017 18:29:08 GMT
Nik,

I had a similar problem and solved it by querying the dag_run table to make
sure the DAG is not already running before kicking it off with the
TriggerDagRunOperator. It has been running for months now without any issues,
so I highly recommend it.

I added a utility function and I call that before I do the trigger. Here it
is:


from airflow import models as af_models


def is_dag_running(dag_id):
    """Return True if the DAG with the given dag_id has a DagRun in the
    'running' state."""
    with session_scope() as session:
        count = session.query(af_models.DagRun)\
            .filter(af_models.DagRun.dag_id == dag_id,
                    af_models.DagRun.state == 'running')\
            .count()
    return bool(count)

session_scope is a context manager that simply yields an
airflow.settings.Session and cleans up afterwards. Hope this helps.
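
For completeness, here is a minimal sketch of such a context manager. The
_StubSession class below is only a stand-in for airflow.settings.Session so
the snippet runs on its own; in real code you would construct the session
from Airflow's sessionmaker instead:

```python
from contextlib import contextmanager


class _StubSession:
    """Placeholder for a SQLAlchemy session (airflow.settings.Session() in a
    real Airflow install); included only to keep this sketch self-contained."""

    def __init__(self):
        self.committed = False
        self.closed = False

    def commit(self):
        self.committed = True

    def rollback(self):
        pass

    def close(self):
        self.closed = True


@contextmanager
def session_scope():
    """Yield a session, commit on success, roll back on error, always close."""
    session = _StubSession()  # real code: session = airflow.settings.Session()
    try:
        yield session
        session.commit()
    except Exception:
        session.rollback()
        raise
    finally:
        session.close()
```

This is the standard SQLAlchemy "session scope" pattern: the commit/rollback
decision and the close are handled in one place, so every query helper like
is_dag_running gets correct cleanup for free.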


On Thu, Feb 16, 2017 at 6:14 AM, Boris Tyukin <boris@boristyukin.com> wrote:

> Hi Nik, looks like you're experiencing the same problem I had:
> https://lists.apache.org/thread.html/%3CCANRT7T1te7ei82nvry-yEuBU2-g4AX3GSdTL3RnXyHJ2Q-qqnA@mail.gmail.com%3E
>
> Bolke suggested using a pool to ensure only one instance is running.
>
> max_active_runs affects only scheduled runs, so triggered or manually
> kicked-off ones won't respect this setting for some reason (it looks like
> this is by design, judging from the source code)
>
> Boris
>
> On Wed, Feb 15, 2017 at 6:51 PM, Nicholas Hodgkinson <
> nik.hodgkinson@collectivehealth.com> wrote:
>
> > I'm using the TriggerDagRunOperator within a Sensor DAG to automatically
> > sense and start loads. However, the problem I'm coming across, given that
> > I want to run this Sensor DAG often (every 5 minutes), is that the same
> > run can be triggered multiple times. Is there a way to avoid this? I've
> > set max_active_runs to 1, but that doesn't seem to be respected when
> > doing manual triggers. I suppose I could write out a file when I trigger
> > the job, have the job clean it up, and then not trigger if that file
> > exists, but I feel like there should be a better way to accomplish this;
> > not to mention that a failure in the DAG would break this process.
> >
> > Thoughts? I've been trying to figure this out for a while and Gitter and
> > the web haven't been much help.
> >
> > -N
> > nik.hodgkinson@collectivehealth.com
> > (913) 927-4891
> >
>
