airflow-users mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Kaxil Naik <>
Subject Re: Intra-task "xcom"
Date Thu, 23 Jul 2020 11:31:38 GMT
Definitely, let's connect.

On Wed, Jul 22, 2020 at 11:26 PM Daniel Standish <>

> We are using state persistence pretty heavily right now with plugin models
> that I have called ProcessState and TaskState
> Our implementation might be too idiosyncratic to contribute to airflow,
> but then again it might not.  I would be happy to do a call to demo what we
> are doing to see if there is any interest, and to receive guidance from
> interested parties re what if anything might make sense in airflow.  Kaxil
> do you have any interest in that?
> On Thu, Jun 4, 2020 at 4:49 PM Kaxil Naik <> wrote:
>> I definitely feel we can support this uses-cases by improving XCom. The
>> concept of XCom was to allow sharing messages & state between tasks.
>> Here is the first line from the docs about Xcom:
>> XComs let tasks exchange messages, allowing more nuanced forms of control
>> and shared state. The name is an abbreviation of “cross-communication”.
>> I read the AIP (
>> ) from @Daniel Standish <> , the "namespacing" of
>> this state would be a good feature,
>> XCom already allows that with "dag_id, task_id" key. *Solution 1* in the
>> AIP would solve the issue without much impact and whilst
>> maintaining backwards-compatiblilty.
>> I am against the idea of using Secrets Backend for storing "State".
>> Storing state for some kind of persistence should be "short-lived" and
>> temporary.
>> The "writers" & "readers" of both (Secrets & State) are different.
>> Generally, Sysadmins / Teamleads are responsible for managing secrets
>> (writing, rotating, auditing) etc whereas for State it is written via
>> Airflow Workers  and would (or should) be short-lived and you don't care by
>> auditing or rotating the value in "State".
>> The only problem that I can see in the current XCom implementation is 1)
>> the use of execution_date and the fact that 2) XCom are cleared at the
>> start.
>> One of the issue we already want to address in Airflow is to remove the
>> hard-requirement of "execution_date" for DagRun and TaskInstance. This
>> would also help in fixing (1) above.
>> (2) can be solved by a flag as mentioned in the AIP.
>> Regards,
>> Kaxil
>> On Tue, Jun 2, 2020 at 8:05 AM Jarek Potiuk <>
>> wrote:
>>> I think this subject came so often, that I also change my mind slowly in
>>> favor of making an explicit state persistence "service".
>>> Whether it's only one key or more, it's secondary, I think but if users
>>> are already using Variables to keep state for tasks - this is a clear sign
>>> that we miss a crucial feature and our users are abusing Airflow already in
>>> the way we try to prevent by not introducing "State service".
>>> With the recent SecretBackend implementation where Variables might be
>>> kept in a Secret backend - not only MetaStore - potentially you might have
>>> no write access to the backend. There is even no "write" support in the
>>> current "MetastoreBackend" implementation for writing variables. So we
>>> already have configurations where if we try to write variables and read it
>>> elsewhere might not work - as far as I can see. You can set several
>>> backends of course and the Metastore as the last fallback of course, but
>>> for me, it opens up different problems - what happens if the key is present
>>> in both, tasks writes it to metastore, but another task reads it from the
>>> Secret Backend.
>>> I think it seems that variables are being abused in exactly the way we
>>> want to prevent the "StateService" to be abused - and shying away from that
>>> is really like closing our eyes and pretending it's not happening.
>>> So maybe we can make a change AIP with this approach:
>>> 1) Variables -> mostly read-only (for tasks)  and used to keep
>>> configuration shared between workers (but not on a task level).
>>> 2) StateService (or wherever we call it) where we keep state information
>>> for specific dag + task + execution_date.
>>> J.
>>> On Tue, Jun 2, 2020 at 12:13 AM Daniel Standish <>
>>> wrote:
>>>> Airflow already provides a mechanism for state persistence: the
>>>> Variable, and, with caveats and flaws, XCom.
>>>> I personally persist state to the airflow metastore database for a
>>>> large percentage of our jobs.  They are incremental jobs and it is helpful
>>>> to keep track of watermark.
>>>> I think that incremental jobs are probably very very common in airflow
>>>> implementations.  Though probably often times users resort to imperfect
>>>> vehicles for this such as `execution_date` or xcom.
>>>> I have a very draftey draft aip that i haven't had enough time to work
>>>> on, which explores adding explicit support for state persistence to
>>>> airflow:
>>>> Though I understand it is a controversial idea.  (note: The AIP is not
>>>> ready for primetime.)
>>>> I am of the mind that being able to persist some state is not a
>>>> fundamental change to airflow and would just add explicit (and more
>>>> user-friendly) support for something that is already quite common, and fits
>>>> fully within the wheelhouse of what airflow exists to do.
>>>> On Mon, Jun 1, 2020 at 2:57 PM Chris Palmer <> wrote:
>>>>> Furcy,
>>>>> To clarify, when I say that Airflow should not be in the business of
>>>>> keeping state about external systems, I specifically mean it shouldn't
>>>>> keeping state to be shared between task instances. I completely understand
>>>>> that there may be external systems that are harder to work with, and
>>>>> in your case require the operator to be able to store some piece of
>>>>> information to make them idempotent. I just don't think that Airflow
>>>>> provide that storage mechanism.
>>>>> I would think that most users of Airflow have access to some sort of
>>>>> cloud storage like S3 (which are really just key-value stores), and it's
>>>>> easy enough to write your job_id or whatever value you care about to
a file
>>>>> with a prefix computed from the dag_id, task_id, execution_date or whatever
>>>>> combination of them you care about. Yes it makes your operators more
>>>>> complex and they have to know about another system, but it keeps that
>>>>> complexity out of core Airflow. That's the trade off.
>>>>> Ash,
>>>>> I'm not suggesting that XCom be removed from Airflow, and I understand
>>>>> there are use cases where it makes some things convenient. In your example
>>>>> though, it would be just as easy for the sensor to write the found object
>>>>> path as the contents of another file in S3, with a computable prefix
>>>>> on the dag/task/execution_date.
>>>>> At its heart XCom is just a key-value store where the keys are limited
>>>>> to a very specific set of possibilities, and where key-value pairs are
>>>>> managed in some specific ways. The request here is to add another narrowly
>>>>> defined set of allowable keys, and as far as I can tell with no extra
>>>>> management of them. The only real advantage of using the Airflow database
>>>>> for XCom or any expansion/variation on it is that we know that all
>>>>> operators have access to the database.
>>>>> I'm not an expert but I would wonder how well Postgres or MySQL
>>>>> perform as high volume key value stores. Does anyone actually use XCom
>>>>> scale, and does that extra load on the database impact scheduling and
>>>>> performance aspects of Airflow?
>>>>> Chris
>>>>> On Mon, Jun 1, 2020 at 5:03 PM Ash Berlin-Taylor <>
>>>>> wrote:
>>>>>> Just to touch on one point about XCom, and to re-assure people that
>>>>>> they, or something very like them are in Airflow for the foreseeable
>>>>>> As an example of an appropriate use for XCom: Let's say a third
>>>>>> party delivers you a set of files once a week, but the exact name
of the
>>>>>> files isn't known (by you) in advance. So you write a sensor that
>>>>>> polls/checks S3 for the Objects to appear in our bucket, and the
>>>>>> outputs the S3 Object path to XCom, that then next processing step
>>>>>> examines to process the files.
>>>>>> That sort of use case is not going anywhere.
>>>>>> Cheers,
>>>>>> -ash
>>>>>> On Jun 1 2020, at 7:37 pm, Chris Palmer <>
>>>>>> At the risk of repeating myself (from the previous thread that
>>>>>> touched on this topic), I don't think Airflow should be in the business
>>>>>> keeping state about external systems. Airflow is about authoring
>>>>>> running workflows; it's not a messaging tool or a cluster management
>>>>>> I'm not convinced that the existing XCom functionality should really
be a
>>>>>> part of Airflow, and I certainly don't think it should be expanded
upon or
>>>>>> new variations added. I think storing state is especially risky,
if for no
>>>>>> other reason than the fact that Airflow is not the source of truth
>>>>>> those systems. It's very likely that at some times the "state" that
>>>>>> has saved will diverge from the actual state of the external system.
>>>>>> Handling that nicely, probably requires a bunch of custom code in
>>>>>> operators/hooks anyway, so I don't think it saves anything in terms
>>>>>> operator code complexity. Users would be much better served going
to the
>>>>>> source of truth to determine state. If part of the problem is that
Livy is
>>>>>> lacking in features (like being able to query the status of a particular
>>>>>> job_id) then I think it would be more appropriate to add the needed
>>>>>> features to that project. Airflow at its core shouldn't be concerned
>>>>>> making up for failures of other tools.
>>>>>> Also as can be seen by just this discussion, it's hard to keep these
>>>>>> extra features from expanding in scope. Jarek proposed something
that would
>>>>>> just store a single string, and immediately Furcy wants to expand
it to
>>>>>> store multiple strings. Either way we are really just talking about
>>>>>> key-value store, and putting limits on how that key can be structured;
>>>>>> key is made up of some predefined set of Airflow entities (for Jarek's
>>>>>> proposal) or some arbitrary key along with those Airflow entities
>>>>>> proposal).
>>>>>> I know in the past that I had a situation where I wanted to reuse
>>>>>> cluster across multiple data intervals, if one was already running
>>>>>> was before I discovered Airflow so wasn't "execution dates" precisely).
>>>>>> can equally see use cases where I might want to share some resource
>>>>>> multiple tasks in a DAG, or across similar tasks in multiple DAGs.
So if we
>>>>>> added this then why limit it to any one of those combinations? But
then we
>>>>>> just have an arbitrary key-value store. If you want to use Airflow
for that
>>>>>> then you can use Variables, if you want to use something else then
you can.
>>>>>> Unless Airflow is doing some extra management of these key-values
>>>>>> some way (like it does with clearing out XCom's on reruns), then
I see
>>>>>> absolutely no added benefit. And even with some potential management
>>>>>> Airflow I'm still not convinced that Airflow is the right place for
>>>>>> Chris
>>>>>> On Mon, Jun 1, 2020 at 1:19 PM Furcy Pin <>
>>>>>> Thank you Jarek for the detailed explanation,
>>>>>> That's exactly what I wanted to do: write a feature request to
>>>>>> summarize all those discussions.
>>>>>> I agree with you that the feature should be marked distinct from
>>>>>> XCom feature and that we should not piggyback this feature into XCom.
>>>>>> The crux of the problem, I think is that with XCom you do want the
>>>>>> task to delete it's xcom on the beginning of the retry.
>>>>>> Correct me if I'm wrong but one use cases where it was necessary
>>>>>> having a task A and a task B that starts immediately after A, and
wait from
>>>>>> some 'signal' from A.
>>>>>> If A and B restart and A doesn't reset it's signal, then B will use
>>>>>> the signal from A's first try, which is incorrect.
>>>>>> About the 3 solutions you mention:
>>>>>> 1) Providing the job_id from outside. That works indeed. Sadly in
>>>>>> use-case Livy's API is poorly designed and only returns a generated
>>>>>> you can't specify a custom one.
>>>>>> You can't even find a job by name, I would have to list all the
>>>>>> active job_ids, and do a GET for each of them to get it's name and
>>>>>> which one is the one I want. It's doable but inelegant.
>>>>>> 2) Store the id in an external storage. Of course it would work but
>>>>>> it requires an external storage. More on that below.
>>>>>> 3) I'm not sure I understand completely what you mean there, but
>>>>>> think you mean that the idempotency can be handled by the service
you call
>>>>>> (for instance BigQuery). Indeed that is another solution. If we were
>>>>>> Spark with a Hive metastore + locking or the deltalake storage format,
>>>>>> could have something to prevent a job that run twice from creating
>>>>>> duplicates. This is another solution we are considering, but it is
>>>>>> coslty to change now.
>>>>>> You guess correctly that the feature I was asking for me would be
>>>>>> provide some utility to let the users implement solution 2) without
>>>>>> requiring an external storage.
>>>>>> I think it would be a QOL improvement for some use cases, just like
>>>>>> it could be argued that XCom is just a QOL improvement and users
could have
>>>>>> used an external storage themselves.
>>>>>> The main advantage that it brings is making the custom operators
>>>>>> easier to share and reuse across the Apache Airflow community, compared
>>>>>> having to set up some external
>>>>>> storage.
>>>>>> I have seen that some users used the metadata store itself as an
>>>>>> external storage by adding a new table to the airflow model:
>>>>>> And others suggested using XCom itself as an external storage by
>>>>>> storing information with a special task_id:
>>>>>> In the discussion thread you provided it was also suggested to use
>>>>>> Variables to store some persisting information.
>>>>>> These 3 approaches work but feel quite "hacky" and I believe that
>>>>>> providing such functionality would be good.
>>>>>> Finally, I don't see the point of limiting the functionality to such
>>>>>> extent, providing a "IdempotencyIdStorage" that only allows you to
store a
>>>>>> string
>>>>>> will just force people who need to store more than one id for one
>>>>>> task (for whatever reason) to use some hack again, like storing a
>>>>>> inside the storage.
>>>>>> I was more thinking about something quite similar to XCom (I liked
>>>>>> the XState name suggestion), where the entry would be keyed by "(dag_id,
>>>>>> task_id, execution_date, key)"
>>>>>> where "key" can be whatever you want and would be kept across retries.
>>>>>> I have read (quickly) through the "Pandora's Box" thread you linked.
>>>>>> Indeed it looks like there would be many ways to misuse such feature.
>>>>>> I do understand the important of idempotency, and it looks like my
>>>>>> use case is one of the first ever listed where I do need to persist
a state
>>>>>> across retries to make my operator really idempotent.
>>>>>> I'm surprised no one came up with it given how frequent the Spark
>>>>>> Airflow combination is (well, the BigQueryOperator was one too but
>>>>>> another solution).
>>>>>> Of course we can blame it on Livy for being poorly conceived (unlike
>>>>>> BigQuery) or we can blame it on Spark for not having a built-in security
>>>>>> mechanism to prevent double-writes,
>>>>>> but I think that as the above hacks show, you can't really prevent
>>>>>> users from shooting themselves in the foot if that's what they really
>>>>>> to.
>>>>>> While I do think that making things foolproof is important, I believe
>>>>>> it's also in Python's philosophy to *not* make things foolproof at
>>>>>> the detriment of simplicity for the right use cases.
>>>>>> But I do understand that the use cases are different and
>>>>>> contradictory: some would require the state to be persisted across
>>>>>> reschedule and not retries, mine would require the state to be persisted
>>>>>> across retries and not reschedule.
>>>>>> Maybe the Airflow-y way for that would be to have one task that does
>>>>>> the submit and an xcom with the job, then one task that check the
>>>>>> of the job, but that feels very cumbersome to double the number of
>>>>>> just for that. Plus I'm not sure we could make the first task retry
if the
>>>>>> second task fails...
>>>>>> Thanks again,
>>>>>> Furcy
>>>>>> On Mon, 1 Jun 2020 at 16:01, Jarek Potiuk <>
>>>>>> wrote:
>>>>>> I think we've discussed several approaches like that and using Xcom
>>>>>> name (which for many people would mean "let's just extend XCom table
>>>>>> that" is not a very good idea to use it IMHO. I think this is very
>>>>>> different functionality/logic which we might or might not agree to
>>>>>> implement as a community. Naming it "Xcom" to trying to extend the
>>>>>> table behavior might be problematic.
>>>>>> Not sure if you are aware but we had very similar discussion about
>>>>>> recently (without clear conclusions but at least you can see what
kind of
>>>>>> issues/problems different people have with this approach)
>>>>>> I am not saying it is impossible to do, but I think it's a matter
>>>>>> how we formulate the "use case". It's very tempting to implement
a generic
>>>>>> - intra-task communication mechanism, indeed. But it can very easily
>>>>>> to people abusing it and bypassing the guarantees (idempotency mainly)
>>>>>> Airflow provides for backfilling and re-running tasks. I thought
a bit
>>>>>> after the latest discussion kind of died out, and I have one possible
>>>>>> solution to the problem.
>>>>>> Let me explain what I think about it (but others can have different
>>>>>> opinions of course):
>>>>>> So far the discussion was that there are several ways to achieve
>>>>>> you want (and it's really about what entity is providing the "idempotency"
>>>>>> guarantee:
>>>>>> 1) Similarly as just merged in the BigQuery Insert Job
>>>>>> - you can provide
>>>>>> job_id from outside. You'd need to work out the job_id naming that
works in
>>>>>> your case and make sure that when you re-run your task with the same
>>>>>> (dag_id, task_id, execution date) you will get the same id. Then
>>>>>> "uniqueness" thus idempotency is handled by the logic written in
the DAG.
>>>>>> 2) Store the DAG id in some external storage (via one of the hooks
>>>>>> where it can be queried in the way that will work for you). Then
>>>>>> idempotency is actually handled by the logic in your Operator + some
>>>>>> external storage.
>>>>>> 3) Query your service and retrieve the JOB ID from it - but you have
>>>>>> to have a way to query for the job related to your "dag id  + task
>>>>>> + execution_date". Then - the idempotency is actually handling by
>>>>>> Service you are using.
>>>>>> In the use case, you describe - this is the only thing you need -
>>>>>> "idempotency source". I believe you would like to get the case 2)
>>>>>> above but without having to use external storage to store the "unique
>>>>>> Something that will let each task in the same dag run to set or retrieve
>>>>>> unique value for that particular task. One value should be enough
>>>>>> assuming that each operator/task works on one external data "source".
>>>>>> My current thinking is:
>>>>>> Why don't we provide such a dedicated, idempotency service inside
>>>>>> Airflow? We already have a DB and we could have an"IdempotencyIdStorage"
>>>>>> class with two methods:
>>>>>>   * .set(id: str) and
>>>>>>   * .get() -> str
>>>>>> And the data stored there should be a string keyed by "dag_id,
>>>>>> task_id, execution_date)" - available also via Jinja templating.
There is
>>>>>> no intra-task communication, here, very little possibility of abuse
and it
>>>>>> seems to solve the major pain point where you have to provide your
>>>>>> storage to get the idempotency if your service does not provide one
or you
>>>>>> do not want to delegate it to the DAG writer.
>>>>>> J.
>>>>>> On Mon, Jun 1, 2020 at 2:12 PM Furcy Pin <>
>>>>>> The use case I'm referring to is that you can't use xcom to let a
>>>>>> task read information from it's past attempts, because when a task
>>>>>> it's xcom is automatically deleted.
>>>>>> My specific use case is that we have a custom LivyOperator that calls
>>>>>> Livy to start batch Spark Jobs.
>>>>>> When you start a batch job Livy returns a job_id
>>>>>> Sometimes our operator can fail for one reason or another (for
>>>>>> instance if Livy is unreachable for a while)
>>>>>> When the task retries, it calls Livy again, which start the same
>>>>>> spark job, but the problem is that the spark job from the first attempt
>>>>>> still be running,
>>>>>> and then we have a batch job that runs twice simultaneously and
>>>>>> creates duplicates in the output.
>>>>>> What we tried to do is getting the job_id from the first try, to
>>>>>> check if the job is still running, and wait for it to complete if
it is.
>>>>>> We tried using xcom to let the task send a message to itself (to
>>>>>> next try) but xcom is meant for "inter-task communication" only so
>>>>>> doesn't work and is not intended to work.
>>>>>> On Mon, 1 Jun 2020 at 13:15, Ash Berlin-Taylor <>
>>>>>> wrote:
>>>>>> Hi Furcy,
>>>>>> Can you give a concrete example of what you mean by intra-task xcom?
>>>>>> Depending your use case this may already be possible.
>>>>>> On Jun 1 2020, at 11:45 am, Furcy Pin <>
>>>>>> Hello,
>>>>>> I would like to open a feature request for Airflow to support
>>>>>> "intra-task xcom".
>>>>>> It seems that there are several distinct use cases for it already
>>>>>> and only ugly workarounds and I wanted to list them in a JIRA ticket.
>>>>>> I wanted to summarize links to the use cases and past attempts,
>>>>>> and the recommended approach (which apparently would be to create
>>>>>> a distinct feature from xcom to support this, it could be calle
>>>>>> intra-com or self-com ?)
>>>>>> Do you know if such ticket already exists? I couldn't find one.
>>>>>> Also I can't create any ticket due to some obscure bug (see my other
>>>>>> email).
>>>>>> Thanks,
>>>>>> Furcy
>>>>>> --
>>>>>> Jarek Potiuk
>>>>>> Polidea <> | Principal Software Engineer
>>>>>> M: +48 660 796 129 <+48660796129>
>>>>>> [image: Polidea] <>
>>> --
>>> Jarek Potiuk
>>> Polidea <> | Principal Software Engineer
>>> M: +48 660 796 129 <+48660796129>
>>> [image: Polidea] <>

View raw message