airflow-dev mailing list archives

From Michal K <postratio...@yahoo.com.INVALID>
Subject Suspend and resume Sensors
Date Mon, 17 Oct 2016 10:24:45 GMT
Hello Everyone,
our company is currently considering Airflow as the engine of a pretty big workflow system. We
use Python 3 and have some HA requirements, so we plan to use Celery with multiple worker
and scheduler nodes and RabbitMQ as the message bus. We are also using Docker containers for deployment.
We have a specific case which we need to handle in our workflows. Some of the tasks we trigger
are very long-lived (many hours). These long tasks are carried out by external systems,
independent of the Airflow nodes. In our workflows we need to start these external tasks,
monitor them, and trigger downstream operators based on their results.
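For concreteness, with an Airflow sensor the monitoring half would look roughly like this
(get_job_status and the 'finished' status are stand-ins for our real system's API; the import
path is the one from current Airflow releases):

    from airflow.operators.sensors import BaseSensorOperator

    def get_job_status(job_id):
        """Placeholder: query the external system for the job's state."""
        raise NotImplementedError

    class ExternalJobSensor(BaseSensorOperator):
        def __init__(self, job_id, *args, **kwargs):
            super().__init__(*args, **kwargs)
            self.job_id = job_id

        def poke(self, context):
            # poke() is re-invoked every poke_interval seconds; once it
            # returns True, the downstream operators are triggered.
            return get_job_status(self.job_id) == 'finished'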
This seems like a perfect job for Airflow sensors. Here, however, we run into a problem. In
our testing it does not seem possible to stop an Airflow worker while a sensor is running. The
sensor continues to run during a Celery warm shutdown, and its background process remains
active even after a cold shutdown. This prevents us from being able to safely destroy the
worker's Docker container.
Since at least one of the external tasks we need to monitor is practically always running,
we would not be able to stop our Airflow workers in order to deploy code changes to our DAGs
or the libraries they depend on.
It would be perfect for our case to have the ability to suspend an Airflow sensor while shutting
down the worker and resume it when the worker restarts.
I was wondering how this could be implemented in Airflow and I came up with this initial idea:
https://github.com/postrational/incubator-airflow/commit/74ac6f7290d6838362ff437e228465bb49fe198f
The code adds a signal handler to the BaseSensorOperator which raises an exception when it
detects that the worker is shutting down (i.e. on SIGINT).
The exception is then handled in the 'run' method of the TaskInstance. The idea is to
put the sensor into a state which would cause it to be cleanly resumed once Airflow comes back
up after the restart.
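The gist, in simplified form (sketched here as a subclass rather than the in-place patch from
the commit, and omitting the timeout and soft_fail handling that the real execute() does; see
the linked commit for the actual diff):

    import signal
    import time

    from airflow.operators.sensors import BaseSensorOperator

    class AirflowSensorSuspended(Exception):
        """Raised from the SIGINT handler to abort the poke loop."""

    class SuspendableSensor(BaseSensorOperator):
        def execute(self, context):
            def handle_worker_shutdown(signum, frame):
                # Celery delivers SIGINT to the worker process on a
                # warm shutdown; turn it into an exception instead of
                # letting the sensor die mid-poke.
                raise AirflowSensorSuspended('Worker shutting down')

            signal.signal(signal.SIGINT, handle_worker_shutdown)
            while not self.poke(context):
                time.sleep(self.poke_interval)

TaskInstance.run() then catches AirflowSensorSuspended and resets the instance's state so that
the scheduler can re-queue the sensor once a worker is available again.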
So far, my draft code works some of the time, but not always. Sometimes the sensor resumes
correctly, but sometimes it doesn't trigger its downstream operator and the whole DAG run
is marked as "failed".
I noticed the following line in the sensor's logs:

    [2016-10-13 19:15:17,033] {jobs.py:1976} WARNING - State of this instance has been
    externally set to None. Taking the poison pill. So long.
I would like to ask you how to properly mark an operator to be restarted cleanly.
Before I proceed any further I would also like to ask for your opinion of this approach. How
do you handle long running sensors? Do you use Docker with Airflow?
Any advice would be greatly appreciated. Thanks,
Michal