From: Or Sher
Date: Wed, 18 Oct 2017 13:12:08 +0000
Subject: Re: Skip task
To: dev@airflow.incubator.apache.org

I found this thread while searching for the exact same feature.
Was your PR ever merged?
Is there any other functionality that handles this in the current release?
(I'm still on 1.7)
I'd really hate to use branch and dummy operators for every task I might
want to skip.

On Mon, Dec 12, 2016 at 1:56 PM Maycock, Luke <luke.maycock@affiliate.oliverwyman.com> wrote:

> Excellent - thanks for your help Bolke, much appreciated!
>
> Cheers,
> Luke Maycock
> OLIVER WYMAN
> luke.maycock@affiliate.oliverwyman.com
> www.oliverwyman.com
>
> ________________________________
> From: Bolke de Bruin
> Sent: 12 December 2016 10:40
> To: dev@airflow.incubator.apache.org
> Subject: Re: Skip task
>
> Have a look at:
> https://github.com/apache/incubator-airflow/blob/master/airflow/migrations/versions/4addfa1236f1_add_fractional_seconds_to_mysql_tables.py
>
> Make sure to include "type_=mysql.DATETIME(fsp=6)" for your DateTime types
> on MySQL.
>
> - Bolke
>
> > On 12 Dec 2016, at 11:33, Maycock, Luke <luke.maycock@affiliate.oliverwyman.com> wrote:
> >
> > It is a new table named 'TaskExclusion'.
> > The migration script for this is as follows:
> >
> > def upgrade():
> >     op.create_table(
> >         'task_exclusion',
> >         sa.Column('id', sa.Integer(), nullable=False),
> >         sa.Column('dag_id', sa.String(length=250), nullable=False),
> >         sa.Column('task_id', sa.String(length=250), nullable=False),
> >         sa.Column('exclusion_type', sa.String(length=32), nullable=False),
> >         sa.Column('exclusion_start_date', sa.DateTime(), nullable=False),
> >         sa.Column('exclusion_end_date', sa.DateTime(), nullable=False),
> >         sa.Column('created_by', sa.String(length=256), nullable=False),
> >         sa.Column('created_on', sa.DateTime(), nullable=False),
> >         sa.PrimaryKeyConstraint('id'))
> >
> >
> > def downgrade():
> >     op.drop_table('task_exclusion')
> >
> > This is the PR for the exclusion of a task. We review our code
> > internally before setting up a PR into the main repo for the next review,
> > hence the PR being in our fork. The PR does not yet contain our unit tests.
> >
> > Cheers,
> > Luke Maycock
> > OLIVER WYMAN
> > luke.maycock@affiliate.oliverwyman.com
> > www.oliverwyman.com
> >
> > ________________________________
> > From: Bolke de Bruin
> > Sent: 09 December 2016 20:54
> > To: dev@airflow.incubator.apache.org
> > Subject: Re: Skip task
> >
> > What table was this? I recently pushed a fix that allows fractional
> > seconds in our minimum supported version of MySQL (5.6.4 and beyond).
> >
> > I might have missed something.
> >
> > Thanks
> > Bolke
> >
> > Sent from my iPhone
> >
> >> On 9 Dec 2016, at 14:27, Maycock, Luke <luke.maycock@affiliate.oliverwyman.com> wrote:
> >>
> >> I found the issue to be that, for MySQL, the datetime was being rounded
> >> to the nearest second.
> >> The strange thing is that if a datetime without the microseconds was
> >> passed to SQLAlchemy, the insertion into MySQL failed; but when a
> >> datetime with microseconds was passed, the microseconds were removed
> >> by rounding to the nearest second.
> >>
> >> Hopefully, this will prevent someone else going down the same rabbit
> >> hole that I did.
> >>
> >> Cheers,
> >> Luke Maycock
> >> OLIVER WYMAN
> >> luke.maycock@affiliate.oliverwyman.com
> >> www.oliverwyman.com
> >>
> >> ________________________________
> >> From: Maycock, Luke
> >> Sent: 08 December 2016 10:44:32
> >> To: dev@airflow.incubator.apache.org
> >> Subject: Re: Skip task
> >>
> >> Hi All,
> >>
> >> We have implemented a solution for allowing the exclusion of individual
> >> tasks during a DAG run. However, when writing unit tests for this, we are
> >> encountering an issue with MySQL, which I am hoping someone is able to
> >> help us with.
> >>
> >> For our solution, we have a new 'TaskExclusion' table in the meta-data.
> >> Our unit tests were run by Travis, not locally.
> >>
> >> The code block under test:
> >>
> >> class TaskExclusion(Base):
> >>     """
> >>     This class is used to define objects that can be used to specify not to
> >>     run a given task in a given dag on a variety of execution date conditions.
> >>     These objects will be stored in the backend database in the task_exclusion
> >>     table.
> >>     Static methods are provided for the creation, removal and investigation of
> >>     these objects.
> >>     """
> >>
> >>     __tablename__ = "task_exclusion"
> >>
> >>     id = Column(Integer(), primary_key=True)
> >>     dag_id = Column(String(ID_LEN), nullable=False)
> >>     task_id = Column(String(ID_LEN), nullable=False)
> >>     exclusion_type = Column(String(32), nullable=False)
> >>     exclusion_start_date = Column(DateTime, nullable=True)
> >>     exclusion_end_date = Column(DateTime, nullable=True)
> >>     created_by = Column(String(256), nullable=False)
> >>     created_on = Column(DateTime, nullable=False)
> >>
> >>     @classmethod
> >>     @provide_session
> >>     def set(
> >>             cls,
> >>             dag_id,
> >>             task_id,
> >>             exclusion_type,
> >>             exclusion_start_date,
> >>             exclusion_end_date,
> >>             created_by,
> >>             session=None):
> >>         """
> >>         Add a task exclusion to prevent a task running under certain
> >>         circumstances.
> >>         :param dag_id: The dag_id of the DAG containing the task to exclude
> >>         from execution.
> >>         :param task_id: The task_id of the task to exclude from execution.
> >>         :param exclusion_type: The type of circumstances to exclude the task
> >>         from execution under. See the TaskExclusionType class for more detail.
> >>         :param exclusion_start_date: The execution_date to start excluding on.
> >>         This will be ignored if the exclusion_type is INDEFINITE.
> >>         :param exclusion_end_date: The execution_date to stop excluding on.
> >>         This will be ignored if the exclusion_type is INDEFINITE or
> >>         SINGLE_DATE.
> >>         :param created_by: Who is creating this exclusion. Stored with the
> >>         exclusion record for auditing/debugging purposes.
> >>         :return: None.
> >>         """
> >>
> >>         session.expunge_all()
> >>
> >>         # Set up execution date range correctly
> >>         if exclusion_type == TaskExclusionType.SINGLE_DATE:
> >>             if exclusion_start_date:
> >>                 exclusion_end_date = exclusion_start_date
> >>             else:
> >>                 raise AirflowException(
> >>                     "No exclusion_start_date "
> >>                 )
> >>         elif exclusion_type == TaskExclusionType.DATE_RANGE:
> >>             if exclusion_start_date > exclusion_end_date:
> >>                 raise AirflowException(
> >>                     "The exclusion_start_date is after the exclusion_end_date"
> >>                 )
> >>         elif exclusion_type == TaskExclusionType.INDEFINITE:
> >>             exclusion_start_date = None
> >>             exclusion_end_date = None
> >>         else:
> >>             raise AirflowException(
> >>                 "The exclusion_type, {}, is not recognised."
> >>                 .format(exclusion_type)
> >>             )
> >>
> >>         # remove any duplicate exclusions
> >>         session.query(cls).filter(
> >>             cls.dag_id == dag_id,
> >>             cls.task_id == task_id,
> >>             cls.exclusion_type == exclusion_type,
> >>             cls.exclusion_start_date == exclusion_start_date,
> >>             cls.exclusion_end_date == exclusion_end_date
> >>         ).delete()
> >>
> >>         # insert new exclusion
> >>         session.add(TaskExclusion(
> >>             dag_id=dag_id,
> >>             task_id=task_id,
> >>             exclusion_type=exclusion_type,
> >>             exclusion_start_date=exclusion_start_date,
> >>             exclusion_end_date=exclusion_end_date,
> >>             created_by=created_by,
> >>             created_on=datetime.now())
> >>         )
> >>
> >>         session.commit()
> >>
> >>
> >> The unit test:
> >>
> >> class TaskExclusionTest(unittest.TestCase):
> >>     def test_set_exclusion(self, session=None):
> >>
> >>         session = settings.Session()
> >>
> >>         session.expunge_all()
> >>
> >>         dag_id = 'test_task_exclude'
> >>         task_id = 'test_task_exclude'
> >>         exec_date = datetime.datetime.now()
> >>
> >>         TaskExclusion.set(dag_id=dag_id,
> >>                           task_id=task_id,
> >>                           exclusion_type=TaskExclusionType.SINGLE_DATE,
> >>                           exclusion_start_date=exec_date,
> >>                           exclusion_end_date=exec_date,
> >>                           created_by='airflow')
> >>
> >>         exclusion = session.query(TaskExclusion).filter(
> >>             TaskExclusion.dag_id == dag_id,
> >>             TaskExclusion.task_id == task_id,
> >>             TaskExclusion.exclusion_type == TaskExclusionType.SINGLE_DATE,
> >>             TaskExclusion.exclusion_start_date == exec_date,
> >>             TaskExclusion.exclusion_end_date == exec_date).first()
> >>
> >>         self.assertTrue(exclusion)
> >>
> >>
> >> The unit test passes for PostgreSQL and SQLite but fails for MySQL. I
> >> have checked and the 'exclusion' variable contains a TaskExclusion object
> >> for PostgreSQL and SQLite but is set to 'None' for MySQL. Any suggestions
> >> on what could be causing this would be much appreciated.
> >>
> >> Cheers,
> >> Luke Maycock
> >> OLIVER WYMAN
> >> luke.maycock@affiliate.oliverwyman.com
> >> www.oliverwyman.com
> >>
> >> ________________________________
> >> From: siddharth anand
> >> Sent: 16 November 2016 00:40
> >> To: dev@airflow.incubator.apache.org
> >> Subject: Re: Skip task
> >>
> >> If your requirement is to skip a portion of tasks in a DagRun based on some
> >> state encountered while executing that DagRun, that is what
> >> BranchPythonOperator or ShortCircuitOperator (optionally paired with a
> >> Trigger Rule specified on a downstream task) is made for.
> >>
> >> These operators take a custom Python callable as an argument. The callable
> >> can check for the existence of data or files that should have been
> >> generated by an external system or an upstream task in the same DAG. The
> >> callables need to return a Boolean value in the case of the
> >> ShortCircuitOperator or a selected choice (i.e. branch to take) in the
> >> case of the BranchPythonOperator.
> >>
> >> If you have 20 tasks that all depend on the presence of 20 different files,
> >> you would need 20 ShortCircuitOperator or BranchPythonOperator tasks, each
> >> either sharing a common callable or each with its own callable.
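
The "common callable" option mentioned just above could look roughly like the sketch below. It uses only the standard library: `make_file_check` and the file names are hypothetical, and handing each returned function to a ShortCircuitOperator as its `python_callable` is assumed rather than shown.

```python
import os
import tempfile


def make_file_check(path):
    """Return a callable that reports whether `path` exists.

    Hypothetical helper: the returned function is meant to be used as the
    python_callable of a ShortCircuitOperator (assumption, not shown here).
    """
    def _check(**_context):
        # True lets downstream tasks proceed; False means they are skipped.
        return os.path.exists(path)
    return _check


# One shared factory, many checks: a small stand-in for the "20 files" case.
workdir = tempfile.mkdtemp()
present = os.path.join(workdir, "arrived.csv")
missing = os.path.join(workdir, "not_arrived.csv")
with open(present, "w") as handle:
    handle.write("id,value\n")

checks = {path: make_file_check(path) for path in (present, missing)}
results = {os.path.basename(p): check() for p, check in checks.items()}
print(results)
```

Each task still needs its own operator instance, so this only removes the duplicated check logic, not the extra tasks themselves.
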
> >>
> >> One could argue that these tasks are "overhead" because they just encompass
> >> some conditional or control logic and that DAGs should only contain
> >> workhorse tasks (i.e. tasks that do some work). DAGs with workhorse-only
> >> tasks are more of a pure dataflow approach -- i.e. no control-logic
> >> operators. However, I don't see another option.
> >>
> >> In the current system, a callable registered with a ShortCircuitOperator
> >> would check for the presence of a file -- if the file were not available,
> >> then a series of downstream tasks would be skipped in that DagRun, until a
> >> task with trigger_rule="all_done" were encountered, downstream of which
> >> tasks would no longer be skipped for the DagRun.
> >>
> >> I hope this makes sense.
> >>
> >> A long time ago, I proposed UI functionality to skip a series of DAG runs,
> >> because I knew that no data was available for that time range
> >> from an external system. I wanted to essentially specify a "blackout"
> >> period in terms of a time range that covered multiple DagRuns. My intention
> >> was for backfills to skip those days. It turns out that my company did not
> >> end up having such a requirement, so I dropped the feature request.
> >>
> >> If this is what you are asking for, then I am +1. Please implement it and
> >> submit a PR.
> >>
> >> On Tue, Nov 15, 2016 at 2:50 AM, Maycock, Luke <luke.maycock@affiliate.oliverwyman.com> wrote:
> >>
> >>> Thank you for taking the time to respond. This is a great approach if you
> >>> know at the time of creating the DAG which tasks you expect to need to
> >>> skip. However, I don't think this is exactly the use case I have. For
> >>> example, I may be expecting a file to arrive in an FTP folder for loading
> >>> into a database but one day it doesn't arrive so I just want to skip that
> >>> task on that day.
> >>>
> >>> Our workflows commonly have around 20 of these types of tasks in.
> >>> I could configure all of these tasks in the way you suggested in case I
> >>> ever need to skip one of them. However, I'd prefer not to have to set the
> >>> tasks up this way and instead have the ability just to skip a task on an
> >>> ad-hoc basis. I could then also use this functionality to add the ability
> >>> to run from a certain point in a DAG or to a certain point in the DAG.
> >>>
> >>> Thanks,
> >>> Luke Maycock
> >>> OLIVER WYMAN
> >>> luke.maycock@affiliate.oliverwyman.com
> >>> www.oliverwyman.com
> >>>
> >>> ________________________________
> >>> From: siddharth anand
> >>> Sent: 14 November 2016 19:48
> >>> To: dev@airflow.incubator.apache.org
> >>> Subject: Re: Skip task
> >>>
> >>> For cases like this, we (Agari) use the following approach:
> >>>
> >>>   1. Create a Variable in the UI of type boolean such as *enable_feature_x*
> >>>   2. Use a ShortCircuitOperator (or BranchPythonOperator) to skip
> >>>      downstream processing based on the value of *enable_feature_x*
> >>>   3. Assuming that you don't want to skip ALL downstream tasks, you can
> >>>      use a trigger_rule of all_done to resume processing some portion of
> >>>      your downstream DAG after skipping an upstream portion
> >>>
> >>> In other words, there is already a means to achieve what you are asking
> >>> for today. You can change the value of *enable_feature_x* via the UI. If
> >>> you'd like to enhance the UI to better capture this pattern, pls submit a PR.
> >>> -s
> >>>
> >>> On Thu, Nov 10, 2016 at 1:20 PM, Maycock, Luke <luke.maycock@affiliate.oliverwyman.com> wrote:
> >>>
> >>>> Hi Gerard,
> >>>>
> >>>> I see the new status as having a number of uses:
> >>>>
> >>>>   1. A user can manually set a task to skip in a DAG run via the UI.
> >>>>   2. We can then make use of this new status to add the following
> >>>>      functionality to Airflow:
> >>>>        * Run a DAG run up to a certain point and have the rest of the
> >>>>          tasks have the new status.
> >>>>        * Run a DAG run from a certain task to the end, setting all
> >>>>          pre-requisite tasks to have this new status.
> >>>>
> >>>> I am happy to be challenged on the above use cases if there are better
> >>>> ways to achieve the same things.
> >>>>
> >>>> Cheers,
> >>>> Luke Maycock
> >>>> OLIVER WYMAN
> >>>> luke.maycock@affiliate.oliverwyman.com
> >>>> www.oliverwyman.com
> >>>>
> >>>> ________________________________
> >>>> From: Gerard Toonstra
> >>>> Sent: 09 November 2016 18:08
> >>>> To: dev@airflow.incubator.apache.org
> >>>> Subject: Re: Skip task
> >>>>
> >>>> Hey Luke,
> >>>>
> >>>> Who or what makes the decision to skip processing that task?
> >>>>
> >>>> Rgds,
> >>>>
> >>>> Gerard
> >>>>
> >>>> On Wed, Nov 9, 2016 at 2:39 PM, Maycock, Luke <luke.maycock@affiliate.oliverwyman.com> wrote:
> >>>>
> >>>>> Hi Gerard,
> >>>>>
> >>>>> Thank you for your quick response.
> >>>>>
> >>>>> I am not trying to implement this for a specific operator but rather
> >>>>> trying to add it as a feature for any task in any DAG.
> >>>>>
> >>>>> Given that the skipped states propagate where all directly upstream
> >>>>> tasks are skipped, I don't think this is the state we want to use. For
> >>>>> the functionality I'm looking for, I think I'll need to introduce a new
> >>>>> status, maybe 'disabled'.
> >>>>>
> >>>>> Again, thanks for your response.
> >>>>>
> >>>>> Cheers,
> >>>>> Luke Maycock
> >>>>> OLIVER WYMAN
> >>>>> luke.maycock@affiliate.oliverwyman.com
> >>>>> www.oliverwyman.com
> >>>>>
> >>>>> ________________________________
> >>>>> From: Gerard Toonstra
> >>>>> Sent: 08 November 2016 18:19
> >>>>> To: dev@airflow.incubator.apache.org
> >>>>> Subject: Re: Skip task
> >>>>>
> >>>>> Also in 1.7.1.3, there's the ShortCircuitOperator, which can give you an
> >>>>> example.
> >>>>>
> >>>>> https://github.com/apache/incubator-airflow/blob/1.7.1.3/airflow/operators/python_operator.py
> >>>>>
> >>>>> You'd have to modify this to your needs, but the way it works is that if
> >>>>> the condition evaluates to False, none of the
> >>>>> downstream tasks are actually executed, they'd be skipped. The reason for
> >>>>> putting them into SKIPPED state is that
> >>>>> the DAG final result would still be SUCCESS and not failed.
> >>>>>
> >>>>> You could copy the operator from there and not do the full "for loop",
> >>>>> only pick the tasks immediately downstream
> >>>>> from this operator and skip those. Or... if you need to skip additional
> >>>>> tasks downstream, add a parameter "num_tasks"
> >>>>> that decides on a halting condition for the for loop.
> >>>>>
> >>>>> I believe that should work. I didn't try that here, but you can test that
> >>>>> and see what it does for you.
> >>>>>
> >>>>> If you want this as a UI capability... for example have a human operator
> >>>>> decide on skipping this yes or no, then
> >>>>> maybe the best way forward would be some kind of highly custom plugin with
> >>>>> its own view. In the end, you'd basically
> >>>>> do the same action in the backend, whether the python cond evaluates to
> >>>>> False or the button is clicked.
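
The halting condition Gerard suggests can be sketched without Airflow as a bounded walk over a task graph. This is only an illustration under assumptions: `tasks_to_skip`, `max_levels` and the dictionary-based graph are hypothetical, and `max_levels` interprets his "num_tasks" idea as a number of downstream levels.

```python
from collections import deque


def tasks_to_skip(downstream, start, max_levels=None):
    """Collect task ids downstream of `start`, at most `max_levels` hops away.

    `downstream` maps a task id to the ids directly downstream of it.
    max_levels=None walks the whole downstream graph.
    """
    seen = set()
    order = []
    queue = deque([(start, 0)])
    while queue:
        task_id, level = queue.popleft()
        # Halting condition: do not expand tasks at the depth limit.
        if max_levels is not None and level >= max_levels:
            continue
        for child in downstream.get(task_id, []):
            if child not in seen:
                seen.add(child)
                order.append(child)
                queue.append((child, level + 1))
    return order


dag = {
    "check": ["load"],
    "load": ["transform"],
    "transform": ["report"],
    "report": [],
}
print(tasks_to_skip(dag, "check", max_levels=1))
print(tasks_to_skip(dag, "check", max_levels=2))
print(tasks_to_skip(dag, "check"))
```

With `max_levels=1` only the immediately downstream task is selected, which matches the "only pick the tasks immediately downstream" variant.
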
> >>>>>
> >>>>> In the plugin case though, you'd have to keep the UI and the structure of
> >>>>> the DAG in sync and aligned, otherwise
> >>>>> it'd become a mess.... Airflow wasn't really developed for workflow/human
> >>>>> interaction, but for workflows where only
> >>>>> automated processes are involved. That doesn't mean that you can't do
> >>>>> anything like that, but it may be costly resource
> >>>>> wise to get this done. For example, on the basis of the
> >>>>> BranchPythonOperator, you
> >>>>> could call an external API to verify if a decision
> >>>>> was taken on a case, then follow branch A or B if the decision is there or
> >>>>> put the state back into UP_FOR_RETRY.
> >>>>> At the moment though, there's no programmatic way to reschedule that task
> >>>>> to some minutes or hours into the future before
> >>>>> it's looked at again, unless you really dive into airflow, scheduling
> >>>>> semantics (@once vs. other schedules) and how
> >>>>> the scheduler works.
> >>>>>
> >>>>> Rgds,
> >>>>>
> >>>>> Gerard
> >>>>>
> >>>>> On Tue, Nov 8, 2016 at 5:30 PM, Maycock, Luke <luke.maycock@affiliate.oliverwyman.com> wrote:
> >>>>>
> >>>>>> Hi All,
> >>>>>>
> >>>>>> I am using Airflow 1.7.1.3 and have a particular requirement, which I
> >>>>>> don't think is currently supported by Airflow but just wanted to check
> >>>>>> in case I was missing something.
> >>>>>>
> >>>>>> I occasionally wish to skip a particular task in a given DAG run such
> >>>>>> that the task does not run for that DAG run. Is this functionality
> >>>>>> available in Airflow?
> >>>>>>
> >>>>>> I am aware of the BranchPythonOperator
> >>>>>> (https://airflow.incubator.apache.org/concepts.html#branching) but I
> >>>>>> don't believe this is exactly what I am looking for.
> >>>>>>
> >>>>>> I am thinking that a button in the UI alongside the 'Mark Success' and
> >>>>>> 'Run' buttons would be appropriate.
> >>>>>>
> >>>>>> If the functionality does not exist, does anyone have any suggestions
> >>>>>> on ways to implement this?
> >>>>>>
> >>>>>> Cheers,
> >>>>>> Luke Maycock
> >>>>>> OLIVER WYMAN
> >>>>>> luke.maycock@affiliate.oliverwyman.com
> >>>>>> www.oliverwyman.com
> >>>>>>
> >>>>>> ________________________________
> >>>>>> This e-mail and any attachments may be confidential or legally
> >>>>>> privileged. If you received this message in error or are not the
> >>>>>> intended recipient, you should destroy the e-mail message and any
> >>>>>> attachments or copies, and you are prohibited from retaining,
> >>>>>> distributing, disclosing or using any information contained herein.
> >>>>>> Please inform us of the erroneous delivery by return e-mail. Thank you
> >>>>>> for your cooperation.
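
The MySQL behaviour described in Luke's 9 December message can be imitated in plain Python. This is a simulation under stated assumptions, not a call into MySQL or SQLAlchemy: `round_to_second` is a hypothetical stand-in for a DATETIME column that keeps whole seconds only.

```python
from datetime import datetime, timedelta


def round_to_second(value):
    """Simulate storing `value` in a column that keeps whole seconds only."""
    truncated = value.replace(microsecond=0)
    if value.microsecond >= 500000:
        truncated += timedelta(seconds=1)
    return truncated


exec_date = datetime(2016, 12, 8, 10, 44, 32, 654321)
stored = round_to_second(exec_date)

# The equality filter in the unit test compares against the original value,
# so it finds no row once the stored value has lost its microseconds.
print(stored == exec_date)

# Normalising the value before writing and querying keeps both sides equal.
normalised = exec_date.replace(microsecond=0)
print(round_to_second(normalised) == normalised)
```

Bolke's suggestion of `mysql.DATETIME(fsp=6)` addresses the same mismatch on the database side, by keeping the fractional seconds in the column.
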