airflow-commits mailing list archives

From GitBox <>
Subject [GitHub] [airflow] mik-laj commented on a change in pull request #6210: [AIRFLOW-5567] [Do not Merge] prototype BaseAsyncOperator
Date Sat, 28 Sep 2019 23:49:30 GMT
mik-laj commented on a change in pull request #6210: [AIRFLOW-5567] [Do not Merge] prototype BaseAsyncOperator

 File path: airflow/models/
 @@ -0,0 +1,96 @@
+# -*- coding: utf-8 -*-
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""
+Base Asynchronous Operator for kicking off long running
+operations and polling for completion with reschedule mode.
+"""
+from functools import wraps
+from airflow.sensors.base_sensor_operator import BaseSensorOperator
+from airflow.exceptions import AirflowException
+from airflow.models.xcom import XCOM_EXTERNAL_RESOURCE_ID_KEY
+class BaseAsyncOperator(BaseSensorOperator, SkipMixin):
+    """
+    AsyncOperators are derived from this class and inherit these attributes.
+    AsyncOperators must define a `submit_request` method that fires a request
+    for a long running operation, and a `poke` method that runs at a time
+    interval, succeeds when a criterion is met, and fails if and when it times
+    out. They are effectively an opinionated way to combine an Operator and a
+    Sensor in order to kick off a long running process without blocking a
+    worker slot while waiting for the long running process to complete, by
+    leveraging reschedule mode.
+    :param soft_fail: Set to true to mark the task as SKIPPED on failure
+    :type soft_fail: bool
+    :param poke_interval: Time in seconds that the job should wait
+        between tries
+    :type poke_interval: int
+    :param timeout: Time, in seconds before the task times out and fails.
+    :type timeout: int
+    :type mode: str
+    """
+    ui_color = '#9933ff'  # type: str
+    valid_modes = ['poke', 'reschedule']  # type: Iterable[str]
+    @apply_defaults
+    def __init__(self,
+                 *args,
+                 **kwargs) -> None:
+        super().__init__(mode='reschedule', *args, **kwargs)
+    def submit_request(self, context) -> string:
 Review comment:
       def submit_request(self, context) -> IT:
   I imagine that the identifier will not always be a string. It could be a number, for example, or an
array of several identifiers. What do you think about using generic types?
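
   To make the suggestion above concrete, here is a minimal sketch of what a generic identifier type
could look like. It is not code from the pull request: `AsyncOperatorSketch`, `SingleJobOperator` and
`BatchJobOperator` are hypothetical names, the class does not inherit from any Airflow class, and `poke`
takes the identifier directly instead of reading it from XCom. Only the `IT` type variable and the
`submit_request` signature follow the comment.

```python
from abc import ABC, abstractmethod
from typing import Dict, Generic, List, TypeVar

# IT is the type of the external resource identifier. Each subclass picks it.
IT = TypeVar("IT")


class AsyncOperatorSketch(ABC, Generic[IT]):
    """Hypothetical stand-in for BaseAsyncOperator, independent of Airflow."""

    @abstractmethod
    def submit_request(self, context: Dict) -> IT:
        """Start the long running operation and return its identifier."""

    @abstractmethod
    def poke(self, resource_id: IT) -> bool:
        """Return True once the operation behind resource_id has finished."""


class SingleJobOperator(AsyncOperatorSketch[int]):
    """The identifier is a single number."""

    def submit_request(self, context: Dict) -> int:
        return 42  # pretend the remote service handed back job number 42

    def poke(self, resource_id: int) -> bool:
        return resource_id == 42


class BatchJobOperator(AsyncOperatorSketch[List[str]]):
    """The identifier is a list of several job names."""

    def submit_request(self, context: Dict) -> List[str]:
        return ["job-a", "job-b"]

    def poke(self, resource_id: List[str]) -> bool:
        return all(job.startswith("job-") for job in resource_id)
```

   With this shape a type checker such as mypy can verify that each subclass returns from
`submit_request` the same identifier type that it later accepts in `poke`, while the base class stays
agnostic about what that type is.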

