airflow-dev mailing list archives

From Daniel Standish <dpstand...@gmail.com>
Subject Re: Tasks that run just once
Date Thu, 16 May 2019 06:04:24 GMT
>
> However, I also want to create the tables themselves via Airflow and run a
> one time backfill for each table. Backfilling hourly is proving to be a
> very inefficient process.
>

So, one thing: I have implemented alternative "initial load" behavior
inside the execute method of an operator by checking whether there is a
previous task instance. E.g.:

# Inside execute(): if there is no previous task instance, treat this
# run as the initial load. getattr on a missing 'ti' raises AttributeError.
try:
    prev_ti = getattr(context.get('ti'), 'previous_ti')
except AttributeError:
    prev_ti = None
if prev_ti is None:
    self.initial_load = True

I don't know what happens when there are prior dag runs and it's just a
task that has never been executed (as opposed to a brand new dag run).
You can see the empty boxes, but I think that's just for looks and there
aren't really any TIs behind them, so it probably still works.
But if not, at a minimum you could use XComs to log each run and detect
with certainty whether something has already been run.
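A rough illustration of that XCom idea (just a sketch: a plain dict stands
in for the XCom backend so it's self-contained, and the key name is made
up; in a real operator you'd use ti.xcom_push / ti.xcom_pull with
include_prior_dates=True instead):

```python
# Sketch only: a dict plays the role of Airflow's XCom store here.
xcom_store = {}

INITIAL_LOAD_KEY = 'initial_load_done'  # made-up marker key

def load_mode(pair):
    """Return 'initial' the first time a pair is loaded, 'incremental' after."""
    if not xcom_store.get((pair, INITIAL_LOAD_KEY)):
        xcom_store[(pair, INITIAL_LOAD_KEY)] = True  # log that we ran
        return 'initial'      # create table + full backfill goes here
    return 'incremental'      # normal hourly load goes here

print(load_mode('cust1/source1'))  # initial
print(load_mode('cust1/source1'))  # incremental
```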
Perhaps you can define in your operator an alternate execute path that
will "backfill" your customer / source combo in a reasonable way on the
first run, i.e. create the tables and load the whole shebang, or batch
it appropriately.
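Putting those pieces together, roughly (again just a sketch: stub classes
stand in for a real TaskInstance, and the 'initial'/'incremental' returns
are placeholders for your own create-table / backfill / hourly-load logic):

```python
# Sketch of the branching inside execute(), with stubs in place of Airflow.
class LoadTask:
    def execute(self, context):
        try:
            prev_ti = getattr(context.get('ti'), 'previous_ti')
        except AttributeError:
            prev_ti = None
        if prev_ti is None:
            # First run for this task: create the table, backfill in bulk.
            return 'initial'
        # Later runs: load just the new hour.
        return 'incremental'

class FirstRunTI:
    previous_ti = None          # no earlier task instance yet

class LaterRunTI:
    previous_ti = object()      # some earlier task instance exists

print(LoadTask().execute({'ti': FirstRunTI()}))  # initial
print(LoadTask().execute({'ti': LaterRunTI()}))  # incremental
```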

On Mon, May 13, 2019 at 10:54 AM Chris Palmer <chris@crpalmer.com> wrote:

> I'm trying to design a set of DAGs to do a one-time create and backfill of a set
> of tables in BigQuery and then perform periodic loads into those tables. I
> can't quite get it to work the way I want to and I'm wondering if other
> people have solved similar problems.
>
> The parameters are as follows:
>
> I have a list of customers: cust1, cust2, etc
> I have a list of sources: source1, source2, etc
> For each pairing of customer and source I want a task that runs hourly to
> load new data into a unique table for that pair (loads are done via a
> single BigQuery query).
>
> So far that's easy to solve: I have a single DAG and just loop through the
> two lists creating a task for each pairing.
>
> However, I also want to create the tables themselves via Airflow and run a
> one time backfill for each table. Backfilling hourly is proving to be a
> very inefficient process.
>
> Finally, I also want to be able to add new customers and/or new sources to
> the lists and have it work. I know I could achieve this with one or more
> DAGs per customer/source pair, but I had hoped to avoid that explosion in
> the number of DAGs.
>
> The closest I can get is to have two DAGs. The first has a schedule of
> '@once' and for each customer/source pair has a CreateTable task and a
> downstream BackfillTable task. The second DAG runs '@hourly' and just has a
> LoadTable task for each customer/source pair.
>
> This works fine for the initial lists, but once the first DAG has run once,
> its DagRun is marked as 'success'. If I then add a new customer or source,
> the new tasks get added to that first DagRun, but the DagRun is never
> checked again. If I manually switch the DagRun back to 'running' then it
> picks up the new tasks.
>
> Is there some other setup that I'm missing that gets me what I want?
>
> Thanks
> Chris
>
