airflow-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From GitBox <...@apache.org>
Subject [GitHub] [airflow] rconroy293 commented on a change in pull request #7119: [AIRFLOW-5840] Add operator extra link to external task sensor
Date Tue, 14 Jan 2020 17:02:00 GMT
rconroy293 commented on a change in pull request #7119: [AIRFLOW-5840] Add operator extra link
to external task sensor
URL: https://github.com/apache/airflow/pull/7119#discussion_r366459866
 
 

 ##########
 File path: airflow/sensors/external_task_sensor.py
 ##########
 @@ -16,22 +16,71 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
-
 import datetime
 import os
-from typing import Optional, Union
+from typing import FrozenSet, Optional, Union
 
 from sqlalchemy import func
 
+from airflow.configuration import conf
 from airflow.exceptions import AirflowException
-from airflow.models import DagBag, DagModel, DagRun, TaskInstance
+from airflow.models import BaseOperatorLink, DagBag, DagModel, DagRun, TaskInstance
 from airflow.operators.dummy_operator import DummyOperator
 from airflow.sensors.base_sensor_operator import BaseSensorOperator
 from airflow.utils.decorators import apply_defaults
 from airflow.utils.session import provide_session
 from airflow.utils.state import State
 
 
+def get_possible_target_execution_dates(execution_date, execution_delta, execution_date_fn):
+    """
+    Gets the execution date(s) of an external DAG for which an
+    ExternalTaskSensor should succeed on. Default is the execution
+    date itself, but it may be modified if a non-null execution delta
+    or execution date function is passed in.
+
+    :param execution_date: The execution date of the sensor
+    :type execution_date: datetime.datetime
+    :param execution_delta: Time difference between the sensor
+        execution date and the target DAG run execution date. Positive
+        delta looks back in time.
+    :type execution_delta: Optional[datetime.timedelta]
+    :param execution_date_fn: Function to compute the execution date(s)
+        of the target DAG run to look at given the sensor's execution
+        date.
+    :type execution_date_fn: Optional[Callable]
+    :return: Execution date(s) to wait for
+    :rtype: List[datetime.datetime]
+    """
+    if execution_delta:
+        dttm = execution_date - execution_delta
+    elif execution_date_fn:
+        dttm = execution_date_fn(execution_date)
+    else:
+        dttm = execution_date
+
+    return dttm if isinstance(dttm, list) else [dttm]
+
+
+class ExternalTaskLink(BaseOperatorLink):
+    name = 'External DAG'
+
+    def get_link(self, operator, dttm):
+        possible_execution_dates = get_possible_target_execution_dates(
+            execution_date=dttm,
+            execution_delta=getattr(operator, 'execution_delta', None),
+            execution_date_fn=None,
 
 Review comment:
   That's correct. I don't think we're able to serialize / deserialize functions, so that
would be incompatible with Airflow 2.0.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

Mime
View raw message