airflow-dev mailing list archives

From Laura Lorenz <llor...@industrydive.com>
Subject Re: DAG design for interacting with APIs
Date Thu, 02 Feb 2017 17:55:46 GMT
We've gotten around this by making the external async job API call in the
__init__ of the sensor and then polling as normal. If the polling fails,
the next sensor instantiates a new external async job. Note that this will
also trigger new jobs if you hit the timeout.
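
A minimal, self-contained sketch of the pattern described above, written in plain Python rather than against Airflow so it runs anywhere. `FakeSingularityClient`, `LaunchingSensor`, and `run_with_retries` are hypothetical names. The retry loop stands in for Airflow's task retries (each new try constructs a new sensor), and the poke loop omits `poke_interval` sleeping. It shows both behaviours noted above: a FAILED state and a timeout each lead to a fresh launch.

```python
class FakeSingularityClient:
    """Hypothetical stand-in for the Singularity API.

    Each launch consumes one scripted list of states, which successive
    status() calls return in order.
    """

    def __init__(self, scripts):
        self._scripts = iter(scripts)
        self._jobs = {}
        self.launch_count = 0

    def launch(self):
        self.launch_count += 1
        job_id = f"job-{self.launch_count}"
        self._jobs[job_id] = iter(next(self._scripts))
        return job_id

    def status(self, job_id):
        return next(self._jobs[job_id])


class LaunchingSensor:
    """Launches the external job when constructed, then polls it."""

    def __init__(self, client, max_pokes=10):
        self.client = client
        self.max_pokes = max_pokes
        self.job_id = client.launch()  # side effect at construction time

    def poke(self):
        state = self.client.status(self.job_id)
        if state == "FAILED":
            raise RuntimeError(f"{self.job_id} failed")
        return state == "FINISHED"

    def execute(self):
        # Stand-in for the sensor poke loop; max_pokes plays the role of a timeout.
        for _ in range(self.max_pokes):
            if self.poke():
                return self.job_id
        raise TimeoutError(f"{self.job_id} timed out")


def run_with_retries(client, retries, max_pokes=3):
    """Each try builds a new sensor, which launches a new job."""
    for attempt in range(retries + 1):
        try:
            return LaunchingSensor(client, max_pokes).execute()
        except (RuntimeError, TimeoutError):
            if attempt == retries:
                raise
```

For example, with scripts `[["RUNNING", "FAILED"], ["RUNNING", "FINISHED"]]` and `retries=2`, the first job fails and `run_with_retries` returns `"job-2"`. One thing the toy does not model: in Airflow, operator constructors also run whenever the DAG file is parsed, so it is worth checking how often the launch actually fires.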

Here's the gist with our dag and sensor:

https://gist.github.com/lauralorenz/bf47280b90067c71fe691bdf70b4145a

On Thu, Feb 2, 2017 at 8:36 AM, Bolke de Bruin <bdbruin@gmail.com> wrote:

> Hi Steve,
>
> At the moment we don’t have the possibility in Airflow to combine multiple
> tasks into one unit of analysis, e.g. when a task fails, return to the
> beginning of the set. We also don’t expose the functionality of resetting a
> task state via the API at the moment. You could mimic this behaviour
> (warning: this really is a hack) by clearing the state of the earlier task
> in the database when you reach a failed state. I’ve never tried it, and it
> certainly isn’t very clean, nor will it be supported in any way.
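
Airflow has no built-in notion of such a group, so the following is only a toy model, in plain Python, of the semantics Bolke's hack would emulate: tasks run in order, and a failure clears every task in the group back to no state and restarts from the first one. `run_group`, its `max_resets` cap, and the in-memory `states` dict are hypothetical and do not reflect Airflow's metadata database.

```python
def run_group(tasks, max_resets):
    """Run tasks in order; on any failure, clear the whole group and restart.

    tasks: ordered list of (task_id, callable returning True on success).
    Returns the final {task_id: state} mapping.
    """
    states = {task_id: None for task_id, _ in tasks}
    resets = 0
    i = 0
    while i < len(tasks):
        task_id, fn = tasks[i]
        states[task_id] = "success" if fn() else "failed"
        if states[task_id] == "failed":
            if resets == max_resets:
                return states  # give up, leaving the failed state visible
            # The "hack": reset every task in the group and start over.
            for other_id in states:
                states[other_id] = None
            resets += 1
            i = 0
        else:
            i += 1
    return states
```

With a launch task that always succeeds and a check task that fails once and then passes, `run_group` runs the launch twice and ends with both tasks in `"success"`.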
>
> What you could do is split your DAG in two: one that runs your
> SingularityOperator and one that monitors it. If it fails, the monitor can
> trigger a new dag_run for your first DAG.
>
> - Bolke
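
A toy sketch of Bolke's two-DAG split, in plain Python: `Scheduler`, `DagRun`, and `monitor` are hypothetical stand-ins, with scripted job outcomes in place of real Singularity results. In Airflow itself the trigger step would typically be done with `TriggerDagRunOperator` (its arguments differ between versions, so check the docs for the one you run). The `max_triggers` cap is an added assumption so that a persistently failing job doesn't retrigger forever.

```python
from dataclasses import dataclass, field


@dataclass
class DagRun:
    run_id: str
    job_state: str  # final state the external job reported for this run


@dataclass
class Scheduler:
    """Hypothetical stand-in that records dag_runs of the launch DAG."""

    outcomes: list  # scripted job results, one per triggered run
    runs: list = field(default_factory=list)

    def trigger_launch_dag(self):
        n = len(self.runs)
        run = DagRun(run_id=f"launch_{n + 1}", job_state=self.outcomes[n])
        self.runs.append(run)
        return run


def monitor(scheduler, run, max_triggers):
    """Monitor-DAG logic: while the watched job FAILED, trigger a new run."""
    triggers = 0
    while run.job_state == "FAILED":
        if triggers == max_triggers:
            raise RuntimeError(f"{run.run_id} failed and no triggers are left")
        run = scheduler.trigger_launch_dag()
        triggers += 1
    return run.run_id
```

For instance, with outcomes `["FAILED", "FAILED", "FINISHED"]`, the scheduled run `launch_1` fails, the monitor triggers `launch_2` (fails) and `launch_3` (finishes), and returns `"launch_3"`.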
>
> > On 1 Feb 2017, at 08:40, Steve Annessa <steve.annessa@gmail.com> wrote:
> >
> > I need help designing a DAG
> >
> > High level problem:
> > I need a way to launch tasks through an API and manage their state; when
> > they fail, I need the ability to retry them automatically.
> >
> > What's involved:
> > We use Singularity (https://github.com/HubSpot/Singularity) to launch tasks
> > on Mesos, which can be standalone containers or Spark jobs that run for
> > hours.
> >
> > What I've done so far:
> > I've written an Operator for interacting with the Singularity API, and I
> > can launch a task that Singularity manages. I then need to wait and poll
> > the API for changes to the task state. The task can be in a few states,
> > but the most important are FINISHED and FAILED. So I wrote a Sensor that
> > polls the API and watches for the task UID that was passed through XCom
> > from the SingularityOperator; on each poll it checks the various states.
> > If everything passes, everything is great and the DAG moves along.
> > The problem happens when the Singularity task fails: the SingularitySensor
> > will fail, which is fine, but I don't know of a way to tell the previous
> > SingularityOperator task to re-execute, so the DAG is stuck.
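
To make the failure mode concrete, here is a plain-Python toy of the current two-task design, with a dict standing in for XCom and a fake API whose tasks always fail (`FakeApi`, `operator_execute`, `sensor_poke`, `run_sensor_with_retries`, and the `"task_uid"` key are hypothetical). Retrying the sensor alone keeps reading the same UID, so every retry sees FAILED and the launch count stays at one, which is why the DAG gets stuck.

```python
class FakeApi:
    """Hypothetical Singularity stand-in where every launched task fails."""

    def __init__(self):
        self.launches = 0
        self.states = {}

    def launch(self):
        self.launches += 1
        uid = f"uid-{self.launches}"
        self.states[uid] = "FAILED"
        return uid

    def state(self, uid):
        return self.states[uid]


def operator_execute(api, xcom):
    # SingularityOperator role: launch the task and hand its UID downstream.
    xcom["task_uid"] = api.launch()


def sensor_poke(api, xcom):
    # SingularitySensor role: look up the UID from "XCom" and check its state.
    state = api.state(xcom["task_uid"])
    if state == "FAILED":
        raise RuntimeError(f"{xcom['task_uid']} FAILED")
    return state == "FINISHED"


def run_sensor_with_retries(api, xcom, retries):
    """Retrying only the sensor re-polls the same UID; nothing relaunches."""
    for attempt in range(retries + 1):
        try:
            return sensor_poke(api, xcom)
        except RuntimeError:
            if attempt == retries:
                raise
```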
> >
> > Options I'm considering to resolve this problem:
> > 1. Remove the Sensor and put the polling logic in the execute function for
> > the SingularityOperator. That will mean the Operator task will last for the
> > duration of the Singularity-managed task, which can be 4+ hours, and the
> > majority of the time will be spent polling the API. I'll also have to write
> > my own polling logic, which isn't terrible, but I won't get to use the work
> > already written in the BaseSensorOperator.
> > 2. Find a way to call back to the previous task in the event of Sensor
> > failure. I'd like the flow to go "execute_singularity_task ->
> > check_singularity_task"; if "check_singularity_task" is in the FAILED
> > state, clear both "execute_singularity_task" and "check_singularity_task"
> > and rerun from "execute_singularity_task" on.
> > 3. Ask you guys for a better design.
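
Option 1 could look roughly like the sketch below, again in plain Python rather than Airflow: launching and polling live in one function, so a retry of that single task naturally relaunches the Singularity job. `execute_and_wait`, `run_task`, and `make_fake_api` are hypothetical; the loop reimplements a simple poke/timeout cycle by hand, which is exactly the duplication of BaseSensorOperator that Steve mentions.

```python
import time
from itertools import count


def execute_and_wait(launch, get_state, poke_interval=0.0, timeout=60.0):
    """Option 1: launch and poll inside one execute(), so a retry relaunches."""
    uid = launch()
    deadline = time.monotonic() + timeout
    while True:
        state = get_state(uid)
        if state == "FINISHED":
            return uid
        if state == "FAILED":
            raise RuntimeError(f"{uid} failed")
        if time.monotonic() >= deadline:
            raise TimeoutError(f"{uid} still {state} after {timeout}s")
        time.sleep(poke_interval)


def run_task(retries, **kwargs):
    """Stand-in for Airflow's per-task retries: each try calls execute again."""
    for attempt in range(retries + 1):
        try:
            return execute_and_wait(**kwargs)
        except (RuntimeError, TimeoutError):
            if attempt == retries:
                raise


def make_fake_api(scripts):
    """Hypothetical API: each launch gets the next scripted list of states."""
    scripts = iter(scripts)
    ids = count(1)
    jobs = {}

    def launch():
        uid = f"uid-{next(ids)}"
        jobs[uid] = iter(next(scripts))
        return uid

    def get_state(uid):
        return next(jobs[uid])

    return launch, get_state
```

The cost is the one noted in option 1: the task holds a worker slot for the job's full run time, most of it spent sleeping between polls.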
> >
> > The end goal is to have the following:
> > 0. The ability to launch and manage tasks through the Singularity API
> > 1. The ability to retry on failure at any point in the DAG without human
> > intervention
> > 2. As simple a DAG as possible
> >
> > Here's a gist for the DAG:
> > https://gist.github.com/sannessa/dea05f743a1250c1e5e8a8e10c49d7b5
> >
> > Here's a gist for the Operator:
> > https://gist.github.com/sannessa/7652c97de3c99426663d9541b2abeba3
> >
> > Here's a gist for the Sensor:
> > https://gist.github.com/sannessa/14a427ee55f90ec2dff60e038e93edb5
> > (This is a crude implementation and doesn't handle all of the states. I
> > figured that before I invested more time in making it more robust and
> > elegant, I'd spend time figuring out whether this was the correct tool for
> > the job.)
> >
> > Thanks!
> >
> > -- Steve
>
>
