airflow-dev mailing list archives

From "Maycock, Luke" <luke.mayc...@affiliate.oliverwyman.com>
Subject Re: Skip task
Date Mon, 12 Dec 2016 10:33:14 GMT
It is a new table named 'TaskExclusion'. The migration script for this is as follows:

from alembic import op
import sqlalchemy as sa


def upgrade():
    op.create_table(
        'task_exclusion',
        sa.Column('id', sa.Integer(), nullable=False),
        sa.Column('dag_id', sa.String(length=250), nullable=False),
        sa.Column('task_id', sa.String(length=250), nullable=False),
        sa.Column('exclusion_type', sa.String(length=32), nullable=False),
        # Nullable to match the TaskExclusion model quoted below, which stores
        # None in both date columns for INDEFINITE exclusions.
        sa.Column('exclusion_start_date', sa.DateTime(), nullable=True),
        sa.Column('exclusion_end_date', sa.DateTime(), nullable=True),
        sa.Column('created_by', sa.String(length=256), nullable=False),
        sa.Column('created_on', sa.DateTime(), nullable=False),
        sa.PrimaryKeyConstraint('id'))


def downgrade():
    op.drop_table('task_exclusion')

This is the PR for the exclusion of a task. We review our code internally before setting up
a PR into the main repo for the next review, hence the PR being in our fork. The PR does not
yet contain our unit tests.


Cheers,
Luke Maycock
OLIVER WYMAN
luke.maycock@affiliate.oliverwyman.com
www.oliverwyman.com



________________________________
From: Bolke de Bruin <bdbruin@gmail.com>
Sent: 09 December 2016 20:54
To: dev@airflow.incubator.apache.org
Subject: Re: Skip task

What table was this? I recently pushed a fix that allows fractional seconds in our minimum
supported version of MySQL (5.6.4 and beyond).

I might have missed something.

Thanks
Bolke

Sent from my iPhone

> On 9 Dec 2016, at 14:27, Maycock, Luke <luke.maycock@affiliate.oliverwyman.com> wrote:
>
> I found the issue to be that, for MySQL, the datetime was being rounded to the nearest
> second. The strange thing is that if a datetime without the microseconds was passed to
> SQLAlchemy, the insertion into MySQL failed; but when a datetime with microseconds was
> passed, the microseconds were removed by rounding to the nearest second.
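
The mismatch described above can be illustrated without a database. This is only a sketch: it assumes the DATETIME column keeps whole seconds and rounds the fractional part, and `round_to_second` is a hypothetical stand-in for that behaviour, not a MySQL or SQLAlchemy API. It shows why an equality filter on the original value would find nothing, and one possible workaround for the test (dropping the microseconds before writing and querying).

```python
from datetime import datetime, timedelta


def round_to_second(value):
    """Hypothetical stand-in for a DATETIME column that keeps whole seconds only."""
    truncated = value.replace(microsecond=0)
    if value.microsecond >= 500000:
        truncated += timedelta(seconds=1)
    return truncated


# A timestamp with microseconds, as datetime.now() would normally return.
exec_date = datetime(2016, 12, 9, 14, 27, 3, 654321)
stored = round_to_second(exec_date)

# The stored value no longer equals the value used in the query filter.
print(stored == exec_date)  # False

# Possible workaround: strip the microseconds before inserting and querying.
safe_date = exec_date.replace(microsecond=0)
print(round_to_second(safe_date) == safe_date)  # True
```

In the unit test quoted further down, the equivalent change would be to build `exec_date` with `.replace(microsecond=0)`; whether that suits the real schema is an assumption.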
>
>
> Hopefully, this will prevent someone else going down the same rabbit hole that I did.
>
>
> Cheers,
> Luke Maycock
> OLIVER WYMAN
> luke.maycock@affiliate.oliverwyman.com
> www.oliverwyman.com
>
>
> ________________________________
> From: Maycock, Luke <luke.maycock@affiliate.oliverwyman.com>
> Sent: 08 December 2016 10:44:32
> To: dev@airflow.incubator.apache.org
> Subject: Re: Skip task
>
> Hi All,
>
>
> We have implemented a solution for allowing the exclusion of individual tasks during
> a DAG run. However, when writing unit tests for this, we are encountering an issue with
> MySQL, which I am hoping someone is able to help us with.
>
>
> For our solution, we have a new 'TaskExclusion' table in the meta-data. Our unit tests
> were run by Travis, not locally.
>
>
> The code block under test:
>
>
> class TaskExclusion(Base):
>     """
>     This class is used to define objects that can be used to specify not to
>     run a given task in a given dag on a variety of execution date conditions.
>     These objects will be stored in the backend database in the task_exclusion
>     table.
>     Static methods are provided for the creation, removal and investigation of
>     these objects.
>     """
>
>     __tablename__ = "task_exclusion"
>
>     id = Column(Integer(), primary_key=True)
>     dag_id = Column(String(ID_LEN), nullable=False)
>     task_id = Column(String(ID_LEN), nullable=False)
>     exclusion_type = Column(String(32), nullable=False)
>     exclusion_start_date = Column(DateTime, nullable=True)
>     exclusion_end_date = Column(DateTime, nullable=True)
>     created_by = Column(String(256), nullable=False)
>     created_on = Column(DateTime, nullable=False)
>
>     @classmethod
>     @provide_session
>     def set(
>             cls,
>             dag_id,
>             task_id,
>             exclusion_type,
>             exclusion_start_date,
>             exclusion_end_date,
>             created_by,
>             session=None):
>         """
>         Add a task exclusion to prevent a task running under certain
>         circumstances.
>         :param dag_id: The dag_id of the DAG containing the task to exclude
>         from execution.
>         :param task_id: The task_id of the task to exclude from execution.
>         :param exclusion_type: The type of circumstances to exclude the task
>         from execution under. See the TaskExclusionType class for more detail.
>         :param exclusion_start_date: The execution_date to start excluding on.
>         This will be ignored if the exclusion_type is INDEFINITE.
>         :param exclusion_end_date: The execution_date to stop excluding on.
>         This will be ignored if the exclusion_type is INDEFINITE or
>         SINGLE_DATE.
>         :param created_by: Who is creating this exclusion. Stored with the
>         exclusion record for auditing/debugging purposes.
>         :return: None.
>         """
>
>         session.expunge_all()
>
>         # Set up execution date range correctly
>         if exclusion_type == TaskExclusionType.SINGLE_DATE:
>             if exclusion_start_date:
>                 exclusion_end_date = exclusion_start_date
>             else:
>                 raise AirflowException(
>                     "No exclusion_start_date "
>                 )
>         elif exclusion_type == TaskExclusionType.DATE_RANGE:
>             if exclusion_start_date > exclusion_end_date:
>                 raise AirflowException(
>                     "The exclusion_start_date is after the exclusion_end_date"
>                 )
>         elif exclusion_type == TaskExclusionType.INDEFINITE:
>             exclusion_start_date = None
>             exclusion_end_date = None
>         else:
>             raise AirflowException(
>                 "The exclusion_type, {}, is not recognised."
>                 .format(exclusion_type)
>             )
>
>         # remove any duplicate exclusions
>         session.query(cls).filter(
>             cls.dag_id == dag_id,
>             cls.task_id == task_id,
>             cls.exclusion_type == exclusion_type,
>             cls.exclusion_start_date == exclusion_start_date,
>             cls.exclusion_end_date == exclusion_end_date
>         ).delete()
>
>         # insert new exclusion
>         session.add(TaskExclusion(
>             dag_id=dag_id,
>             task_id=task_id,
>             exclusion_type=exclusion_type,
>             exclusion_start_date=exclusion_start_date,
>             exclusion_end_date=exclusion_end_date,
>             created_by=created_by,
>             created_on=datetime.now())
>         )
>
>         session.commit()
>
>
> The unit test:
>
> class TaskExclusionTest(unittest.TestCase):
>     def test_set_exclusion(self, session=None):
>
>         session = settings.Session()
>
>         session.expunge_all()
>
>         dag_id = 'test_task_exclude'
>         task_id = 'test_task_exclude'
>         exec_date = datetime.datetime.now()
>
>         TaskExclusion.set(dag_id=dag_id,
>                           task_id=task_id,
>                           exclusion_type=TaskExclusionType.SINGLE_DATE,
>                           exclusion_start_date=exec_date,
>                           exclusion_end_date=exec_date,
>                           created_by='airflow')
>
>
>         exclusion = session.query(TaskExclusion).filter(
>             TaskExclusion.dag_id == dag_id,
>             TaskExclusion.task_id == task_id,
>             TaskExclusion.exclusion_type == TaskExclusionType.SINGLE_DATE,
>             TaskExclusion.exclusion_start_date == exec_date,
>             TaskExclusion.exclusion_end_date == exec_date).first()
>
>         self.assertTrue(exclusion)
>
>
> The unit test passes for PostgreSQL and SQLite but fails for MySQL. I have checked and
> the 'exclusion' variable contains a TaskExclusion object for PostgreSQL and SQLite but is
> set to 'None' for MySQL. Any suggestions on what could be causing this would be much
> appreciated.
>
>
> Cheers,
> Luke Maycock
> OLIVER WYMAN
> luke.maycock@affiliate.oliverwyman.com
> www.oliverwyman.com
>
>
>
> ________________________________
> From: siddharth anand <sanand@apache.org>
> Sent: 16 November 2016 00:40
> To: dev@airflow.incubator.apache.org
> Subject: Re: Skip task
>
> If your requirement is to skip a portion of tasks in a DagRun based on some
> state encountered while executing that DagRun, that is what
> BranchPythonOperator or ShortCircuitOperator (optionally paired with a
> Trigger Rule specified on a downstream task) is made for.
>
> These operators take a custom Python callable as an argument. The callable
> can check for the existence of data or files that should have been
> generated by an external system or an upstream task in the same DAG. The
> callable needs to return a Boolean value in the case of the
> ShortCircuitOperator or a selected choice (i.e. the branch to take) in the
> case of the BranchPythonOperator.
>
> If you have 20 tasks that all depend on the presence of 20 different files,
> you would need 20 ShortCircuitOperator or BranchPythonOperator tasks each
> either sharing a common callable or each with its own callable.
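
As a rough illustration of the two kinds of callable described above, here is a minimal sketch using only the standard library. The function names, the file name and the task ids `load_file` and `skip_load` are hypothetical; in a DAG the functions would be handed to the operators as `python_callable`, with the path supplied through something like `op_kwargs`.

```python
import os
import tempfile


def file_has_arrived(path):
    """Condition callable: True lets downstream tasks run, False skips them."""
    return os.path.isfile(path)


def choose_branch(path):
    """Branch callable: returns the task_id of the branch to follow."""
    return "load_file" if os.path.isfile(path) else "skip_load"


# Exercise both callables against a file that is first missing, then present.
with tempfile.TemporaryDirectory() as folder:
    expected = os.path.join(folder, "export.csv")
    print(file_has_arrived(expected), choose_branch(expected))
    open(expected, "w").close()
    print(file_has_arrived(expected), choose_branch(expected))
```

Sharing one callable across the 20 tasks then comes down to passing a different path to each operator instance.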
>
> One could argue that these tasks are "overhead" because they just encompass
> some conditional or control logic and that DAGs should only contain
> workhorse tasks (i.e. tasks that do some work). DAGs with workhorse-only
> tasks are more of a pure dataflow approach -- i.e. no control-logic
> operators. However, I don't see another option.
>
> In the current system, a callable registered with a ShortCircuitOperator
> would check for the presence of a file -- if the file were not available,
> then a series of downstream tasks would be skipped in that DagRun, until a
> task with trigger_rule="all_done" were encountered, downstream of which
> tasks would no longer be skipped for the DagRun.
>
> I hope this makes sense.
>
> A long time ago, I proposed UI functionality to skip a series of DAG runs
> via the UI, because I knew that no data was available for that time range
> from an external system. I wanted to essentially specify a "blackout"
> period in terms of a time range that covered multiple DagRuns. My intention
> was for backfills to skip those days. It turns out that my company did not
> end up having such a requirement, so I dropped the feature request.
>
> If this is what you are asking for, then I am +1. Please implement it and
> submit a PR.
>
> On Tue, Nov 15, 2016 at 2:50 AM, Maycock, Luke <
> luke.maycock@affiliate.oliverwyman.com> wrote:
>
>> Thank you for taking the time to respond. This is a great approach if you
>> know at the time of creating the DAG which tasks you expect to need to
>> skip. However, I don't think this is exactly the use case I have. For
>> example, I may be expecting a file to arrive in an FTP folder for loading
>> into a database but one day it doesn't arrive so I just want to skip that
>> task on that day.
>>
>>
>> Our workflows commonly have around 20 of these types of tasks in. I could
>> configure all of these tasks in the way you suggested in case I ever need
>> to skip one of them. However, I'd prefer not to have to set the tasks up
>> this way and instead have the ability just to skip a task on an ad-hoc
>> basis. I could then also use this functionality to add the ability to run
>> from a certain point in a DAG or to a certain point in the DAG.
>>
>>
>>
>> Thanks,
>> Luke Maycock
>> OLIVER WYMAN
>> luke.maycock@affiliate.oliverwyman.com
>> www.oliverwyman.com
>>
>>
>>
>> ________________________________
>> From: siddharth anand <sanand@apache.org>
>> Sent: 14 November 2016 19:48
>> To: dev@airflow.incubator.apache.org
>> Subject: Re: Skip task
>>
>> For cases like this, we (Agari) use the following approach :
>>
>>  1. Create a Variable in the UI of type boolean such as *enable_feature_x*
>>  2. Use a ShortCircuitOperator (or BranchPythonOperator) to skip
>>     downstream processing based on the value of *enable_feature_x*
>>  3. Assuming that you don't want to skip ALL downstream tasks, you can
>>     use a trigger_rule of all_done to resume processing some portion of your
>>     downstream DAG after skipping an upstream portion
>>
>> In other words, there is already a means to achieve what you are asking for
>> today. You can change the value of *enable_feature_x* via the UI. If you'd
>> like to enhance the UI to better capture this pattern, pls submit a PR.
>> -s
>>
>> On Thu, Nov 10, 2016 at 1:20 PM, Maycock, Luke <
>> luke.maycock@affiliate.oliverwyman.com> wrote:
>>
>>> Hi Gerard,
>>>
>>>
>>> I see the new status as having a number of uses:
>>>
>>> 1.  A user can manually set a task to skip in a DAG run via the UI.
>>> 2.  We can then make use of this new status to add the following
>>> functionality to Airflow:
>>>    *   Run a DAG run up to a certain point and have the rest of the
>>>        tasks have the new status.
>>>    *   Run a DAG run from a certain task to the end, setting all
>>>        pre-requisite tasks to have this new status.
>>>
>>> I am happy to be challenged on the above use cases if there are better
>>> ways to achieve the same things.
>>>
>>> Cheers,
>>> Luke Maycock
>>> OLIVER WYMAN
>>> luke.maycock@affiliate.oliverwyman.com
>>> www.oliverwyman.com
>>>
>>>
>>>
>>> ________________________________
>>> From: Gerard Toonstra <gtoonstra@gmail.com>
>>> Sent: 09 November 2016 18:08
>>> To: dev@airflow.incubator.apache.org
>>> Subject: Re: Skip task
>>>
>>> Hey Luke,
>>>
>>> Who or what makes the decision to skip processing that task?
>>>
>>> Rgds,
>>>
>>> Gerard
>>>
>>> On Wed, Nov 9, 2016 at 2:39 PM, Maycock, Luke <
>>> luke.maycock@affiliate.oliverwyman.com> wrote:
>>>
>>>> Hi Gerard,
>>>>
>>>>
>>>> Thank you for your quick response.
>>>>
>>>>
>>>> I am not trying to implement this for a specific operator but rather
>>>> trying to add it as a feature for any task in any DAG.
>>>>
>>>>
>>>> Given that the skipped states propagate where all directly upstream
>>>> tasks are skipped, I don't think this is the state we want to use. For
>>>> the functionality I'm looking for, I think I'll need to introduce a new
>>>> status, maybe 'disabled'.
>>>>
>>>>
>>>> Again, thanks for your response.
>>>>
>>>>
>>>> Cheers,
>>>> Luke Maycock
>>>> OLIVER WYMAN
>>>> luke.maycock@affiliate.oliverwyman.com
>>>> www.oliverwyman.com
>>>>
>>>>
>>>>
>>>> ________________________________
>>>> From: Gerard Toonstra <gtoonstra@gmail.com>
>>>> Sent: 08 November 2016 18:19
>>>> To: dev@airflow.incubator.apache.org
>>>> Subject: Re: Skip task
>>>>
>>>> Also in 1.7.1.3, there's the ShortCircuitOperator, which can give you
>>>> an example.
>>>>
>>>> https://github.com/apache/incubator-airflow/blob/1.7.1.3/airflow/operators/python_operator.py
>>>>
>>>> You'd have to modify this to your needs, but the way it works is that
>>>> if the condition evaluates to False, none of the downstream tasks are
>>>> actually executed; they'd be skipped. The reason for putting them into
>>>> SKIPPED state is that the DAG final result would still be SUCCESS and
>>>> not FAILED.
>>>>
>>>> You could copy the operator from there and not do the full "for loop",
>>>> picking only the tasks immediately downstream from this operator and
>>>> skipping those. Or... if you need to skip additional tasks downstream,
>>>> add a parameter "num_tasks" that decides on a halting condition for the
>>>> for loop.
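
One way to picture the halting condition suggested above is a depth-limited walk over the downstream tasks. This is a sketch under assumptions: the DAG is represented as a plain dict of task ids, `tasks_to_skip` is a hypothetical helper, and the limit is read as a number of levels (`max_depth`) rather than a literal task count. It is not the operator's actual code.

```python
from collections import deque


def tasks_to_skip(downstream, start, max_depth=1):
    """Collect tasks at most max_depth edges downstream of start."""
    seen = set()
    queue = deque([(start, 0)])
    while queue:
        task, depth = queue.popleft()
        if depth == max_depth:
            continue  # halting condition: do not walk past this level
        for child in downstream.get(task, []):
            if child not in seen:
                seen.add(child)
                queue.append((child, depth + 1))
    return seen


# Hypothetical linear DAG: check -> load -> transform -> report
dag = {"check": ["load"], "load": ["transform"], "transform": ["report"]}
print(sorted(tasks_to_skip(dag, "check")))               # ['load']
print(sorted(tasks_to_skip(dag, "check", max_depth=2)))  # ['load', 'transform']
```

With `max_depth=1` only the immediately downstream tasks are selected, which corresponds to the first variant described above.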
>>>>
>>>> I believe that should work. I didn't try that here, but you can test
>>>> that and see what it does for you.
>>>>
>>>>
>>>> If you want this as a UI capability... for example have a human
>>>> operator decide on skipping this or not, then maybe the best way
>>>> forward would be some kind of highly custom plugin with its own view.
>>>> In the end, you'd basically do the same action in the backend, whether
>>>> the Python condition triggers the skip or the button is clicked.
>>>>
>>>> In the plugin case though, you'd have to keep the UI and the structure
>>>> of the DAG in sync and aligned, otherwise it'd become a mess....
>>>> Airflow wasn't really developed for workflow/human interaction, but for
>>>> workflows where only automated processes are involved. That doesn't
>>>> mean that you can't do anything like that, but it may be costly
>>>> resource-wise to get this done. For example, on the basis of the
>>>> BranchPythonOperator, you could call an external API to verify if a
>>>> decision was taken on a case, then follow branch A or B if the decision
>>>> is there or put the state back into UP_FOR_RETRY.
>>>> At the moment though, there's no programmatic way to reschedule that
>>>> task to some minutes or hours into the future before it's looked at
>>>> again, unless you really dive into airflow, scheduling semantics (@once
>>>> vs. other schedules) and how the scheduler works.
>>>>
>>>> Rgds,
>>>>
>>>> Gerard
>>>>
>>>>
>>>>
>>>>
>>>> On Tue, Nov 8, 2016 at 5:30 PM, Maycock, Luke <
>>>> luke.maycock@affiliate.oliverwyman.com> wrote:
>>>>
>>>>> Hi All,
>>>>>
>>>>>
>>>>> I am using Airflow 1.7.1.3 and have a particular requirement, which I
>>>>> don't think is currently supported by Airflow but just wanted to
>>>>> check in case I was missing something.
>>>>>
>>>>>
>>>>> I occasionally wish to skip a particular task in a given DAG run such
>>>>> that the task does not run for that DAG run. Is this functionality
>>>>> available in Airflow?
>>>>>
>>>>>
>>>>> I am aware of the BranchPythonOperator
>>>>> (https://airflow.incubator.apache.org/concepts.html#branching) but I
>>>>> don't believe this is exactly what I am looking for.
>>>>>
>>>>>
>>>>> I am thinking that a button in the UI alongside the 'Mark Success'
>>>>> and 'Run' buttons would be appropriate.
>>>>>
>>>>>
>>>>> If the functionality does not exist, does anyone have any suggestions
>>>>> on ways to implement this?
>>>>>
>>>>>
>>>>> Cheers,
>>>>> Luke Maycock
>>>>> OLIVER WYMAN
>>>>> luke.maycock@affiliate.oliverwyman.com
>>>>> www.oliverwyman.com
>>>>>
>>>>>
>>>>> ________________________________
>>>>> This e-mail and any attachments may be confidential or legally
>>>>> privileged. If you received this message in error or are not the
>>>>> intended recipient, you should destroy the e-mail message and any
>>>>> attachments or copies, and you are prohibited from retaining,
>>>>> distributing, disclosing or using any information contained herein.
>>>>> Please inform us of the erroneous delivery by return e-mail. Thank you
>>>>> for your cooperation.
>>>>>