airflow-users mailing list archives

From Gabor Hermann <m...@gaborhermann.com>
Subject Re: DagRun unexpectedly stops without error
Date Thu, 07 May 2020 15:36:15 GMT
Hi Ash,

Thanks for the quick reply.

While trying to copy-paste the DAG here, I found the problem. I had
accidentally set the dagrun timeout too low (the timeout was 4 hours,
while a normal run took around 6 hours). So it was a silly mistake on
my side, and the problem is solved by increasing the dagrun timeout to
16 hours.
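A cheap guard against this kind of mistake is to derive the timeout from how long runs actually take rather than picking a number up front. Below is a minimal sketch, assuming you have a list of observed run durations. The helper `suggest_dagrun_timeout` and its default factor of 2 are hypothetical illustrations, not part of Airflow.

```python
from datetime import timedelta
from typing import Sequence


def suggest_dagrun_timeout(durations: Sequence[timedelta], factor: float = 2.0) -> timedelta:
    """Hypothetical helper: longest observed run multiplied by a safety factor.

    The factor of 2 is an arbitrary illustrative default, not an Airflow setting.
    """
    if not durations:
        raise ValueError("need at least one observed run duration")
    if factor < 1.0:
        raise ValueError("a factor below 1 would time out runs that already finished")
    return max(durations) * factor


# A normal run took around 6 hours; a 4-hour timeout is shorter than that.
observed = [timedelta(hours=6)]
print(suggest_dagrun_timeout(observed))  # 12:00:00
```

The 16 hours chosen above leaves even more headroom than this rule would suggest.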

I would have noticed the DagRuns failing, but the older DagRuns were
marked as success because I had a special operator that backfilled
individual tasks (not the full DAG) for previous days, to make sure that
older intermediate data was available.

For future reference, here's a more minimal example DAG:

from datetime import timedelta
import time

from airflow.models import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator
from airflow.utils import timezone

operator_default_args = dict(
    owner="Gabor Hermann",
    start_date=timezone.datetime(2020, 5, 3),
)

with DAG(
    dag_id="test_timeout",
    schedule_interval="@daily",
    # Deliberately too short: the three 90 s aggregates alone need 4.5 minutes.
    dagrun_timeout=timedelta(minutes=4),
    start_date=timezone.datetime(2020, 5, 3),
    end_date=timezone.datetime(2020, 5, 5),
    default_args=operator_default_args,
) as dag:
    daily_incremental = DummyOperator(task_id="daily_incremental")

    # Backfill only the daily_incremental task for the window ds-3 .. ds,
    # ignoring its dependencies and re-running failed task instances.
    check_daily_incrementals = BashOperator(
        task_id="check_daily_incrementals",
        bash_command=(
            "airflow backfill test_timeout "
            "-s {{ macros.ds_add(ds, -3) }} -e {{ ds }} "
            "--ignore_dependencies --rerun_failed_tasks "
            "-t ^daily_incremental$"
        ),
    )

    # Each aggregate just sleeps for 90 seconds to stand in for slow work.
    aggregate1 = PythonOperator(task_id="aggregate1", python_callable=lambda: time.sleep(90))
    aggregate2 = PythonOperator(task_id="aggregate2", python_callable=lambda: time.sleep(90))
    aggregate3 = PythonOperator(task_id="aggregate3", python_callable=lambda: time.sleep(90))

    daily_incremental >> aggregate1
    check_daily_incrementals >> aggregate1 >> aggregate2 >> aggregate3


In this example we want to make sure that the last 3 days of
`daily_incremental` are completed before we start aggregating them
(`aggregate1`). For this purpose I used a BashOperator that executes an
Airflow backfill command restricted to the task `daily_incremental`.
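To see which dates the templated backfill command covers, here is a stdlib-only sketch that mimics the rendering for one `ds`. `ds_add` below is a re-implementation of what Airflow's `macros.ds_add` does as far as I understand it (shift a `YYYY-MM-DD` string by N days); `render_backfill_command` and `dates_in_window` are hypothetical helpers for illustration only. Assuming backfill treats both `-s` and `-e` as inclusive, the window spans four execution dates, which is worth keeping in mind when the intent is "the last 3 days".

```python
from datetime import datetime, timedelta
from typing import List

DATE_FMT = "%Y-%m-%d"


def ds_add(ds: str, days: int) -> str:
    """Stdlib stand-in for macros.ds_add: shift a YYYY-MM-DD string by `days` days."""
    return (datetime.strptime(ds, DATE_FMT) + timedelta(days=days)).strftime(DATE_FMT)


def render_backfill_command(ds: str, lookback_days: int = 3) -> str:
    """Hypothetical helper: the backfill command as the template would expand for `ds`."""
    start = ds_add(ds, -lookback_days)
    return (
        f"airflow backfill test_timeout -s {start} -e {ds} "
        "--ignore_dependencies --rerun_failed_tasks -t ^daily_incremental$"
    )


def dates_in_window(ds: str, lookback_days: int = 3) -> List[str]:
    """Hypothetical helper: daily dates from ds - lookback_days to ds, both ends included."""
    end = datetime.strptime(ds, DATE_FMT)
    return [(end - timedelta(days=n)).strftime(DATE_FMT) for n in range(lookback_days, -1, -1)]


print(render_backfill_command("2020-05-05"))
print(dates_in_window("2020-05-05"))
# ['2020-05-02', '2020-05-03', '2020-05-04', '2020-05-05']
```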

The DAG always times out, so we never get to the `aggregate3` task. But
only the latest DagRun is in state `failed`; the previous ones are in
`success` even though they never ran `aggregate3` because of the
timeout. (See it visually here: https://i.imgur.com/ll0Rg19.png.)
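The "always times out" part follows from the numbers in the example: the chain `aggregate1 >> aggregate2 >> aggregate3` alone needs 3 x 90 s = 270 s, which exceeds the 4-minute (240 s) `dagrun_timeout`. The sketch below computes the longest path through the task graph. The durations are assumptions: the DummyOperator and the backfill step are treated as instantaneous, and scheduler overhead is ignored, so the result is a lower bound on the real run time.

```python
from datetime import timedelta
from functools import lru_cache

# Assumed task durations in seconds (only the 90 s sleeps are counted).
DURATIONS = {
    "daily_incremental": 0,
    "check_daily_incrementals": 0,
    "aggregate1": 90,
    "aggregate2": 90,
    "aggregate3": 90,
}

# Upstream dependencies, mirroring the >> chains in the example DAG.
UPSTREAM = {
    "daily_incremental": [],
    "check_daily_incrementals": [],
    "aggregate1": ["daily_incremental", "check_daily_incrementals"],
    "aggregate2": ["aggregate1"],
    "aggregate3": ["aggregate2"],
}


@lru_cache(maxsize=None)
def finish_time(task: str) -> int:
    """Earliest finish time of `task` in seconds, assuming unlimited parallelism."""
    start = max((finish_time(u) for u in UPSTREAM[task]), default=0)
    return start + DURATIONS[task]


critical_path = timedelta(seconds=max(finish_time(t) for t in DURATIONS))
dagrun_timeout = timedelta(minutes=4)
print(critical_path, critical_path > dagrun_timeout)  # 0:04:30 True
```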


A separate question: is there a typical way to handle this kind of
"daily incremental" processing?

I can think of other ways, e.g. using `depends_on_past=True` and
handling catch-up with a sensor that checks whether the current DagRun
is the latest one, so that the aggregates only run for the latest DagRun.
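For the "is this the latest DagRun" check, Airflow 1.10 also ships `LatestOnlyOperator` (in `airflow.operators.latest_only_operator`), which skips its downstream tasks unless the run is the most recent scheduled one. As far as I understand its implementation, the core test is a window check: a run for execution date E is the latest while E + interval < now <= E + 2 * interval. Below is a stdlib sketch of that check for a fixed interval; `is_latest_run` is a hypothetical name, and cron-style schedules with irregular intervals are not handled.

```python
from datetime import datetime, timedelta, timezone


def is_latest_run(execution_date: datetime, now: datetime,
                  interval: timedelta = timedelta(days=1)) -> bool:
    """Hypothetical helper: is `execution_date` the most recent scheduled run at `now`?

    A run for E is triggered once its interval closes at E + interval, and it stops
    being the latest when the next run's interval closes at E + 2 * interval.
    """
    left_window = execution_date + interval
    right_window = left_window + interval
    return left_window < now <= right_window


now = datetime(2020, 5, 7, 15, 36, tzinfo=timezone.utc)
for day in (3, 4, 5, 6):
    run = datetime(2020, 5, day, tzinfo=timezone.utc)
    print(run.date(), is_latest_run(run, now))
# Only the 2020-05-06 run prints True; the catch-up runs print False.
```

With `depends_on_past=True` on the incremental task, a check like this could gate only the aggregate tasks, so catch-up runs still fill in the daily data.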

Cheers,
Gabor

On 5/7/20 1:13 PM, Ash Berlin-Taylor wrote:

> That does sound very odd, I've not heard of that happening before.
>
> Are you able to share your DAG file (you can remove any queries etc) - 
> it may help us debug it.
>
> Thanks,
> Ash
>
> On May 7 2020, at 12:11 pm, Gabor Hermann <mail@gaborhermann.com> wrote:
>
>     Hello fellow Airflowers, I'm relatively new to Airflow and I'm
>     really grateful, as it has already saved us some pain in
>     production. So thanks for all the work! 🙏 Now I'm trying to set up
>     a DAG with around 20-30 tasks (BigQuery queries, PySpark Dataproc
>     jobs) and I've seen a weird behavior where a DAG run stops running,
>     the DAG run is marked as success, but some tasks are left without a
>     state. The annoying part is that there's not even a sign of
>     failure. Do you know why this might be happening? I couldn't find a
>     related issue on GitHub. One thing I suspect is DAG import timing
>     out; could that cause such behavior? (I'm using version 1.10.3.)
>     Thanks in advance for any pointers. Cheers, Gabor
>
