airflow-dev mailing list archives

From Ash Berlin-Taylor <ash_airflowl...@firemirror.com>
Subject Re: too many long running tasks in sensors
Date Wed, 18 Oct 2017 13:20:22 GMT
This isn't possible with Sensors as they exist right now. Our solution was not to make it
a sensor at all, but a plain operator, with a number of retries and a large retry_delay:

from datetime import timedelta

checkTask = FindTriggerFileForExecutionPeriod(
    task_id="trigger_file_for_period",
    s3_conn_id="s3_default",
    prefix=source_s3_uri,
    dag=dag,

    # Try every 6 hours for 3 days, just in case the trigger is delayed.
    retries=12,
    retry_delay=timedelta(hours=6)
)

When it fails, the task shows up as "Up for retry" (yellow) in the UI, but it doesn't take
up a slot while it waits for the next attempt.
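
Downstream tasks can pick the results up over XCom: Airflow stores an operator's return
value under the default "return_value" key, and the operator (defined below) also pushes a
"content_date" key. A minimal sketch of a consumer, assuming a hypothetical
process_trigger_file callable:

from airflow.operators.python_operator import PythonOperator

def process_trigger_file(**context):
    ti = context['task_instance']
    # execute() returns the full s3:// URI of the trigger file; Airflow
    # pushes it as the "return_value" XCom automatically.
    s3_uri = ti.xcom_pull(task_ids="trigger_file_for_period")
    # The ISO date prefix pushed explicitly by the operator.
    content_date = ti.xcom_pull(task_ids="trigger_file_for_period", key="content_date")
    # ... fetch and process s3_uri here ...

processTask = PythonOperator(
    task_id="process_trigger_file",
    python_callable=process_trigger_file,
    provide_context=True,
    dag=dag,
)
processTask.set_upstream(checkTask)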

The operator is defined as:

import fnmatch
from os.path import basename

from airflow.hooks.S3_hook import S3Hook
from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults


class FindTriggerFileForExecutionPeriod(BaseOperator):
    """
    Find any trigger file for the execution period.

    In the unlikely event that there are multiple trigger files, the most
    recent (by date in the filename, not mtime) will be returned.
    """
    trigger_file_wildcard = 'xxx*_*.gz.trigger'

    @apply_defaults
    def __init__(self, s3_conn_id, prefix, **kwargs):
        self.s3_conn_id = s3_conn_id
        self.prefix = prefix
        super().__init__(**kwargs)

    def execute(self, context):
        hook = S3Hook(self.s3_conn_id)
        (bucket_name, prefix) = hook.parse_s3_url(self.prefix)
        min_date = context['execution_date']
        max_date = context['next_execution_date']

        # list_keys() returns None when nothing matches, hence the "or []".
        for key in sorted(hook.list_keys(bucket_name, prefix) or [], reverse=True):
            if not fnmatch.fnmatch(basename(key), self.trigger_file_wildcard):
                continue
            when = _tapad_key_content_date(key)

            # The keys are sorted in descending order, so once we drop below
            # the range of interest nothing further can match.
            if when < min_date:
                break

            if when < max_date:
                self.xcom_push(context, "content_date", _generate_iso_prefix(when))
                return "s3://{}/{}".format(bucket_name, key)

        raise RuntimeError("No trigger file found with filename date in interval [{start}, {end})".format(
            start=min_date,
            end=max_date,
        ))
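
The two helpers (_tapad_key_content_date and _generate_iso_prefix) aren't included above.
A rough sketch of what they might look like, assuming the content date is embedded in the
key as an eight-digit YYYYMMDD stamp (e.g. xxxfoo_20171018.gz.trigger); the real naming
convention isn't shown in this message, so the parsing is illustrative only:

import re
from datetime import datetime

def _tapad_key_content_date(key):
    # Hypothetical: extract a YYYYMMDD stamp from a key such as
    # "some/prefix/xxxfoo_20171018.gz.trigger". Adjust the pattern to the
    # actual naming convention.
    match = re.search(r'_(\d{8})\.gz\.trigger$', key)
    if match is None:
        raise ValueError("No content date in key: {}".format(key))
    return datetime.strptime(match.group(1), '%Y%m%d')

def _generate_iso_prefix(when):
    # Hypothetical: render the content date as an ISO-8601 day, e.g. "2017-10-18".
    return when.strftime('%Y-%m-%d')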

> On 18 Oct 2017, at 13:58, Cieplucha, Michal <michal.cieplucha@intel.com> wrote:
> 
> Hello,
> 
> We are using sensors for lightweight but long-running tasks, such as monitoring the
> results of tests executed on a remote machine. We would like to see such a task in a
> DAG, but with so many tasks/DAGs in the running state we will soon hit the maximum
> number of processes/DAG runs (defined in airflow.cfg). Is it possible to have a sensor
> which, instead of sleeping and holding on to the process, would just exit and be
> rescheduled later? Are we using sensors in the wrong manner?
> 
> Thanks
> mC

