airflow-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From GitBox <>
Subject [GitHub] [airflow] ashb commented on a change in pull request #7119: [AIRFLOW-5840] Add operator extra link to external task sensor
Date Thu, 16 Jan 2020 12:16:37 GMT
ashb commented on a change in pull request #7119: [AIRFLOW-5840] Add operator extra link to
external task sensor

 File path: airflow/sensors/
 @@ -16,22 +16,71 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
 import datetime
 import os
-from typing import Optional, Union
+from typing import FrozenSet, Optional, Union
 from sqlalchemy import func
+from airflow.configuration import conf
 from airflow.exceptions import AirflowException
-from airflow.models import DagBag, DagModel, DagRun, TaskInstance
+from airflow.models import BaseOperatorLink, DagBag, DagModel, DagRun, TaskInstance
 from airflow.operators.dummy_operator import DummyOperator
 from airflow.sensors.base_sensor_operator import BaseSensorOperator
 from airflow.utils.decorators import apply_defaults
 from airflow.utils.session import provide_session
 from airflow.utils.state import State
+def get_possible_target_execution_dates(execution_date, execution_delta, execution_date_fn):
+    """
+    Gets the execution date(s) of an external DAG for which an
+    ExternalTaskSensor should succeed on. Default is the execution
+    date itself, but it may be modified if a non-null execution delta
+    or execution date function is passed in.
+    :param execution_date: The execution date of the sensor
+    :type execution_date: datetime.datetime
+    :param execution_delta: Time difference between the sensor
+        execution date and the target DAG run execution date. Positive
+        delta looks back in time.
+    :type execution_delta: Optional[datetime.timedelta]
+    :param execution_date_fn: Function to compute the execution date(s)
+        of the target DAG run to look at given the sensor's execution
+        date.
+    :type execution_date_fn: Optional[Callable]
+    :return: Execution date(s) to wait for
+    :rtype: List[datetime.datetime]
+    """
+    if execution_delta:
+        dttm = execution_date - execution_delta
+    elif execution_date_fn:
+        dttm = execution_date_fn(execution_date)
+    else:
+        dttm = execution_date
+    return dttm if isinstance(dttm, list) else [dttm]
+class ExternalTaskLink(BaseOperatorLink):
+    name = 'External DAG'
+    def get_link(self, operator, dttm):
+        possible_execution_dates = get_possible_target_execution_dates(
+            execution_date=dttm,
+            execution_delta=getattr(operator, 'execution_delta', None),
+            execution_date_fn=None,
 Review comment:
   No, looking up the dag in the DagBag won't work -- the whole goal of enabling serialization
is to not need to run any code in the dag files (or ideally not even need access to the
files at all!) from the webserver.
   To support the case where dag serialization is not enabled (or is not yet the only option), you could
do `getattr(operator, 'execution_date_fn')` -- or the other option might be to have the operator
store the result of the function in XCom, and then have the link look up the result from XCom?

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:

With regards,
Apache Git Services

View raw message