airflow-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Bolke de Bruin <>
Subject Re: Lineage
Date Sun, 06 May 2018 06:20:28 GMT
Hi Marcin,

That would be awesome! The reason I chose to use DataSets is because it aligns easily with
Apache Atlas’ understanding of what a dataset is. I had no other example apart from IBM’s
Infosphere which I really do not like to get into. So I am definitely open for changes.

Another thing is obviously that many of the Operators are not SQL based or even SQL like,
so I wonder how your worked with that. Being able to automatically derive metadata is of course
the holy grail.

Finally what I also tried to accomplish is to have an easy way to share metadata between tasks.
This requires being able to serialize the metadata to json as it is being shared by XCom.

So yes please share! We can generalize the best practices then!


Verstuurd vanaf mijn iPad

> Op 6 mei 2018 om 00:37 heeft Marcin Szymański <> het volgende
> Hi Bolke
> Great stuff. Pieces of this this remind work I have done for one
> organization. However in that case, instead of defining base classes like
> Dataset form scratch, I extended objects from SQLAlchemy, such as Metadata,
> Table, etc. This in turn allowed for automated SQL generation (with some
> changes to operators), defining data quality checks and many other cool
> things. Maybe it's worth going down that path? I am willing to share more
> details, if interested.
> Best
> Marcin
>> On Sat, May 5, 2018, 22:49 Bolke de Bruin <> wrote:
>> Hi All,
>> I have made a first implementation that allows tracking of lineage in
>> Airflow and integration with Apache Atlas. It was inspired by Jeremiah’s
>> work in the past on Data Flow pipelines, but I think I kept it a little bit
>> simpler.
>> Operators now have two new parameters called “inlets” and “outlets”. These
>> can be filled with objects derived from “DataSet”, like “File” and
>> “HadoopFile”. Parameters are jinja2 templated, which
>> means they receive the context of the task when it is running and get
>> rendered. So you can get definitions like this:
>> f_final = File(name="/tmp/final")
>> run_this_last = DummyOperator(task_id='run_this_last', dag=dag,
>>    inlets={"auto": True},
>>    outlets={"datasets": [f_final,]})
>> f_in = File(name="/tmp/whole_directory/")
>> outlets = []
>> for file in FILE_CATEGORIES:
>>    f_out = File(name="/tmp/{}/{{{{ execution_date }}}}".format(file))
>>    outlets.append(f_out)
>> run_this = BashOperator(
>>    task_id='run_after_loop', bash_command='echo 1', dag=dag,
>>    inlets={"auto": False, "task_ids": [], "datasets": [f_in,]},
>>    outlets={"datasets": outlets}
>>    )
>> run_this.set_downstream(run_this_last)
>> So I am trying to keep to boilerplate work down for developers. Operators
>> can also extend inlets and outlets automatically. This will probably be a
>> bit harder for the BashOperator without some special magic, but an update
>> to the DruidOperator can be relatively quite straightforward.
>> In the future Operators can take advantage of the inlet/outlet definitions
>> as they are also made available as part of the context for templating (as
>> “inlets” and “outlets”).
>> I’m looking forward to your comments!
>> Bolke.

View raw message