airflow-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Gerard Toonstra <>
Subject Re: Flow-based Airflow?
Date Tue, 24 Jan 2017 07:16:38 GMT
data lineage is one of the things you mentioned in an early presentation
and I was wondering about it.

I wouldn't mind setting up an initial contribution towards achieving that,
but would like to understand
the subject a bit better. The easiest MVP is to use the annotations method
to simply show how
data flows, but you mention other things that need to be done in the third
paragraph. If a wiki could
be written on the subject, explaining why those things are done, we can set
up a discussion and
create an epic with jira issues to realize that.

The way I think this can be visualized is perhaps through a sankey diagram,
which helps to make
complex systems more understandable, eg:
- how is transaction margin calculated?  What is all the source data?
- where does customer data go to and are those systems compliant?
- what is the overall data dependency between systems and can these be
- which data gets used everywhere?
- which end systems consume from the most diverse sources of data?

and other questions appropriate for data lineage.



On Tue, Jan 24, 2017 at 2:04 AM, Maxime Beauchemin <> wrote:

> A few other thoughts related to this. Early on in the project, I had
> designed but never launched a feature called "data lineage annotations"
> allowing people to define a list of sources, and a list of targets related
> to a each task for documentation purposes. My idea was to use a simple
> annotation string that would uniquely map to a data object. Perhaps a URI
> as  in `{connection_type}://{conn_id}/{something_unique}` or something to
> that effect.
> Note that operators could also "infer" lineage based on their input
> (HiveOperator could introspect the HQL statement to figure out input and
> outputs for instance), and users could override the inferred lineage if so
> desired, either to abstract complexity like temp tables and such, to
> correct bad inference (SQL parsing is messy), or in cases where operators
> wouldn't implement the introspection functions.
> Throw a `data_object_exist(data_object_uri)` and a
> `clear_data_object(data_object_uri)` method in existing hooks, and a
> `BaseOperator.use_target_presence_as_state=False` boolean and some
> handling
> of in the dependency engine and while "clearing" and we're not too far from
> a solution.
> As a more generic alternative, potentially task states could be handled by
> a callback when so-desired. For this, all we'd need to do is to add a
> `status_callback(dag, task, task_instance)` callback to BaseOperator, and
> evaluate it for state in place of the database state where user specify.
> Max
> On Mon, Jan 23, 2017 at 12:23 PM, Maxime Beauchemin <
>> wrote:
> > Just commented on the blog post:
> >
> > ----------------------------
> > I agree that workflow engines should expose a way to document data
> objects
> > it reads from and writes to, so that it can be aware of the full graph of
> > tasks and data objects and how it all relates. This metadata allows for
> > clarity around data lineage and potentially deeper integration with
> > external systems.
> > Now there's the question of whether the state of a workflow should be
> > inferred based on the presence or absences of related targets. For this
> > specific question I'd argue that the workflow engine needs to manage its
> > own state internally. Here are a few reasons why: * many maintenance
> > tasks don't have have a physical output, forcing the creation of dummy
> > objects representing state * external systems have no guarantees as to
> > how quickly you can check for the existence of an object, therefore
> > computing what task can run may put a burden on external systems, poking
> at
> > thousands of data targets (related: the snakebite lib was developed in
> part
> > to help with the Luigi burden on HDFS) * how do you handle the "currently
> > running" state? a dummy/temporary output? manage this specific state
> > internally? * how to handle a state like the "skipped" in Airflow
> > (related to branching)? creating a dummy target? * if you need to re-run
> > parts of the pipeline (say a specific task and everything downstream for
> a
> > specific date range), you'll need to go and alter/delete the presence of
> a
> > potentially intricate list of targets. This means the workflow engine
> needs
> > to be able to delete files in external systems as a way to re-run tasks.
> > Note that you may not always want to take these targets offline for the
> > duration of the backfill. * if some tasks are using staging or temporary
> > tables, cleaning those up to regain space would re-trigger the task, so
> > you'll have to trick the system into achieving what you want to do
> > (overwriting with an empty target?), perhaps changing your unit of work
> by
> > creating larger tasks that include the temporary table step, but that may
> > not be the unit-of-work that you want From my perspective, to run a
> > workflow engine at scale you need to manage its state internally because
> > you need strong guarantees as to reading and altering that state. I agree
> > that ideally the workflow engine should know about input and output data
> > objects (this is not the case currently in Airflow), and it would be a
> real
> > nice thing to be able to diff & sync state across its internal state and
> > external one (presence of targets), but may be challenging.
> >
> > Max
> >
> > On Mon, Jan 23, 2017 at 8:05 AM, Bolke de Bruin <>
> wrote:
> >
> >> Hi All,
> >>
> >> I came by a write up of some of the downsides in current workflow
> >> management systems like Airflow and Luigi (
> >> lows-dataflow-not-task-deps) where they argue dependencies should be
> >> between inputs and outputs of tasks rather than between tasks
> >> (inlets/outlets).
> >>
> >> They extended Luigi ( to do this
> >> and even published a scientific paper on it:
> >> .
> >>
> >> I kind of like the idea, has anyone played with it, any thoughts? I
> might
> >> want to try it in Airflow.
> >>
> >> Bolke
> >
> >
> >

  • Unnamed multipart/alternative (inline, None, 0 bytes)
View raw message