From: Jarek Potiuk <jarek.potiuk@polidea.com>
Date: Thu, 23 Jul 2020 09:36:41 +0200
Subject: Re: Intra-task "xcom"
To: users@airflow.apache.org

I'd love to take part - but I am going for a week of vacation (finally!)
next week, so the week after next would be great!

On Thu, Jul 23, 2020 at 12:23 AM Daniel Standish wrote:

> We are using state persistence pretty heavily right now with plugin models
> that I have called ProcessState and TaskState.
>
> Our implementation might be too idiosyncratic to contribute to Airflow,
> but then again it might not. I would be happy to do a call to demo what we
> are doing, to see if there is any interest and to receive guidance from
> interested parties about what, if anything, might make sense in Airflow.
> Kaxil, do you have any interest in that?
>
> On Thu, Jun 4, 2020 at 4:49 PM Kaxil Naik wrote:
>
>> I definitely feel we can support these use cases by improving XCom. The
>> concept of XCom was to allow sharing messages & state between tasks.
>>
>> Here is the first line from the docs about XCom:
>>
>> XComs let tasks exchange messages, allowing more nuanced forms of control
>> and shared state. The name is an abbreviation of "cross-communication".
>>
>> I read the AIP (
>> https://cwiki.apache.org/confluence/display/AIRFLOW/AIP-30%3A+State+persistence
>> ) from @Daniel Standish; the "namespacing" of this state would be a good
>> feature, and XCom already allows that with its "dag_id, task_id" key.
>> *Solution 1* in the AIP would solve the issue without much impact, while
>> maintaining backwards-compatibility.
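>>
>> A minimal sketch of that existing namespacing (function, task, and key
>> names are illustrative; `ti` is the TaskInstance in the task context):
>>
>>     def producer(**context):
>>         context["ti"].xcom_push(key="watermark", value="2020-07-23")
>>
>>     def consumer(**context):
>>         # Scoped to the current DAG by default; task_ids selects the
>>         # producing task, so the key-space is (dag_id, task_id, key).
>>         value = context["ti"].xcom_pull(task_ids="producer",
>>                                         key="watermark")
>>         print(value)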
>>
>> I am against the idea of using the Secrets Backend for storing "State".
>> Storing state for some kind of persistence should be "short-lived" and
>> temporary.
>>
>> The "writers" & "readers" of both (Secrets & State) are different.
>> Generally, sysadmins / team leads are responsible for managing secrets
>> (writing, rotating, auditing, etc.), whereas State would be written by
>> Airflow workers, would (or should) be short-lived, and you don't care
>> about auditing or rotating the value in "State".
>>
>> The only problems that I can see in the current XCom implementation are
>> 1) the use of execution_date and 2) the fact that XComs are cleared at
>> the start of a task.
>>
>> One of the issues we already want to address in Airflow is removing the
>> hard requirement of "execution_date" for DagRun and TaskInstance. This
>> would also help in fixing (1) above.
>>
>> (2) can be solved by a flag, as mentioned in the AIP.
>>
>> Regards,
>> Kaxil
>>
>>
>> On Tue, Jun 2, 2020 at 8:05 AM Jarek Potiuk wrote:
>>
>>> I think this subject has come up so often that I am also slowly changing
>>> my mind in favor of making an explicit state persistence "service".
>>>
>>> Whether it's only one key or more is secondary, I think. But if users
>>> are already using Variables to keep state for tasks, this is a clear
>>> sign that we are missing a crucial feature and our users are already
>>> abusing Airflow in the way we try to prevent by not introducing a
>>> "State service".
>>>
>>> With the recent Secrets Backend implementation, where Variables might be
>>> kept in a secrets backend - not only the metastore - you might
>>> potentially have no write access to the backend. There is not even
>>> "write" support in the current "MetastoreBackend" implementation for
>>> writing variables. So we already have configurations where writing a
>>> variable and reading it elsewhere might not work, as far as I can see.
>>> You can of course set several backends, with the metastore as the last
>>> fallback, but for me it opens up different problems: what happens if the
>>> key is present in both, one task writes it to the metastore, but another
>>> task reads it from the secrets backend?
>>>
>>> It seems that Variables are being abused in exactly the way we want to
>>> prevent the "StateService" from being abused - and shying away from that
>>> is really like closing our eyes and pretending it's not happening.
>>>
>>> So maybe we can make a change AIP with this approach:
>>>
>>> 1) Variables -> mostly read-only (for tasks) and used to keep
>>> configuration shared between workers (but not on a task level).
>>> 2) StateService (or whatever we call it), where we keep state
>>> information for a specific dag + task + execution_date.
>>>
>>> J.
>>>
>>>
>>> On Tue, Jun 2, 2020 at 12:13 AM Daniel Standish wrote:
>>>
>>>> Airflow already provides a mechanism for state persistence: the
>>>> Variable, and, with caveats and flaws, XCom.
>>>>
>>>> I personally persist state to the Airflow metastore database for a
>>>> large percentage of our jobs. They are incremental jobs and it is
>>>> helpful to keep track of a watermark.
>>>>
>>>> I think that incremental jobs are probably very, very common in Airflow
>>>> implementations, though users probably often resort to imperfect
>>>> vehicles for this, such as `execution_date` or XCom.
>>>>
>>>> I have a very draftey draft AIP that I haven't had enough time to work
>>>> on, which explores adding explicit support for state persistence to
>>>> Airflow:
>>>> https://cwiki.apache.org/confluence/display/AIRFLOW/AIP-30%3A+State+persistence.
>>>> Though I understand it is a controversial idea. (Note: the AIP is not
>>>> ready for primetime.)
>>>>
>>>> I am of the mind that being able to persist some state is not a
>>>> fundamental change to Airflow. It would just add explicit (and more
>>>> user-friendly) support for something that is already quite common, and
>>>> fits fully within the wheelhouse of what Airflow exists to do.
>>>>
>>>>
>>>> On Mon, Jun 1, 2020 at 2:57 PM Chris Palmer wrote:
>>>>
>>>>> Furcy,
>>>>>
>>>>> To clarify, when I say that Airflow should not be in the business of
>>>>> keeping state about external systems, I specifically mean it shouldn't
>>>>> be keeping state to be shared between task instances. I completely
>>>>> understand that there may be external systems that are harder to work
>>>>> with, and that, like in your case, require the operator to be able to
>>>>> store some piece of information to make them idempotent. I just don't
>>>>> think that Airflow should provide that storage mechanism.
>>>>>
>>>>> I would think that most users of Airflow have access to some sort of
>>>>> cloud storage like S3 (which are really just key-value stores), and
>>>>> it's easy enough to write your job_id or whatever value you care about
>>>>> to a file with a prefix computed from the dag_id, task_id,
>>>>> execution_date, or whatever combination of them you care about. Yes,
>>>>> it makes your operators more complex and they have to know about
>>>>> another system, but it keeps that complexity out of core Airflow.
>>>>> That's the trade-off.
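>>>>>
>>>>> A minimal sketch of that approach (assuming the AWS provider's S3Hook
>>>>> and a hypothetical bucket name; import paths vary between Airflow
>>>>> versions):
>>>>>
>>>>>     from airflow.providers.amazon.aws.hooks.s3 import S3Hook
>>>>>
>>>>>     BUCKET = "my-state-bucket"  # hypothetical
>>>>>
>>>>>     def state_key(dag_id, task_id, ds):
>>>>>         # Deterministic: the same task instance always maps to the
>>>>>         # same object, so a retry finds what the first try wrote.
>>>>>         return f"task-state/{dag_id}/{task_id}/{ds}/job_id"
>>>>>
>>>>>     def save_job_id(dag_id, task_id, ds, job_id):
>>>>>         S3Hook().load_string(job_id,
>>>>>                              key=state_key(dag_id, task_id, ds),
>>>>>                              bucket_name=BUCKET, replace=True)
>>>>>
>>>>>     def load_job_id(dag_id, task_id, ds):
>>>>>         hook = S3Hook()
>>>>>         key = state_key(dag_id, task_id, ds)
>>>>>         if hook.check_for_key(key, bucket_name=BUCKET):
>>>>>             return hook.read_key(key, bucket_name=BUCKET)
>>>>>         return None  # first attempt: nothing stored yet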
>>>>>
>>>>> Ash,
>>>>> I'm not suggesting that XCom be removed from Airflow, and I understand
>>>>> there are use cases where it makes some things convenient. In your
>>>>> example, though, it would be just as easy for the sensor to write the
>>>>> found object path as the contents of another file in S3, with a
>>>>> computable prefix based on the dag/task/execution_date.
>>>>>
>>>>> At its heart, XCom is just a key-value store where the keys are
>>>>> limited to a very specific set of possibilities, and where key-value
>>>>> pairs are managed in some specific ways. The request here is to add
>>>>> another narrowly defined set of allowable keys, and, as far as I can
>>>>> tell, with no extra management of them. The only real advantage of
>>>>> using the Airflow database for XCom, or any expansion/variation on it,
>>>>> is that we know that all operators have access to the database.
>>>>>
>>>>> I'm not an expert, but I would wonder how well Postgres or MySQL
>>>>> perform as high-volume key-value stores. Does anyone actually use XCom
>>>>> at scale, and does that extra load on the database impact scheduling
>>>>> and other performance aspects of Airflow?
>>>>>
>>>>> Chris
>>>>>
>>>>>
>>>>> On Mon, Jun 1, 2020 at 5:03 PM Ash Berlin-Taylor wrote:
>>>>>
>>>>>> Just to touch on one point about XCom, and to reassure people: they,
>>>>>> or something very like them, are in Airflow for the foreseeable
>>>>>> future.
>>>>>>
>>>>>> As an example of an appropriate use for XCom: let's say a third party
>>>>>> delivers you a set of files once a week, but the exact names of the
>>>>>> files aren't known (by you) in advance. So you write a sensor that
>>>>>> polls/checks S3 for the objects to appear in your bucket, and the
>>>>>> sensor outputs the S3 object path to XCom; the next processing step
>>>>>> then reads it to process the files.
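>>>>>>
>>>>>> A minimal sketch of that pattern (bucket, prefix, and task names are
>>>>>> illustrative; import paths vary between Airflow versions):
>>>>>>
>>>>>>     from airflow.providers.amazon.aws.hooks.s3 import S3Hook
>>>>>>     from airflow.sensors.base import BaseSensorOperator
>>>>>>
>>>>>>     class WeeklyFileSensor(BaseSensorOperator):
>>>>>>         def poke(self, context):
>>>>>>             keys = S3Hook().list_keys(bucket_name="incoming",
>>>>>>                                       prefix="weekly/")
>>>>>>             if not keys:
>>>>>>                 return False  # keep poking
>>>>>>             # Push the found path; the processing task pulls it with
>>>>>>             # ti.xcom_pull(task_ids="wait_for_files",
>>>>>>             #              key="object_path").
>>>>>>             context["ti"].xcom_push(key="object_path", value=keys[0])
>>>>>>             return True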
>>>>>>
>>>>>> That sort of use case is not going anywhere.
>>>>>>
>>>>>> Cheers,
>>>>>> -ash
>>>>>>
>>>>>> On Jun 1 2020, at 7:37 pm, Chris Palmer wrote:
>>>>>>
>>>>>> At the risk of repeating myself (from the previous thread that
>>>>>> touched on this topic), I don't think Airflow should be in the
>>>>>> business of keeping state about external systems. Airflow is about
>>>>>> authoring and running workflows; it's not a messaging tool or a
>>>>>> cluster management tool. I'm not convinced that the existing XCom
>>>>>> functionality should really be a part of Airflow, and I certainly
>>>>>> don't think it should be expanded upon or new variations added. I
>>>>>> think storing state is especially risky, if for no other reason than
>>>>>> the fact that Airflow is not the source of truth about those systems.
>>>>>> It's very likely that at some point the "state" that Airflow has
>>>>>> saved will diverge from the actual state of the external system.
>>>>>> Handling that nicely probably requires a bunch of custom code in the
>>>>>> operators/hooks anyway, so I don't think it saves anything in terms
>>>>>> of operator code complexity. Users would be much better served going
>>>>>> to the source of truth to determine state. If part of the problem is
>>>>>> that Livy is lacking in features (like being able to query the status
>>>>>> of a particular job_id), then I think it would be more appropriate to
>>>>>> add the needed features to that project. Airflow at its core
>>>>>> shouldn't be concerned with making up for the failures of other
>>>>>> tools.
>>>>>>
>>>>>> Also, as can be seen by just this discussion, it's hard to keep these
>>>>>> extra features from expanding in scope. Jarek proposed something that
>>>>>> would just store a single string, and immediately Furcy wants to
>>>>>> expand it to store multiple strings. Either way, we are really just
>>>>>> talking about a key-value store and putting limits on how that key
>>>>>> can be structured; the key is made up of some predefined set of
>>>>>> Airflow entities (in Jarek's proposal) or some arbitrary key along
>>>>>> with those Airflow entities (in Furcy's proposal).
>>>>>>
>>>>>> I know in the past I had a situation where I wanted to reuse a
>>>>>> cluster across multiple data intervals, if one was already running
>>>>>> (this was before I discovered Airflow, so it wasn't "execution dates"
>>>>>> precisely). I can equally see use cases where I might want to share
>>>>>> some resource across multiple tasks in a DAG, or across similar tasks
>>>>>> in multiple DAGs. So if we added this, then why limit it to any one
>>>>>> of those combinations? But then we just have an arbitrary key-value
>>>>>> store. If you want to use Airflow for that then you can use
>>>>>> Variables; if you want to use something else then you can.
>>>>>>
>>>>>> Unless Airflow is doing some extra management of these key-values in
>>>>>> some way (like it does with clearing out XComs on reruns), I see
>>>>>> absolutely no added benefit. And even with some potential management
>>>>>> by Airflow, I'm still not convinced that Airflow is the right place
>>>>>> for it.
>>>>>>
>>>>>> Chris
>>>>>>
>>>>>> On Mon, Jun 1, 2020 at 1:19 PM Furcy Pin wrote:
>>>>>>
>>>>>> Thank you Jarek for the detailed explanation,
>>>>>>
>>>>>> That's exactly what I wanted to do: write a feature request to
>>>>>> summarize all those discussions.
>>>>>> I agree with you that the feature should be kept distinct from the
>>>>>> XCom feature and that we should not piggyback this feature onto XCom.
>>>>>>
>>>>>> The crux of the problem, I think, is that with XCom you do want the
>>>>>> task to delete its XCom at the beginning of a retry.
>>>>>> Correct me if I'm wrong, but one use case where this was necessary
>>>>>> was having a task A and a task B that starts immediately after A and
>>>>>> waits for some 'signal' from A.
>>>>>> If A and B restart and A doesn't reset its signal, then B will use
>>>>>> the signal from A's first try, which is incorrect.
>>>>>>
>>>>>> About the 3 solutions you mention:
>>>>>>
>>>>>> 1) Providing the job_id from outside. That works indeed. Sadly, in my
>>>>>> use case Livy's API is poorly designed and only returns a generated
>>>>>> job_id; you can't specify a custom one.
>>>>>> You can't even find a job by name: I would have to list all the
>>>>>> active job_ids and do a GET for each of them to get its name and find
>>>>>> which one is the one I want. It's doable but inelegant.
>>>>>>
>>>>>> 2) Store the id in an external storage. Of course it would work, but
>>>>>> it requires an external storage. More on that below.
>>>>>>
>>>>>> 3) I'm not sure I understand completely what you mean there, but I
>>>>>> think you mean that the idempotency can be handled by the service you
>>>>>> call (for instance BigQuery). Indeed, that is another solution. If we
>>>>>> were using Spark with a Hive metastore + locking, or the Delta Lake
>>>>>> storage format, we could have something to prevent a job that runs
>>>>>> twice from creating duplicates. This is another solution we are
>>>>>> considering, but it is costly to change now.
>>>>>>
>>>>>> You guessed correctly that the feature I was asking for would provide
>>>>>> some utility to let users implement solution 2) without requiring an
>>>>>> external storage.
>>>>>> I think it would be a QOL improvement for some use cases, just as it
>>>>>> could be argued that XCom is just a QOL improvement and users could
>>>>>> have used an external storage themselves.
>>>>>> The main advantage it brings is making custom operators much easier
>>>>>> to share and reuse across the Apache Airflow community, compared to
>>>>>> having to set up some external storage.
>>>>>>
>>>>>> I have seen that some users used the metadata store itself as an
>>>>>> external storage by adding a new table to the Airflow model:
>>>>>> http://mail-archives.apache.org/mod_mbox/airflow-dev/201809.mbox/%3cCAERDx9eKTwU5Urq+pnq_8Q-hb-nHtFNq_xwkGGpxVo4MhB_Brg@mail.gmail.com%3e
>>>>>>
>>>>>> And others suggested using XCom itself as an external storage by
>>>>>> storing information with a special task_id:
>>>>>> https://stackoverflow.com/a/57515143/2087478
>>>>>>
>>>>>> In the discussion thread you provided, it was also suggested to use
>>>>>> Variables to store some persisting information.
>>>>>>
>>>>>> These 3 approaches work but feel quite "hacky", and I believe that
>>>>>> providing such functionality would be good.
>>>>>>
>>>>>> Finally, I don't see the point of limiting the functionality to such
>>>>>> an extent: providing an "IdempotencyIdStorage" that only allows you
>>>>>> to store a string will just force people who need to store more than
>>>>>> one id for one task (for whatever reason) to use some hack again,
>>>>>> like storing JSON inside the storage.
>>>>>>
>>>>>> I was thinking about something quite similar to XCom (I liked the
>>>>>> XState name suggestion), where the entry would be keyed by "(dag_id,
>>>>>> task_id, execution_date, key)", where "key" can be whatever you want,
>>>>>> and which would be kept across retries.
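>>>>>>
>>>>>> A toy, in-memory stand-in for that keyed variant (a hypothetical API,
>>>>>> not part of Airflow; a real version would live in the metadata
>>>>>> database so it survives process restarts):
>>>>>>
>>>>>>     class XState:
>>>>>>         """Per-task-instance state, keyed like XCom plus a user key."""
>>>>>>         _store = {}  # {(dag_id, task_id, execution_date, key): value}
>>>>>>
>>>>>>         def __init__(self, dag_id, task_id, execution_date):
>>>>>>             self._prefix = (dag_id, task_id, execution_date)
>>>>>>
>>>>>>         def set(self, key, value):
>>>>>>             # Unlike XCom, entries are NOT cleared on task retry.
>>>>>>             XState._store[self._prefix + (key,)] = value
>>>>>>
>>>>>>         def get(self, key, default=None):
>>>>>>             return XState._store.get(self._prefix + (key,), default)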
>>>>>>
>>>>>> I have read (quickly) through the "Pandora's Box" thread you linked.
>>>>>> Indeed, it looks like there would be many ways to misuse such a
>>>>>> feature.
>>>>>> I do understand the importance of idempotency, and it looks like my
>>>>>> use case is one of the first ever listed where I do need to persist a
>>>>>> state across retries to make my operator really idempotent.
>>>>>>
>>>>>> I'm surprised no one came up with it before, given how frequent the
>>>>>> Spark + Airflow combination is (well, the BigQueryOperator was
>>>>>> another case, but it found a different solution).
>>>>>>
>>>>>> Of course we can blame it on Livy for being poorly conceived (unlike
>>>>>> BigQuery), or we can blame it on Spark for not having a built-in
>>>>>> safety mechanism to prevent double-writes, but I think that, as the
>>>>>> above hacks show, you can't really prevent users from shooting
>>>>>> themselves in the foot if that's what they really want to do.
>>>>>>
>>>>>> While I do think that making things foolproof is important, I believe
>>>>>> it's also in Python's philosophy to *not* make things foolproof at
>>>>>> the detriment of simplicity for the right use cases.
>>>>>> But I do understand that the use cases are different and
>>>>>> contradictory: some would require the state to be persisted across
>>>>>> reschedules and not retries; mine would require the state to be
>>>>>> persisted across retries and not reschedules.
>>>>>>
>>>>>> Maybe the Airflow-y way to do this would be to have one task that
>>>>>> does the submit and pushes an XCom with the job id, then one task
>>>>>> that checks the progress of the job, but it feels very cumbersome to
>>>>>> double the number of tasks just for that. Plus, I'm not sure we could
>>>>>> make the first task retry if the second task fails...
>>>>>>
>>>>>> Thanks again,
>>>>>>
>>>>>> Furcy
>>>>>>
>>>>>>
>>>>>> On Mon, 1 Jun 2020 at 16:01, Jarek Potiuk wrote:
>>>>>>
>>>>>> I think we've discussed several approaches like that, and using the
>>>>>> XCom name (which for many people would mean "let's just extend the
>>>>>> XCom table for that") is not a very good idea, IMHO. I think this is
>>>>>> very different functionality/logic which we might or might not agree
>>>>>> to implement as a community. Naming it "XCom" and trying to extend
>>>>>> the XCom table behavior might be problematic.
>>>>>>
>>>>>> Not sure if you are aware, but we had a very similar discussion about
>>>>>> it recently (without clear conclusions, but at least you can see what
>>>>>> kind of issues/problems different people have with this approach):
>>>>>> https://lists.apache.org/thread.html/rc6f56234342c87f154865489e3a6555609e4b98a8c62ca4997cb6a6c%40%3Cdev.airflow.apache.org%3E
>>>>>>
>>>>>> I am not saying it is impossible to do, but I think it's a matter of
>>>>>> how we formulate the "use case". It's very tempting to implement a
>>>>>> generic intra-task communication mechanism, indeed. But it can very
>>>>>> easily lead to people abusing it and bypassing the guarantees
>>>>>> (idempotency, mainly) that Airflow provides for backfilling and
>>>>>> re-running tasks. I thought a bit after the latest discussion kind of
>>>>>> died out, and I have one possible solution to the problem.
>>>>>>
>>>>>> Let me explain what I think about it (but others can have different
>>>>>> opinions, of course):
>>>>>>
>>>>>> So far, the discussion was that there are several ways to achieve
>>>>>> what you want (and it's really about which entity is providing the
>>>>>> "idempotency" guarantee):
>>>>>>
>>>>>> 1) Similarly to the just-merged BigQuery Insert Job
>>>>>> https://github.com/apache/airflow/pull/8868/files - you can provide
>>>>>> the job_id from outside. You'd need to work out a job_id naming
>>>>>> scheme that works in your case and make sure that when you re-run
>>>>>> your task with the same (dag_id, task_id, execution_date) you will
>>>>>> get the same id. Then the "uniqueness", and thus the idempotency, is
>>>>>> handled by the logic written in the DAG.
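>>>>>>
>>>>>> A minimal sketch of such a naming scheme (the helper name and prefix
>>>>>> are illustrative, not the actual BigQuery operator code):
>>>>>>
>>>>>>     import hashlib
>>>>>>
>>>>>>     def deterministic_job_id(dag_id, task_id, execution_date):
>>>>>>         # Same task instance -> same id, so a retry re-uses the job
>>>>>>         # instead of submitting a duplicate.
>>>>>>         raw = f"{dag_id}:{task_id}:{execution_date.isoformat()}"
>>>>>>         digest = hashlib.sha1(raw.encode()).hexdigest()[:16]
>>>>>>         return "airflow-" + digest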
>>>>>>
>>>>>> 2) Store the job id in some external storage (via one of the hooks,
>>>>>> where it can be queried in a way that works for you). Then the
>>>>>> idempotency is actually handled by the logic in your Operator + some
>>>>>> external storage.
>>>>>>
>>>>>> 3) Query your service and retrieve the job id from it - but you have
>>>>>> to have a way to query for the job related to your "dag_id + task_id
>>>>>> + execution_date". Then the idempotency is actually handled by the
>>>>>> service you are using.
>>>>>>
>>>>>> In the use case you describe, this is the only thing you need: an
>>>>>> "idempotency source". I believe you would like to get case 2) from
>>>>>> above, but without having to use external storage to store the
>>>>>> "unique id". Something that will let each task in the same dag run
>>>>>> set or retrieve a unique value for that particular task. One value
>>>>>> should be enough, assuming that each operator/task works on one
>>>>>> external data "source".
>>>>>>
>>>>>> My current thinking is:
>>>>>>
>>>>>> Why don't we provide such a dedicated idempotency service inside
>>>>>> Airflow? We already have a DB, and we could have an
>>>>>> "IdempotencyIdStorage" class with two methods:
>>>>>>
>>>>>> * .set(id: str) and
>>>>>> * .get() -> str
>>>>>>
>>>>>> The data stored there would be a string keyed by "(dag_id, task_id,
>>>>>> execution_date)" - available also via Jinja templating. There is no
>>>>>> intra-task communication here and very little possibility of abuse,
>>>>>> and it seems to solve the major pain point where you have to provide
>>>>>> your own storage to get idempotency if your service does not provide
>>>>>> one, or you do not want to delegate it to the DAG writer.
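>>>>>>
>>>>>> A rough sketch of what that could look like, backed by the metadata
>>>>>> DB (hypothetical - this service was proposed, not implemented; the
>>>>>> table definition and import paths are assumptions):
>>>>>>
>>>>>>     from sqlalchemy import Column, DateTime, String
>>>>>>     from airflow.models.base import Base
>>>>>>     from airflow.utils.session import provide_session
>>>>>>
>>>>>>     class IdempotencyId(Base):
>>>>>>         __tablename__ = "idempotency_id"  # hypothetical table
>>>>>>         dag_id = Column(String(250), primary_key=True)
>>>>>>         task_id = Column(String(250), primary_key=True)
>>>>>>         execution_date = Column(DateTime, primary_key=True)
>>>>>>         value = Column(String(250))
>>>>>>
>>>>>>     class IdempotencyIdStorage:
>>>>>>         def __init__(self, dag_id, task_id, execution_date):
>>>>>>             self.key = (dag_id, task_id, execution_date)
>>>>>>
>>>>>>         @provide_session
>>>>>>         def set(self, id: str, session=None):
>>>>>>             dag_id, task_id, execution_date = self.key
>>>>>>             # Upsert: a retry overwrites rather than duplicates.
>>>>>>             session.merge(IdempotencyId(
>>>>>>                 dag_id=dag_id, task_id=task_id,
>>>>>>                 execution_date=execution_date, value=id))
>>>>>>
>>>>>>         @provide_session
>>>>>>         def get(self, session=None) -> str:
>>>>>>             row = session.query(IdempotencyId).get(self.key)
>>>>>>             return row.value if row else None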
>>>>>> >>>>>> It seems that there are several distinct use cases for it already >>>>>> and only ugly workarounds and I wanted to list them in a JIRA ticket= . >>>>>> >>>>>> I wanted to summarize links to the use cases and past attempts, >>>>>> and the recommended approach (which apparently would be to create >>>>>> a distinct feature from xcom to support this, it could be calle >>>>>> intra-com or self-com ?) >>>>>> >>>>>> Do you know if such ticket already exists? I couldn't find one. >>>>>> Also I can't create any ticket due to some obscure bug (see my other >>>>>> email). >>>>>> >>>>>> Thanks, >>>>>> >>>>>> Furcy >>>>>> >>>>>> >>>>>> >>>>>> -- >>>>>> >>>>>> Jarek Potiuk >>>>>> Polidea | Principal Software Engineer >>>>>> >>>>>> M: +48 660 796 129 <+48660796129> >>>>>> [image: Polidea] >>>>>> >>>>>> >>> >>> -- >>> >>> Jarek Potiuk >>> Polidea | Principal Software Engineer >>> >>> M: +48 660 796 129 <+48660796129> >>> [image: Polidea] >>> >>> --=20 Jarek Potiuk Polidea | Principal Software Engineer M: +48 660 796 129 <+48660796129> [image: Polidea] --000000000000891f0e05ab16ebeb Content-Type: text/html; charset="UTF-8" Content-Transfer-Encoding: quoted-printable