Date: Mon, 17 Oct 2016 10:24:45 +0000 (UTC)
From: Michal K
To: dev@airflow.incubator.apache.org
Subject: Suspend and resume Sensors
Hello Everyone,

Our company is currently considering Airflow as the engine of a pretty big workflow system. We use Python 3 and have some HA requirements, so we plan to use Celery with multiple worker and scheduler nodes and RabbitMQ as the message bus. We are also using Docker containers for deployment.

We have a specific case that we need to handle in our workflows. Some tasks that we trigger are very long-lived (many hours long). These long tasks are carried out by external systems, independent of the Airflow nodes. In our workflows we need to start these external tasks, monitor them, and trigger downstream operators based on their results.

This seems like a perfect job for Airflow sensors. Here, however, we run into a small problem. In our testing it does not seem possible to stop an Airflow worker while a sensor is running. The sensor continues to run during a Celery warm shutdown, and its background process remains active even after a cold shutdown. This prevents us from safely destroying the worker's Docker container.

Since at least one of the external tasks we need to monitor is practically always running, we would not be able to stop our Airflow workers in order to deploy code changes to our DAGs or the libraries they depend on.

It would be perfect for our case to have the ability to suspend an Airflow sensor while the worker shuts down and resume it when the worker restarts.
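The suspend-on-shutdown behavior described above can be sketched outside Airflow as a plain poke loop whose signal handler raises an exception, which a caller then maps to a "resume later" state. This is a minimal sketch, not Airflow code: `SuspendableSensor`, `SensorSuspended`, `run_sensor` and the "suspended" label are hypothetical names, and `run_sensor` only stands in for the role `TaskInstance.run` plays in the draft. As an assumption, it also treats SIGTERM as a shutdown signal, because `docker stop` sends SIGTERM by default.

```python
import signal
import time
from typing import Callable


class SensorSuspended(Exception):
    """Hypothetical: raised when the worker shuts down while the sensor is poking."""


class SuspendableSensor:
    """Hypothetical poke loop for illustration; not Airflow's BaseSensorOperator."""

    # Assumption: treat both SIGINT (Ctrl-C) and SIGTERM (`docker stop`) as shutdown.
    SHUTDOWN_SIGNALS = (signal.SIGINT, signal.SIGTERM)

    def __init__(self, poke: Callable[[], bool], poke_interval: float = 1.0,
                 timeout: float = 60.0) -> None:
        self.poke = poke
        self.poke_interval = poke_interval
        self.timeout = timeout

    def _on_shutdown(self, signum, frame) -> None:
        # Turn the shutdown signal into an exception that unwinds the poke loop.
        raise SensorSuspended(f"received signal {signum}; resume this sensor after restart")

    def execute(self) -> None:
        # Install our handler for each shutdown signal, remembering the previous one.
        previous = {sig: signal.signal(sig, self._on_shutdown)
                    for sig in self.SHUTDOWN_SIGNALS}
        try:
            deadline = time.monotonic() + self.timeout
            while not self.poke():
                if time.monotonic() > deadline:
                    raise TimeoutError("sensor timed out")
                # A signal arriving here interrupts the sleep via the raised exception.
                time.sleep(self.poke_interval)
        finally:
            # Restore the previous handlers; None means "not set from Python", so use the default.
            for sig, handler in previous.items():
                signal.signal(sig, handler if handler is not None else signal.SIG_DFL)


def run_sensor(sensor: SuspendableSensor) -> str:
    """Hypothetical stand-in for TaskInstance.run: map the outcome to a state label."""
    try:
        sensor.execute()
    except SensorSuspended:
        # A real version would persist a state that gets the task re-queued after restart.
        return "suspended"
    except TimeoutError:
        return "failed"
    return "success"
```

Because the handler raises as soon as the signal arrives, a poke in progress is abandoned, so each poke should be safe to repeat after the task resumes. The sketch also consumes the signal instead of passing it on, so a real worker would still need to run its own shutdown logic afterwards.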
I was wondering how this could be implemented in Airflow, and I came up with this initial idea: https://github.com/postrational/incubator-airflow/commit/74ac6f7290d6838362ff437e228465bb49fe198f

The code adds a signal handler to the BaseSensorOperator, which raises an exception if it detects that the worker is shutting down (SIGINT detected). The exception is then handled in the 'run' method of the TaskInstance. The idea is to put the sensor in a state that causes it to resume cleanly once Airflow comes back up after a restart.

So far my draft code works some of the time, but not always. Sometimes the sensor resumes correctly, but sometimes it doesn't trigger its downstream operator and the whole DAG run is marked as "failed". I noticed the following line in the sensor's logs:

[2016-10-13 19:15:17,033] {jobs.py:1976} WARNING - State of this instance has been externally set to None. Taking the poison pill. So long.

I would like to ask how to properly mark an operator so that it is restarted cleanly.

Before I proceed any further, I would also like to ask for your opinion of this approach. How do you handle long-running sensors? Do you use Docker with Airflow?

Any advice would be greatly appreciated.

Thanks,
Michal