airflow-users mailing list archives

From Daniel Standish <>
Subject Re: Intra-task "xcom"
Date Wed, 22 Jul 2020 22:22:16 GMT
We are using state persistence pretty heavily right now with plugin models
that I have called ProcessState and TaskState.

Our implementation might be too idiosyncratic to contribute to airflow, but
then again it might not.  I would be happy to do a call to demo what we are
doing to see if there is any interest, and to receive guidance from
interested parties re what, if anything, might make sense in airflow.  Kaxil,
do you have any interest in that?

On Thu, Jun 4, 2020 at 4:49 PM Kaxil Naik <> wrote:

> I definitely feel we can support these use cases by improving XCom. The
> concept of XCom was to allow sharing messages & state between tasks.
> Here is the first line from the docs about Xcom:
> XComs let tasks exchange messages, allowing more nuanced forms of control
> and shared state. The name is an abbreviation of “cross-communication”.
> I read the AIP from @Daniel Standish; the "namespacing" of this state
> would be a good feature, and XCom already allows that with the
> "dag_id, task_id" key. *Solution 1* in the AIP would solve the issue
> without much impact while maintaining backwards compatibility.
> I am against the idea of using Secrets Backend for storing "State".
> Storing state for some kind of persistence should be "short-lived" and
> temporary.
> The "writers" & "readers" of both (Secrets & State) are different.
> Generally, sysadmins / team leads are responsible for managing secrets
> (writing, rotating, auditing, etc.), whereas State is written by
> Airflow workers and would (or should) be short-lived, so you don't care
> about auditing or rotating the value in "State".
> The only problems that I can see in the current XCom implementation are
> 1) the use of execution_date and 2) the fact that XComs are cleared at the
> start.
> One of the issues we already want to address in Airflow is to remove the
> hard requirement of "execution_date" for DagRun and TaskInstance. This
> would also help in fixing (1) above.
> (2) can be solved by a flag as mentioned in the AIP.
> Regards,
> Kaxil
> On Tue, Jun 2, 2020 at 8:05 AM Jarek Potiuk <>
> wrote:
>> I think this subject has come up so often that I am also slowly changing
>> my mind in favor of making an explicit state persistence "service".
>> Whether it's only one key or more is secondary, I think, but if users
>> are already using Variables to keep state for tasks, this is a clear sign
>> that we are missing a crucial feature and that our users are already
>> abusing Airflow in the way we try to prevent by not introducing a
>> "State service".
>> With the recent SecretBackend implementation, where Variables might be
>> kept in a Secret backend and not only the MetaStore, you might have
>> no write access to the backend. There is not even "write" support in the
>> current "MetastoreBackend" implementation for writing variables. So we
>> already have configurations where writing a variable and reading it
>> elsewhere might not work, as far as I can see. You can of course set
>> several backends with the Metastore as the last fallback, but
>> for me that opens up different problems: what happens if the key is
>> present in both, one task writes it to the metastore, but another task
>> reads it from the Secret Backend?
>> I think variables are being abused in exactly the way we want to
>> prevent the "StateService" from being abused, and shying away from that
>> is really like closing our eyes and pretending it's not happening.
>> So maybe we can make a change AIP with this approach:
>> 1) Variables -> mostly read-only (for tasks) and used to keep
>> configuration shared between workers (but not on a task level).
>> 2) StateService (or whatever we call it) where we keep state information
>> for a specific dag + task + execution_date.
>> J.
>> On Tue, Jun 2, 2020 at 12:13 AM Daniel Standish <>
>> wrote:
>>> Airflow already provides a mechanism for state persistence: the
>>> Variable, and, with caveats and flaws, XCom.
>>> I personally persist state to the airflow metastore database for a large
>>> percentage of our jobs.  They are incremental jobs and it is helpful to
>>> keep track of a watermark.
>>> I think that incremental jobs are probably very very common in airflow
>>> implementations.  Though probably often times users resort to imperfect
>>> vehicles for this such as `execution_date` or xcom.
>>> I have a very drafty draft AIP that I haven't had enough time to work
>>> on, which explores adding explicit support for state persistence to
>>> airflow. I understand it is a controversial idea, though.  (Note: the AIP
>>> is not ready for primetime.)
>>> I am of the mind that being able to persist some state is not a
>>> fundamental change to airflow and would just add explicit (and more
>>> user-friendly) support for something that is already quite common, and fits
>>> fully within the wheelhouse of what airflow exists to do.
>>> On Mon, Jun 1, 2020 at 2:57 PM Chris Palmer <> wrote:
>>>> Furcy,
>>>> To clarify, when I say that Airflow should not be in the business of
>>>> keeping state about external systems, I specifically mean it shouldn't be
>>>> keeping state to be shared between task instances. I completely understand
>>>> that there may be external systems that are harder to work with, and like
>>>> in your case require the operator to be able to store some piece of
>>>> information to make them idempotent. I just don't think that Airflow should
>>>> provide that storage mechanism.
>>>> I would think that most users of Airflow have access to some sort of
>>>> cloud storage like S3 (which are really just key-value stores), and it's
>>>> easy enough to write your job_id or whatever value you care about to a file
>>>> with a prefix computed from the dag_id, task_id, execution_date or whatever
>>>> combination of them you care about. Yes it makes your operators more
>>>> complex and they have to know about another system, but it keeps that
>>>> complexity out of core Airflow. That's the trade off.
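
As an illustration of the key scheme Chris describes, here is a minimal
Python sketch. The function name `state_key`, the `airflow-state/` prefix,
and the example identifiers are assumptions made for illustration; the
thread does not specify a layout, and the actual upload to S3 is left out.

```python
from datetime import datetime


def state_key(dag_id: str, task_id: str, execution_date: datetime, name: str) -> str:
    """Build an object-store key from the identifiers of a task instance."""
    # isoformat() gives one unambiguous string per execution date, so every
    # try of the same task instance computes the same key.
    return f"airflow-state/{dag_id}/{task_id}/{execution_date.isoformat()}/{name}"


key = state_key("etl", "submit_job", datetime(2020, 6, 1), "job_id")
print(key)
```

Because the key depends only on values the operator already has, a retry
can look up what an earlier try wrote without Airflow storing anything.
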
>>>> Ash,
>>>> I'm not suggesting that XCom be removed from Airflow, and I understand
>>>> there are use cases where it makes some things convenient. In your example
>>>> though, it would be just as easy for the sensor to write the found object
>>>> path as the contents of another file in S3, with a computable prefix based
>>>> on the dag/task/execution_date.
>>>> At its heart XCom is just a key-value store where the keys are limited
>>>> to a very specific set of possibilities, and where key-value pairs are
>>>> managed in some specific ways. The request here is to add another narrowly
>>>> defined set of allowable keys, and as far as I can tell with no extra
>>>> management of them. The only real advantage of using the Airflow database
>>>> for XCom or any expansion/variation on it is that we know that all
>>>> operators have access to the database.
>>>> I'm not an expert but I would wonder how well Postgres or MySQL perform
>>>> as high volume key value stores. Does anyone actually use XCom at scale,
>>>> and does that extra load on the database impact scheduling and other
>>>> performance aspects of Airflow?
>>>> Chris
>>>> On Mon, Jun 1, 2020 at 5:03 PM Ash Berlin-Taylor <>
>>>> wrote:
>>>>> Just to touch on one point about XCom, and to reassure people that
>>>>> they, or something very like them, are in Airflow for the foreseeable
>>>>> future.
>>>>> As an example of an appropriate use for XCom: Let's say a third party
>>>>> delivers you a set of files once a week, but the exact name of the files
>>>>> isn't known (by you) in advance. So you write a sensor that polls/checks
>>>>> for the objects to appear in your bucket, and the sensor outputs the S3
>>>>> object path to XCom, which the next processing step then examines to
>>>>> process the files.
>>>>> That sort of use case is not going anywhere.
>>>>> Cheers,
>>>>> -ash
>>>>> On Jun 1 2020, at 7:37 pm, Chris Palmer <> wrote:
>>>>> At the risk of repeating myself (from the previous thread that touched
>>>>> on this topic), I don't think Airflow should be in the business of keeping
>>>>> state about external systems. Airflow is about authoring and running
>>>>> workflows; it's not a messaging tool or a cluster management tool. I'm
>>>>> not convinced that the existing XCom functionality should really be a part
>>>>> of Airflow, and I certainly don't think it should be expanded upon or new
>>>>> variations added. I think storing state is especially risky, if for no
>>>>> other reason than the fact that Airflow is not the source of truth about
>>>>> those systems. It's very likely that at some times the "state" that Airflow
>>>>> has saved will diverge from the actual state of the external system.
>>>>> Handling that nicely probably requires a bunch of custom code in the
>>>>> operators/hooks anyway, so I don't think it saves anything in terms of
>>>>> operator code complexity. Users would be much better served going to the
>>>>> source of truth to determine state. If part of the problem is that Livy is
>>>>> lacking in features (like being able to query the status of a particular
>>>>> job_id) then I think it would be more appropriate to add the needed
>>>>> features to that project. Airflow at its core shouldn't be concerned with
>>>>> making up for failures of other tools.
>>>>> Also as can be seen by just this discussion, it's hard to keep these
>>>>> extra features from expanding in scope. Jarek proposed something that
>>>>> would just store a single string, and immediately Furcy wants to expand it
>>>>> to store multiple strings. Either way we are really just talking about a
>>>>> key-value store, and putting limits on how that key can be structured; the
>>>>> key is made up of some predefined set of Airflow entities (for Jarek's
>>>>> proposal) or some arbitrary key along with those Airflow entities (Furcy's
>>>>> proposal).
>>>>> I know in the past that I had a situation where I wanted to reuse a
>>>>> cluster across multiple data intervals, if one was already running (this
>>>>> was before I discovered Airflow so wasn't "execution dates" precisely). I
>>>>> can equally see use cases where I might want to share some resource for
>>>>> multiple tasks in a DAG, or across similar tasks in multiple DAGs. So if we
>>>>> added this then why limit it to any one of those combinations? But then we
>>>>> just have an arbitrary key-value store. If you want to use Airflow for
>>>>> that then you can use Variables, if you want to use something else then you
>>>>> can.
>>>>> Unless Airflow is doing some extra management of these key-values in
>>>>> some way (like it does with clearing out XComs on reruns), then I see
>>>>> absolutely no added benefit. And even with some potential management by
>>>>> Airflow I'm still not convinced that Airflow is the right place for it.
>>>>> Chris
>>>>> On Mon, Jun 1, 2020 at 1:19 PM Furcy Pin <>
>>>>> Thank you Jarek for the detailed explanation,
>>>>> That's exactly what I wanted to do: write a feature request to
>>>>> summarize all those discussions.
>>>>> I agree with you that the feature should be marked distinct from the
>>>>> XCom feature and that we should not piggyback this feature into XCom.
>>>>> The crux of the problem, I think, is that with XCom you do want the
>>>>> task to delete its xcom at the beginning of the retry.
>>>>> Correct me if I'm wrong, but one use case where it was necessary was
>>>>> having a task A and a task B that starts immediately after A and waits
>>>>> for some 'signal' from A.
>>>>> If A and B restart and A doesn't reset its signal, then B will use
>>>>> the signal from A's first try, which is incorrect.
>>>>> About the 3 solutions you mention:
>>>>> 1) Providing the job_id from outside. That works indeed. Sadly in my
>>>>> use-case Livy's API is poorly designed and only returns a generated job_id,
>>>>> you can't specify a custom one.
>>>>> You can't even find a job by name; I would have to list all the active
>>>>> job_ids and do a GET for each of them to get its name and find which
>>>>> is the one I want. It's doable but inelegant.
>>>>> 2) Store the id in an external storage. Of course it would work but it
>>>>> requires an external storage. More on that below.
>>>>> 3) I'm not sure I understand completely what you mean there, but I
>>>>> think you mean that the idempotency can be handled by the service you use
>>>>> (for instance BigQuery). Indeed that is another solution. If we were using
>>>>> Spark with a Hive metastore + locking or the deltalake storage format, we
>>>>> could have something to prevent a job that runs twice from creating
>>>>> duplicates. This is another solution we are considering, but it is
>>>>> costly to change now.
>>>>> You guess correctly that the feature I was asking for would be to
>>>>> provide some utility to let the users implement solution 2) without
>>>>> requiring an external storage.
>>>>> I think it would be a QOL improvement for some use cases, just like it
>>>>> could be argued that XCom is just a QOL improvement and users could have
>>>>> used an external storage themselves.
>>>>> The main advantage that it brings is making the custom operators much
>>>>> easier to share and reuse across the Apache Airflow community, compared to
>>>>> having to set up some external storage.
>>>>> I have seen that some users used the metadata store itself as an
>>>>> external storage by adding a new table to the airflow model:
>>>>> And others suggested using XCom itself as an external storage by
>>>>> storing information with a special task_id:
>>>>> In the discussion thread you provided it was also suggested to use
>>>>> Variables to store some persisting information.
>>>>> These 3 approaches work but feel quite "hacky" and I believe that
>>>>> providing such functionality would be good.
>>>>> Finally, I don't see the point of limiting the functionality to such an
>>>>> extent: providing an "IdempotencyIdStorage" that only allows you to store
>>>>> one string
>>>>> will just force people who need to store more than one id for one task
>>>>> (for whatever reason) to use some hack again, like storing a json inside
>>>>> the storage.
>>>>> I was more thinking about something quite similar to XCom (I liked the
>>>>> XState name suggestion), where the entry would be keyed by "(dag_id,
>>>>> task_id, execution_date, key)"
>>>>> where "key" can be whatever you want and would be kept across retries.
>>>>> I have read (quickly) through the "Pandora's Box" thread you linked.
>>>>> Indeed it looks like there would be many ways to misuse such feature.
>>>>> I do understand the importance of idempotency, and it looks like my use
>>>>> case is one of the first ever listed where I do need to persist a state
>>>>> across retries to make my operator really idempotent.
>>>>> I'm surprised no one came up with it given how frequent the Spark +
>>>>> Airflow combination is (well, the BigQueryOperator was one too but found
>>>>> another solution).
>>>>> Of course we can blame it on Livy for being poorly conceived (unlike
>>>>> BigQuery) or we can blame it on Spark for not having a built-in security
>>>>> mechanism to prevent double-writes,
>>>>> but I think that as the above hacks show, you can't really prevent
>>>>> users from shooting themselves in the foot if that's what they really want
>>>>> to.
>>>>> While I do think that making things foolproof is important, I believe
>>>>> it's also in Python's philosophy to *not* make things foolproof to
>>>>> the detriment of simplicity for the right use cases.
>>>>> But I do understand that the use cases are different and
>>>>> contradictory: some would require the state to be persisted across
>>>>> reschedule and not retries, mine would require the state to be persisted
>>>>> across retries and not reschedule.
>>>>> Maybe the Airflow-y way for that would be to have one task that does
>>>>> the submit and an xcom with the job, then one task that checks the progress
>>>>> of the job, but it feels very cumbersome to double the number of tasks
>>>>> just for that. Plus I'm not sure we could make the first task retry if the
>>>>> second task fails...
>>>>> Thanks again,
>>>>> Furcy
>>>>> On Mon, 1 Jun 2020 at 16:01, Jarek Potiuk <>
>>>>> wrote:
>>>>> I think we've discussed several approaches like that, and using the XCom
>>>>> name (which for many people would mean "let's just extend the XCom table
>>>>> for that") is not a very good idea IMHO. I think this is very
>>>>> different functionality/logic which we might or might not agree to
>>>>> implement as a community. Naming it "XCom" and trying to extend the XCom
>>>>> table behavior might be problematic.
>>>>> Not sure if you are aware, but we had a very similar discussion about it
>>>>> recently (without clear conclusions, but at least you can see what kind of
>>>>> issues/problems different people have with this approach).
>>>>> I am not saying it is impossible to do, but I think it's a matter of
>>>>> how we formulate the "use case". It's very tempting to implement a generic
>>>>> intra-task communication mechanism, indeed. But it can very easily lead
>>>>> to people abusing it and bypassing the guarantees (idempotency mainly) that
>>>>> Airflow provides for backfilling and re-running tasks. I thought a bit
>>>>> after the latest discussion kind of died out, and I have one possible
>>>>> solution to the problem.
>>>>> Let me explain what I think about it (but others can have different
>>>>> opinions of course):
>>>>> So far the discussion was that there are several ways to achieve what
>>>>> you want (and it's really about what entity is providing the "idempotency"
>>>>> guarantee):
>>>>> 1) Similarly to what was just merged in the BigQuery Insert Job, you can
>>>>> provide the job_id from outside. You'd need to work out a job_id naming
>>>>> scheme that works in your case and make sure that when you re-run your task
>>>>> with the same (dag_id, task_id, execution date) you get the same id. Then the
>>>>> "uniqueness" thus idempotency is handled by the logic written in the
>>>>> 2) Store the DAG id in some external storage (via one of the hooks -
>>>>> where it can be queried in the way that will work for you). Then the
>>>>> idempotency is actually handled by the logic in your Operator + some
>>>>> external storage.
>>>>> 3) Query your service and retrieve the JOB ID from it - but you have
>>>>> to have a way to query for the job related to your "dag id + task id
>>>>> + execution_date". Then the idempotency is actually handled by the
>>>>> service you are using.
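
Option 1) above depends on deriving the same job_id on every re-run. A
minimal sketch, assuming a hash of the three identifiers is acceptable to
the external service; `deterministic_job_id` and the `airflow_` prefix are
hypothetical names, not part of Airflow or BigQuery:

```python
import hashlib
from datetime import datetime


def deterministic_job_id(dag_id: str, task_id: str, execution_date: datetime) -> str:
    """Derive a stable job id from (dag_id, task_id, execution_date)."""
    raw = f"{dag_id}.{task_id}.{execution_date.isoformat()}"
    # Hashing avoids characters the external service may reject in an id.
    digest = hashlib.sha256(raw.encode("utf-8")).hexdigest()[:16]
    return f"airflow_{digest}"


first_try = deterministic_job_id("etl", "load", datetime(2020, 6, 1))
retry = deterministic_job_id("etl", "load", datetime(2020, 6, 1))
other_run = deterministic_job_id("etl", "load", datetime(2020, 6, 2))
print(first_try == retry, first_try == other_run)
```

This only helps when the service accepts a caller-supplied id, which, as
noted elsewhere in the thread, Livy does not.
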
>>>>> In the use case you describe, this is the only thing you need: an
>>>>> "idempotency source". I believe you would like to get case 2) from
>>>>> above but without having to use external storage to store the "unique id":
>>>>> something that will let each task in the same dag run set or retrieve a
>>>>> unique value for that particular task. One value should be enough,
>>>>> assuming that each operator/task works on one external data "source".
>>>>> My current thinking is:
>>>>> Why don't we provide such a dedicated idempotency service inside
>>>>> Airflow? We already have a DB and we could have an "IdempotencyIdStorage"
>>>>> class with two methods:
>>>>>   * .set(id: str) and
>>>>>   * .get() -> str
>>>>> And the data stored there should be a string keyed by "(dag_id,
>>>>> task_id, execution_date)" - available also via Jinja templating. There is
>>>>> no intra-task communication here, very little possibility of abuse, and it
>>>>> seems to solve the major pain point where you have to provide your own
>>>>> storage to get the idempotency if your service does not provide one or you
>>>>> do not want to delegate it to the DAG writer.
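
The proposed class could be sketched roughly as follows. This is only an
illustration under stated assumptions: it uses an in-memory SQLite database
in place of the Airflow metastore, the table name `idempotency_id` is
invented, the key is passed to the constructor, and `get()` returns None
when nothing was stored (the proposal above only says `-> str`).

```python
import sqlite3
from typing import Optional


class IdempotencyIdStorage:
    """Hypothetical store: one string per (dag_id, task_id, execution_date)."""

    def __init__(self, conn: sqlite3.Connection, dag_id: str, task_id: str,
                 execution_date: str) -> None:
        self._conn = conn
        self._key = (dag_id, task_id, execution_date)
        conn.execute(
            "CREATE TABLE IF NOT EXISTS idempotency_id ("
            "dag_id TEXT, task_id TEXT, execution_date TEXT, value TEXT, "
            "PRIMARY KEY (dag_id, task_id, execution_date))"
        )

    def set(self, id: str) -> None:
        # Overwrite any earlier value: only one id is kept per key.
        self._conn.execute(
            "INSERT OR REPLACE INTO idempotency_id VALUES (?, ?, ?, ?)",
            (*self._key, id),
        )
        self._conn.commit()

    def get(self) -> Optional[str]:
        row = self._conn.execute(
            "SELECT value FROM idempotency_id "
            "WHERE dag_id = ? AND task_id = ? AND execution_date = ?",
            self._key,
        ).fetchone()
        return row[0] if row else None


conn = sqlite3.connect(":memory:")
first_try = IdempotencyIdStorage(conn, "etl", "submit_job", "2020-06-01T00:00:00")
first_try.set("livy-batch-42")

# A later try of the same task instance sees the value written earlier.
retry = IdempotencyIdStorage(conn, "etl", "submit_job", "2020-06-01T00:00:00")
print(retry.get())
```

Since the key is fixed by the task instance, a task can only read back its
own value, which is the restriction the proposal relies on to limit abuse.
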
>>>>> J.
>>>>> On Mon, Jun 1, 2020 at 2:12 PM Furcy Pin <>
>>>>> The use case I'm referring to is that you can't use xcom to let a task
>>>>> read information from its past attempts, because when a task starts its
>>>>> xcom is automatically deleted.
>>>>> My specific use case is that we have a custom LivyOperator that calls
>>>>> Livy to start batch Spark Jobs.
>>>>> When you start a batch job Livy returns a job_id.
>>>>> Sometimes our operator can fail for one reason or another (for
>>>>> instance if Livy is unreachable for a while).
>>>>> When the task retries, it calls Livy again, which starts the same spark
>>>>> job, but the problem is that the spark job from the first attempt can still
>>>>> be running,
>>>>> and then we have a batch job that runs twice simultaneously and
>>>>> creates duplicates in the output.
>>>>> What we tried to do is get the job_id from the first try, to check
>>>>> if the job is still running, and wait for it to complete if it is.
>>>>> We tried using xcom to let the task send a message to itself (to its
>>>>> next try) but xcom is meant for "inter-task communication" only so this
>>>>> doesn't work and is not intended to work.
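
The retry behaviour Furcy is after can be sketched like this. `FakeLivy`,
`execute`, and the `saved` dict are hypothetical stand-ins: `saved`
represents whatever storage survives between tries (which XCom, as
described above, does not), and no real Livy API is called. Only the
still-running case from the message is handled.

```python
from typing import Dict, Optional


class FakeLivy:
    """Stand-in for a Livy client; it only tracks job states in memory."""

    def __init__(self) -> None:
        self.states: Dict[str, str] = {}
        self.submitted = 0

    def submit(self) -> str:
        self.submitted += 1
        job_id = f"batch-{self.submitted}"
        self.states[job_id] = "running"
        return job_id

    def state(self, job_id: str) -> Optional[str]:
        return self.states.get(job_id)


def execute(livy: FakeLivy, saved: Dict[str, str]) -> str:
    """Return the job to wait on, submitting only if no earlier try is alive."""
    previous = saved.get("job_id")
    if previous is not None and livy.state(previous) == "running":
        return previous  # an earlier attempt is still running: reuse it
    job_id = livy.submit()
    saved["job_id"] = job_id  # must be kept across retries
    return job_id


livy, saved = FakeLivy(), {}
first = execute(livy, saved)
second = execute(livy, saved)  # simulated retry while the job still runs
print(first, second, livy.submitted)
```
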
>>>>> On Mon, 1 Jun 2020 at 13:15, Ash Berlin-Taylor <>
>>>>> Hi Furcy,
>>>>> Can you give a concrete example of what you mean by intra-task xcom?
>>>>> Depending on your use case this may already be possible.
>>>>> On Jun 1 2020, at 11:45 am, Furcy Pin <> wrote:
>>>>> Hello,
>>>>> I would like to open a feature request for Airflow to support
>>>>> "intra-task xcom".
>>>>> It seems that there are several distinct use cases for it already
>>>>> and only ugly workarounds and I wanted to list them in a JIRA ticket.
>>>>> I wanted to summarize links to the use cases and past attempts,
>>>>> and the recommended approach (which apparently would be to create
>>>>> a distinct feature from xcom to support this; it could be called
>>>>> intra-com or self-com?)
>>>>> Do you know if such ticket already exists? I couldn't find one.
>>>>> Also I can't create any ticket due to some obscure bug (see my other
>>>>> email).
>>>>> Thanks,
>>>>> Furcy
>>>>> --
>>>>> Jarek Potiuk
>>>>> Polidea <> | Principal Software Engineer
>>>>> M: +48 660 796 129 <+48660796129>
>> --
>> Jarek Potiuk
>> Polidea <> | Principal Software Engineer
>> M: +48 660 796 129 <+48660796129>
