airflow-dev mailing list archives

From Steve Annessa <steve.anne...@gmail.com>
Subject Re: DAG design for interacting with APIs
Date Thu, 02 Feb 2017 21:33:58 GMT
Thanks for the feedback, Laura and Bolke. I think I'll try Laura's approach
and make the call to launch the task in the Sensor's __init__ function.
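
Roughly what I have in mind (just a sketch, not the code from Laura's gist;
the Singularity endpoints and JSON field names below are placeholders):

import requests

from airflow.operators.sensors import BaseSensorOperator


class SingularityJobSensor(BaseSensorOperator):
    """Launch the Singularity job when the sensor is built, then poll it."""

    def __init__(self, singularity_url, request_id, *args, **kwargs):
        super(SingularityJobSensor, self).__init__(*args, **kwargs)
        self.singularity_url = singularity_url
        self.request_id = request_id
        # Kick off the external async job up front; if this sensor fails or
        # times out, the next/retried sensor instance launches a fresh job.
        requests.post('{0}/api/requests/request/{1}/run'.format(
            self.singularity_url, self.request_id))

    def poke(self, context):
        # Endpoint and field names here are illustrative only.
        resp = requests.get('{0}/api/history/request/{1}/tasks'.format(
            self.singularity_url, self.request_id))
        latest = resp.json()[0]
        state = latest.get('lastTaskState')
        if state == 'TASK_FAILED':
            raise ValueError('Singularity task for {0} failed'.format(
                self.request_id))
        return state == 'TASK_FINISHED'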

-- Steve

On Thu, Feb 2, 2017 at 9:55 AM, Laura Lorenz <llorenz@industrydive.com>
wrote:

> We've gotten around this by implementing the external async job API call
> in the __init__ of the sensor and then polling as normal. If the polling
> fails, the next sensor instantiates a new external async job. Note this
> will also trigger new jobs if you hit the timeout.
>
> Here's the gist with our dag and sensor:
>
> https://gist.github.com/lauralorenz/bf47280b90067c71fe691bdf70b4145a
>
> On Thu, Feb 2, 2017 at 8:36 AM, Bolke de Bruin <bdbruin@gmail.com> wrote:
>
> > Hi Steve,
> >
> > At the moment Airflow doesn't have a way to combine multiple tasks into
> > one unit of analysis, e.g. when a task fails, return to the beginning of
> > the set. We also don't expose the functionality of resetting a task's
> > state through an API at the moment. You could mimic this behaviour
> > (warning: this really is a hack) by clearing the state of the earlier
> > task in the database when you reach a failed state. I have never tried
> > it, and it certainly isn't clean or supported in any way.
> >
> > What you could do is split your dag in two: one that runs your
> > SingularityOperator and one that monitors it. If it fails, the monitor
> > can trigger a new dag_run for your first dag.
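> >
> > Roughly, in code (an untested sketch; dag ids, task ids and the schedule
> > are placeholders):
> >
> > from datetime import datetime
> >
> > from airflow import DAG
> > from airflow.operators.dagrun_operator import TriggerDagRunOperator
> >
> > monitor_dag = DAG('singularity_monitor',
> >                   start_date=datetime(2017, 1, 1),
> >                   schedule_interval=None)
> >
> >
> > def always_trigger(context, dag_run_obj):
> >     # The trigger_rule below already restricts this task to running only
> >     # when its upstream (the monitoring sensor) has failed, so just hand
> >     # the dag_run_obj back to fire a new run of the first dag.
> >     return dag_run_obj
> >
> >
> > retrigger = TriggerDagRunOperator(
> >     task_id='retrigger_launch_dag',
> >     trigger_dag_id='singularity_launch',  # dag_id of your first dag
> >     python_callable=always_trigger,
> >     trigger_rule='all_failed',
> >     dag=monitor_dag)
> >
> > # Then wire it downstream of your monitoring sensor, e.g.
> > # monitor_sensor.set_downstream(retrigger)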
> >
> > - Bolke
> >
> > > On 1 Feb 2017, at 08:40, Steve Annessa <steve.annessa@gmail.com> wrote:
> > >
> > > I need help designing a DAG
> > >
> > > High level problem:
> > > I need a way to launch tasks through an API and manage their state;
> > > when they fail, I need the ability to retry automatically.
> > >
> > > What's involved:
> > > We use Singularity (https://github.com/HubSpot/Singularity) to launch
> > > tasks on Mesos, which can be standalone containers or Spark jobs that
> > > run for hours.
> > >
> > > What I've done so far:
> > > I've written an Operator for interacting with the Singularity API, and
> > > I can launch a task that Singularity manages. I then need to wait and
> > > poll the API for changes to the task state. The task can be in a few
> > > states, but the most important are FINISHED and FAILED. So I wrote a
> > > Sensor that polls the API and watches the task UID that was passed
> > > through XCom from the SingularityOperator; on each poll it checks the
> > > various states. If everything passes, everything is great and the DAG
> > > moves along.
> > > The problem happens when the Singularity task fails: the
> > > SingularitySensor will fail, which is fine, but I don't know of a way
> > > to tell the previous SingularityOperator task to re-execute, so the
> > > DAG is stuck.
> > >
> > > Options I'm considering to resolve this problem:
> > > 1. Remove the Sensor and put the polling logic in the execute function
> > > of the SingularityOperator. That means the Operator task will last for
> > > the duration of the Singularity-managed task, which can be 4+ hours,
> > > with the majority of that time spent polling the API. I'll also have to
> > > write my own poll logic, which isn't terrible, but I won't get to use
> > > the work already done in the BaseSensorOperator.
> > > 2. Find a way to call back to the previous task in the event of Sensor
> > > failure. I'd like the flow to go "execute_singularity_task ->
> > > check_singularity_task"; if "check_singularity_task" is in the FAILED
> > > state, clear both "execute_singularity_task" and
> > > "check_singularity_task" and rerun from "execute_singularity_task" on.
> > > 3. Ask you guys for a better design.
> > >
> > > The end goal is to have the following:
> > > 0. The ability to launch and manage tasks through the Singularity API
> > > 1. The ability to retry on failure at any point in the DAG without
> > > human intervention
> > > 2. As simple a DAG as possible
> > >
> > > Here's a gist for the DAG:
> > > https://gist.github.com/sannessa/dea05f743a1250c1e5e8a8e10c49d7b5
> > >
> > > Here's a gist for the Operator:
> > > https://gist.github.com/sannessa/7652c97de3c99426663d9541b2abeba3
> > >
> > > Here's a gist for the Sensor:
> > > https://gist.github.com/sannessa/14a427ee55f90ec2dff60e038e93edb5
> > > (This is a crude implementation and doesn't handle all of the states.
> > > I figured that before investing more time in making it more robust and
> > > elegant, I'd spend time figuring out whether this was the right tool
> > > for the job.)
> > >
> > > Thanks!
> > >
> > > -- Steve
> >
> >
>
