airflow-commits mailing list archives

From "Sabeer Zaman (JIRA)" <j...@apache.org>
Subject [jira] [Created] (AIRFLOW-104) State of `ExternalTaskSensor` task when the external task is marked as "failure"
Date Wed, 11 May 2016 12:55:12 GMT
Sabeer Zaman created AIRFLOW-104:
------------------------------------

             Summary: State of `ExternalTaskSensor` task when the external task is marked
as "failure"
                 Key: AIRFLOW-104
                 URL: https://issues.apache.org/jira/browse/AIRFLOW-104
             Project: Apache Airflow
          Issue Type: Improvement
            Reporter: Sabeer Zaman
            Priority: Minor


Dear Airflow Maintainers,

Before I tell you about my issue, let me describe my environment:

h3. Environment

* Version of Airflow: v1.6.2
* Airflow components and configuration: Running with CeleryExecutor (separate docker containers
running webserver, worker, rabbitmq and mysql db)
* Operating System: {{Darwin Kernel Version 15.3.0: Thu Dec 10 18:40:58 PST 2015; root:xnu-3248.30.4~1/RELEASE_X86_64
x86_64}}
* Python Version: 2.7.6

Now that you know a little about me, let me tell you about the issue I am having:

h3. Description of Issue

I created two DAGs - let's call them {{dag_a}} and {{dag_b}}. One of the tasks in {{dag_b}}
is an {{ExternalTaskSensor}} referencing a task with {{task_id="external_task"}} in {{dag_a}}.
So the code looked as shown below:

{code}
# in DAG definition for "dag_a"
# ... imports, boilerplate setup - e.g., defining `default_args`
dag = DAG(dag_id="dag_a", default_args=default_args, schedule_interval="0 0 * * *",)
external_task = DummyOperator(
  task_id="external_task",
  dag=dag,
)
{code}
{code}
# in DAG definition for "dag_b"
# ... imports, boilerplate setup - e.g., defining `default_args`
dag = DAG(dag_id="dag_b", default_args=default_args, schedule_interval="0 0 * * *",)
task_sensor = ExternalTaskSensor(
  task_id="dag_a.external_task",
  external_dag_id="dag_a",
  external_task_id="external_task",
  dag=dag,
)
{code}

To test failure behavior, I marked the task with {{task_id="external_task"}} in {{dag_a}}
as "failed" (for a particular execution date). I then ran the backfill for the _same execution
date_ for {{dag_b}}.

* What did you expect to happen?
** I expected the task named {{"dag_a.external_task"}} in {{dag_b}} to be marked either as
{{failed}} or {{upstream_failed}}, since the actual task it was referencing in {{dag_a}} failed.
* What happened instead?
** The log for the task {{"dag_a.external_task"}} in {{dag_b}} showed that it kept poking
{{external_task}} in {{dag_a}} every minute.

h3. Requested Change 

Looking at the logic in the [{{poke}} function for the {{ExternalTaskSensor}}|https://github.com/airbnb/airflow/blob/1.7.0/airflow/operators/sensors.py#L178-L200],
it's evident that it acts as a regular Airflow Sensor: it simply waits until a condition
becomes true, and in no way couples the state of the current task to the state of the
external task.
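The wait-until-true behavior described above can be paraphrased outside Airflow as a minimal sketch (the function and parameter names here are illustrative, not actual Airflow APIs):

```python
# Airflow-free paraphrase of the observed poke() behavior: the sensor
# only checks whether the external task has reached an allowed state
# (by default "success"). A "failed" external task never satisfies the
# condition, so the sensor returns False and pokes forever.
# All names are illustrative, not actual Airflow APIs.

def poke(external_state, allowed_states=("success",)):
    """Return True to stop waiting, False to poke again later."""
    return external_state in allowed_states
```

Note that {{"failed"}} and {{"running"}} are indistinguishable to this condition: both just yield {{False}}.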

That being said, is it reasonable to request such behavior (i.e., that the {{ExternalTaskSensor}}'s
state is set to {{failed}} if the task it's waiting on is marked as {{failed}})? I'd be willing
to take a stab at adding the logic, but I'd like to make sure that this is in line with the
Sensor's intended behavior, or find out whether there's a suggested alternative way of achieving
it.
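As a rough illustration of the requested behavior, here is a minimal Airflow-free sketch (an assumption-laden illustration, not a patch against the real sensor; the exception and function names are hypothetical):

```python
# Illustrative sketch of the requested change: the sensor raises (and
# therefore fails) as soon as the external task is marked "failed",
# instead of returning False and poking again forever.
# Names here are hypothetical, not actual Airflow APIs.

class ExternalTaskFailedError(Exception):
    """Signals that the external task this sensor waits on has failed."""

def poke(external_state, allowed_states=("success",)):
    """Return True to stop waiting, False to poke again; raise on failure."""
    if external_state == "failed":
        raise ExternalTaskFailedError(
            "external task was marked 'failed'; failing the sensor")
    return external_state in allowed_states
```

Under this sketch, the sensor in {{dag_b}} would end up {{failed}} for the execution date in question rather than poking indefinitely.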




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
