airflow-dev mailing list archives

From Bolke de Bruin <bdbr...@gmail.com>
Subject Re: Skip task
Date Mon, 12 Dec 2016 10:40:01 GMT
Have a look at: https://github.com/apache/incubator-airflow/blob/master/airflow/migrations/versions/4addfa1236f1_add_fractional_seconds_to_mysql_tables.py

Make sure to include "type_=mysql.DATETIME(fsp=6)" for your DateTime types on MySQL.
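
As a rough illustration of what that type changes, the sketch below compiles a plain DateTime and a mysql.DATETIME(fsp=6) against the MySQL dialect without connecting to a database. It assumes SQLAlchemy is installed; the two-column table is a cut-down, hypothetical version of the task_exclusion table from the quoted migration, not the actual migration script.

```python
import sqlalchemy as sa
from sqlalchemy.dialects import mysql
from sqlalchemy.schema import CreateTable

dialect = mysql.dialect()

# How each type is rendered in MySQL DDL.
plain = str(sa.DateTime().compile(dialect=dialect))
fractional = str(mysql.DATETIME(fsp=6).compile(dialect=dialect))
print(plain)       # DATETIME
print(fractional)  # DATETIME(6)

# Hypothetical, reduced version of the task_exclusion table.
metadata = sa.MetaData()
task_exclusion = sa.Table(
    "task_exclusion",
    metadata,
    sa.Column("id", sa.Integer(), primary_key=True),
    # fsp=6 keeps microseconds instead of whole seconds on MySQL.
    sa.Column("created_on", mysql.DATETIME(fsp=6), nullable=False),
)
ddl = str(CreateTable(task_exclusion).compile(dialect=dialect))
print(ddl)
```

In an Alembic migration the same type object would be passed wherever the column type is given.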

- Bolke



> On 12 Dec 2016, at 11:33, Maycock, Luke <luke.maycock@affiliate.oliverwyman.com> wrote:
> 
> It is a new table named 'TaskExclusion'. The migration script for this is as follows:
> 
> def upgrade():
>    op.create_table(
>        'task_exclusion',
>        sa.Column('id', sa.Integer(), nullable=False),
>        sa.Column('dag_id', sa.String(length=250), nullable=False),
>        sa.Column('task_id', sa.String(length=250), nullable=False),
>        sa.Column('exclusion_type', sa.String(length=32), nullable=False),
>        sa.Column('exclusion_start_date', sa.DateTime(), nullable=False),
>        sa.Column('exclusion_end_date', sa.DateTime(), nullable=False),
>        sa.Column('created_by', sa.String(length=256), nullable=False),
>        sa.Column('created_on', sa.DateTime(), nullable=False),
>        sa.PrimaryKeyConstraint('id'))
> 
> 
> def downgrade():
>    op.drop_table('task_exclusion')
> 
> This is the PR for the exclusion of a task. We review our code internally before setting
> up a PR into the main repo for the next review, hence the PR being in our fork. The PR does
> not yet contain our unit tests.
> 
> 
> Cheers,
> Luke Maycock
> OLIVER WYMAN
> luke.maycock@affiliate.oliverwyman.com<mailto:luke.maycock@affiliate.oliverwyman.com>
> www.oliverwyman.com<http://www.oliverwyman.com/>
> 
> 
> 
> ________________________________
> From: Bolke de Bruin <bdbruin@gmail.com>
> Sent: 09 December 2016 20:54
> To: dev@airflow.incubator.apache.org
> Subject: Re: Skip task
> 
> What table was this? I recently pushed a fix that allows fractional seconds in our minimum
> supported version of MySQL (5.6.4 and beyond).
> 
> I might have missed something.
> 
> Thanks
> Bolke
> 
> Sent from my iPhone
> 
>> On 9 Dec 2016, at 14:27, Maycock, Luke <luke.maycock@affiliate.oliverwyman.com> wrote:
>> 
>> I found the issue to be that, for MySQL, the datetime was being rounded to the nearest
>> second. The strange thing is that if a datetime without microseconds was passed to SQLAlchemy,
>> the insertion into MySQL failed; but when a datetime with microseconds was passed, the
>> microseconds were removed by rounding to the nearest second.
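
The effect on an equality lookup can be sketched with the standard library alone. The helper store_without_fsp below is hypothetical: it only emulates a column that rounds to whole seconds, and is not how MySQL or SQLAlchemy is implemented.

```python
from datetime import datetime, timedelta


def store_without_fsp(value):
    """Emulate a DATETIME column without fractional seconds by
    rounding to the nearest second (hypothetical helper)."""
    rounded = value.replace(microsecond=0)
    if value.microsecond >= 500000:
        rounded += timedelta(seconds=1)
    return rounded


# A value such as datetime.now() would produce, with microseconds set.
exec_date = datetime(2016, 12, 8, 10, 44, 32, 123456)
stored = store_without_fsp(exec_date)

# The stored value no longer equals the value used in the filter,
# so an equality query would return no row.
print(stored == exec_date)  # False

# Dropping the microseconds before writing and querying keeps them equal.
truncated = exec_date.replace(microsecond=0)
print(store_without_fsp(truncated) == truncated)  # True
```

Storing the column with fractional seconds avoids the mismatch without changing the test data.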
>> 
>> 
>> Hopefully, this will prevent someone else going down the same rabbit hole that I did.
>> 
>> 
>> Cheers,
>> Luke Maycock
>> OLIVER WYMAN
>> luke.maycock@affiliate.oliverwyman.com<mailto:luke.maycock@affiliate.oliverwyman.com>
>> www.oliverwyman.com<http://www.oliverwyman.com/>
>> 
>> 
>> ________________________________
>> From: Maycock, Luke <luke.maycock@affiliate.oliverwyman.com>
>> Sent: 08 December 2016 10:44:32
>> To: dev@airflow.incubator.apache.org
>> Subject: Re: Skip task
>> 
>> Hi All,
>> 
>> 
>> We have implemented a solution for allowing the exclusion of individual tasks during
>> a DAG run. However, when writing unit tests for this, we are encountering an issue with MySQL,
>> which I am hoping someone is able to help us with.
>> 
>> 
>> For our solution, we have a new 'TaskExclusion' table in the metadata database. Our unit
>> tests were run by Travis, not locally.
>> 
>> 
>> The code block under test:
>> 
>> 
>> class TaskExclusion(Base):
>>     """
>>     This class is used to define objects that can be used to specify not to
>>     run a given task in a given dag on a variety of execution date conditions.
>>     These objects will be stored in the backend database in the task_exclusion
>>     table.
>>     Static methods are provided for the creation, removal and investigation of
>>     these objects.
>>     """
>> 
>>     __tablename__ = "task_exclusion"
>> 
>>     id = Column(Integer(), primary_key=True)
>>     dag_id = Column(String(ID_LEN), nullable=False)
>>     task_id = Column(String(ID_LEN), nullable=False)
>>     exclusion_type = Column(String(32), nullable=False)
>>     exclusion_start_date = Column(DateTime, nullable=True)
>>     exclusion_end_date = Column(DateTime, nullable=True)
>>     created_by = Column(String(256), nullable=False)
>>     created_on = Column(DateTime, nullable=False)
>> 
>>     @classmethod
>>     @provide_session
>>     def set(
>>             cls,
>>             dag_id,
>>             task_id,
>>             exclusion_type,
>>             exclusion_start_date,
>>             exclusion_end_date,
>>             created_by,
>>             session=None):
>>         """
>>         Add a task exclusion to prevent a task running under certain
>>         circumstances.
>>         :param dag_id: The dag_id of the DAG containing the task to exclude
>>         from execution.
>>         :param task_id: The task_id of the task to exclude from execution.
>>         :param exclusion_type: The type of circumstances to exclude the task
>>         from execution under. See the TaskExclusionType class for more detail.
>>         :param exclusion_start_date: The execution_date to start excluding on.
>>         This will be ignored if the exclusion_type is INDEFINITE.
>>         :param exclusion_end_date: The execution_date to stop excluding on.
>>         This will be ignored if the exclusion_type is INDEFINITE or
>>         SINGLE_DATE.
>>         :param created_by: Who is creating this exclusion. Stored with the
>>         exclusion record for auditing/debugging purposes.
>>         :return: None.
>>         """
>> 
>>         session.expunge_all()
>> 
>>         # Set up execution date range correctly
>>         if exclusion_type == TaskExclusionType.SINGLE_DATE:
>>             if exclusion_start_date:
>>                 exclusion_end_date = exclusion_start_date
>>             else:
>>                 raise AirflowException(
>>                     "No exclusion_start_date "
>>                 )
>>         elif exclusion_type == TaskExclusionType.DATE_RANGE:
>>             if exclusion_start_date > exclusion_end_date:
>>                 raise AirflowException(
>>                     "The exclusion_start_date is after the exclusion_end_date"
>>                 )
>>         elif exclusion_type == TaskExclusionType.INDEFINITE:
>>             exclusion_start_date = None
>>             exclusion_end_date = None
>>         else:
>>             raise AirflowException(
>>                 "The exclusion_type, {}, is not recognised."
>>                 .format(exclusion_type)
>>             )
>> 
>>         # remove any duplicate exclusions
>>         session.query(cls).filter(
>>             cls.dag_id == dag_id,
>>             cls.task_id == task_id,
>>             cls.exclusion_type == exclusion_type,
>>             cls.exclusion_start_date == exclusion_start_date,
>>             cls.exclusion_end_date == exclusion_end_date
>>         ).delete()
>> 
>>         # insert new exclusion
>>         session.add(TaskExclusion(
>>             dag_id=dag_id,
>>             task_id=task_id,
>>             exclusion_type=exclusion_type,
>>             exclusion_start_date=exclusion_start_date,
>>             exclusion_end_date=exclusion_end_date,
>>             created_by=created_by,
>>             created_on=datetime.now())
>>         )
>> 
>>         session.commit()
>> 
>> 
>> The unit test:
>> 
>> class TaskExclusionTest(unittest.TestCase):
>>     def test_set_exclusion(self, session=None):
>> 
>>         session = settings.Session()
>> 
>>         session.expunge_all()
>> 
>>         dag_id = 'test_task_exclude'
>>         task_id = 'test_task_exclude'
>>         exec_date = datetime.datetime.now()
>> 
>>         TaskExclusion.set(dag_id=dag_id,
>>                           task_id=task_id,
>>                           exclusion_type=TaskExclusionType.SINGLE_DATE,
>>                           exclusion_start_date=exec_date,
>>                           exclusion_end_date=exec_date,
>>                           created_by='airflow')
>> 
>> 
>>         exclusion = session.query(TaskExclusion).filter(
>>             TaskExclusion.dag_id == dag_id,
>>             TaskExclusion.task_id == task_id,
>>             TaskExclusion.exclusion_type == TaskExclusionType.SINGLE_DATE,
>>             TaskExclusion.exclusion_start_date == exec_date,
>>             TaskExclusion.exclusion_end_date == exec_date).first()
>> 
>>         self.assertTrue(exclusion)
>> 
>> 
>> The unit test passes for PostgreSQL and SQLite but fails for MySQL. I have checked
>> and the 'exclusion' variable contains a TaskExclusion object for PostgreSQL and SQLite but
>> is set to 'None' for MySQL. Any suggestions on what could be causing this would be much appreciated.
>> 
>> 
>> Cheers,
>> Luke Maycock
>> OLIVER WYMAN
>> luke.maycock@affiliate.oliverwyman.com<mailto:luke.maycock@affiliate.oliverwyman.com>
>> www.oliverwyman.com<http://www.oliverwyman.com/>
>> 
>> 
>> 
>> ________________________________
>> From: siddharth anand <sanand@apache.org>
>> Sent: 16 November 2016 00:40
>> To: dev@airflow.incubator.apache.org
>> Subject: Re: Skip task
>> 
>> If your requirement is to skip a portion of tasks in a DagRun based on some
>> state encountered while executing that DagRun, that is what
>> BranchPythonOperator or ShortCircuitOperator (optionally paired with a
>> Trigger Rule specified on a downstream task) is made for.
>> 
>> These operators take a custom Python callable as an argument. The callable
>> can check for the existence of data or files that should have been
>> generated by an external system or an upstream task in the same DAG. The
>> callables need to return a Boolean value in the case of the
>> ShortCircuitOperator or a selected choice (i.e. branch to take) as in the
>> case of the BranchPythonOperator.
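
A minimal sketch of the two kinds of callable described above, using only the standard library. The function names, the file paths and the task ids load_file and skip_load are hypothetical, and the wiring into the operators is left out.

```python
import os
import tempfile


def file_has_arrived(path):
    """Boolean-style callable, as a ShortCircuitOperator expects:
    a falsy result means downstream tasks are skipped."""
    return os.path.exists(path)


def choose_branch(path):
    """Branch-style callable, as a BranchPythonOperator expects:
    returns the task_id of the branch to follow (ids are made up)."""
    return "load_file" if os.path.exists(path) else "skip_load"


# Try both callables against a file that exists and one that does not.
with tempfile.TemporaryDirectory() as tmp:
    present = os.path.join(tmp, "present.csv")
    missing = os.path.join(tmp, "missing.csv")
    open(present, "w").close()

    results = (
        file_has_arrived(present),
        file_has_arrived(missing),
        choose_branch(present),
        choose_branch(missing),
    )

print(results)  # (True, False, 'load_file', 'skip_load')
```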
>> 
>> If you have 20 tasks that all depend on the presence of 20 different files,
>> you would need 20 ShortCircuitOperator or BranchPythonOperator tasks each
>> either sharing a common callable or each with its own callable.
>> 
>> One could argue that these tasks are "overhead" because they just encompass
>> some conditional or control logic and that DAGs should only contain
>> workhorse tasks (i.e. tasks that do some  work). DAGs with workhorse-only
>> tasks are more of a pure dataflow approach -- i.e. no control-logic
>> operators. However, I don't see another option.
>> 
>> In the current system, a callable registered with a ShortCircuitOperator
>> would check for the presence of a file -- if the file were not available,
>> then a series of downstream tasks would be skipped in that DagRun, until a
>> task with a trigger_rule="all_done" were encountered, downstream of which,
>> tasks would no longer be skipped for the DagRun.
>> 
>> I hope this makes sense.
>> 
>> A long time ago, I proposed UI functionality to skip a series of DAG runs
>> via the UI, because I knew that no data was available for that time range
>> from an external system. I wanted to essentially specify a "blackout"
>> period in terms of a time range that covered multiple DagRuns. My intention
>> was for backfills to skip those days. It turns out that my company did not
>> end up having such a requirement, so I dropped the feature request.
>> 
>> If this is what you are asking for, then I am +1. Please implement it and
>> submit a PR.
>> 
>> On Tue, Nov 15, 2016 at 2:50 AM, Maycock, Luke <
>> luke.maycock@affiliate.oliverwyman.com> wrote:
>> 
>>> Thank you for taking the time to respond. This is a great approach if you
>>> know at the time of creating the DAG which tasks you expect to need to
>>> skip. However, I don't think this is exactly the use case I have. For
>>> example, I may be expecting a file to arrive in an FTP folder for loading
>>> into a database but one day it doesn't arrive so I just want to skip that
>>> task on that day.
>>> 
>>> 
>>> Our workflows commonly have around 20 of these types of tasks in. I could
>>> configure all of these tasks in the way you suggested in case I ever need
>>> to skip one of them. However, I'd prefer not to have to set the tasks up
>>> this way and instead have the ability just to skip a task on an ad-hoc
>>> basis. I could then also use this functionality to add the ability to run
>>> from a certain point in a DAG or to a certain point in the DAG.
>>> 
>>> 
>>> 
>>> Thanks,
>>> Luke Maycock
>>> OLIVER WYMAN
>>> luke.maycock@affiliate.oliverwyman.com<mailto:luke.
>>> maycock@affiliate.oliverwyman.com>
>>> www.oliverwyman.com<http://www.oliverwyman.com/>
>>> 
>>> 
>>> 
>>> ________________________________
>>> From: siddharth anand <sanand@apache.org>
>>> Sent: 14 November 2016 19:48
>>> To: dev@airflow.incubator.apache.org
>>> Subject: Re: Skip task
>>> 
>>> For cases like this, we (Agari) use the following approach :
>>> 
>>> 1. Create a Variable in the UI of type boolean such as *enable_feature_x*
>>> 2. Use a ShortCircuitOperator (or BranchPythonOperator) to Skip
>>> downstream processing based on the value of *enable_feature_x*
>>> 3. Assuming that you don't want to skip ALL downstream tasks, you can
>>> use a trigger_rule of all_done to resume processing some portion of your
>>> downstream DAG after skipping an upstream portion
>>> 
>>> In other words, there is already a means to achieve what you are asking for
>>> today. You can change the value of *enable_feature_x* via the UI. If you'd
>>> like to enhance the UI to better capture this pattern, please submit a PR.
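
A rough sketch of the callable in step 2, assuming the flag arrives as text (so that a plain bool() on the string "False" would wrongly be truthy). The variables dict is a hypothetical stand-in for the Variables set in the UI, and is_enabled and should_continue are made-up names.

```python
def is_enabled(raw_value, default=False):
    """Parse a flag stored as text; bool("False") is True, so an
    explicit comparison is used instead."""
    if raw_value is None:
        return default
    return str(raw_value).strip().lower() in ("1", "true", "yes", "on")


# Hypothetical stand-in for Variables edited in the UI.
variables = {"enable_feature_x": "False"}


def should_continue():
    """Callable a ShortCircuitOperator could use: a falsy result
    means downstream processing is skipped."""
    return is_enabled(variables.get("enable_feature_x"))


print(should_continue())  # False

variables["enable_feature_x"] = "true"
print(should_continue())  # True
```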
>>> -s
>>> 
>>> On Thu, Nov 10, 2016 at 1:20 PM, Maycock, Luke <
>>> luke.maycock@affiliate.oliverwyman.com> wrote:
>>> 
>>>> Hi Gerard,
>>>> 
>>>> 
>>>> I see the new status as having a number of uses:
>>>> 
>>>> 1.  A user can manually set a task to skip in a DAG run via the UI.
>>>> 2.  We can then make use of this new status to add the following
>>>> functionality to Airflow:
>>>>   *  Run a DAG run up to a certain point and have the rest of the tasks
>>>> have the new status.
>>>>   *  Run a DAG run from a certain task to the end, setting all
>>>> pre-requisite tasks to have this new status.
>>>> 
>>>> I am happy to be challenged on the above use cases if there are better
>>>> ways to achieve the same things.
>>>> 
>>>> Cheers,
>>>> Luke Maycock
>>>> OLIVER WYMAN
>>>> luke.maycock@affiliate.oliverwyman.com<mailto:luke.
>>>> maycock@affiliate.oliverwyman.com>
>>>> www.oliverwyman.com<http://www.oliverwyman.com/>
>>>> 
>>>> 
>>>> 
>>>> ________________________________
>>>> From: Gerard Toonstra <gtoonstra@gmail.com>
>>>> Sent: 09 November 2016 18:08
>>>> To: dev@airflow.incubator.apache.org
>>>> Subject: Re: Skip task
>>>> 
>>>> Hey Luke,
>>>> 
>>>> Who or what makes the decision to skip processing that task?
>>>> 
>>>> Rgds,
>>>> 
>>>> Gerard
>>>> 
>>>> On Wed, Nov 9, 2016 at 2:39 PM, Maycock, Luke <
>>>> luke.maycock@affiliate.oliverwyman.com> wrote:
>>>> 
>>>>> Hi Gerard,
>>>>> 
>>>>> 
>>>>> Thank you for your quick response.
>>>>> 
>>>>> 
>>>>> I am not trying to implement this for a specific operator but rather
>>>>> trying to add it as a feature for any task in any DAG.
>>>>> 
>>>>> 
>>>>> Given that the skipped states propagate where all directly upstream tasks
>>>>> are skipped, I don't think this is the state we want to use. For the
>>>>> functionality I'm looking for, I think I'll need to introduce a new status,
>>>>> maybe 'disabled'.
>>>>> 
>>>>> 
>>>>> Again, thanks for your response.
>>>>> 
>>>>> 
>>>>> Cheers,
>>>>> Luke Maycock
>>>>> OLIVER WYMAN
>>>>> luke.maycock@affiliate.oliverwyman.com<mailto:luke.
>>>>> maycock@affiliate.oliverwyman.com>
>>>>> www.oliverwyman.com<http://www.oliverwyman.com/>
>>>>> 
>>>>> 
>>>>> 
>>>>> ________________________________
>>>>> From: Gerard Toonstra <gtoonstra@gmail.com>
>>>>> Sent: 08 November 2016 18:19
>>>>> To: dev@airflow.incubator.apache.org
>>>>> Subject: Re: Skip task
>>>>> 
>>>>> Also in 1.7.1.3, there's the ShortCircuitOperator, which can give you an
>>>>> example.
>>>>> 
>>>>> https://github.com/apache/incubator-airflow/blob/1.7.1.3/airflow/operators/python_operator.py
>>>>> 
>>>>> You'd have to modify this to your needs, but the way it works is that if
>>>>> the condition evaluates to False, none of the
>>>>> downstream tasks are actually executed, they'd be skipped. The reason for
>>>>> putting them into SKIPPED state is that
>>>>> the DAG final result would still be SUCCESS and not failed.
>>>>> 
>>>>> You could copy the operator from there and not do the full "for loop",
>>>>> only pick the tasks immediately downstream
>>>>> from this operator and skip those. Or... if you need to skip additional
>>>>> tasks downstream, add a parameter "num_tasks"
>>>>> that decides on a halting condition for the for loop.
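
One way to picture the "num_tasks" idea, as a toy model only: the DAG is a plain dict of task id to downstream task ids, and tasks_to_skip is a hypothetical helper that walks downstream breadth-first and stops after num_tasks tasks. None of this is Airflow's own code.

```python
from collections import deque


def tasks_to_skip(downstream, start, num_tasks):
    """Return up to num_tasks task ids downstream of start,
    nearest first (breadth-first walk over a dict-based graph)."""
    selected = []
    seen = {start}
    queue = deque(downstream.get(start, []))
    # Halting condition: stop once enough tasks have been collected.
    while queue and len(selected) < num_tasks:
        task_id = queue.popleft()
        if task_id in seen:
            continue
        seen.add(task_id)
        selected.append(task_id)
        queue.extend(downstream.get(task_id, []))
    return selected


# Hypothetical linear DAG: check -> load -> transform -> report
dag = {"check": ["load"], "load": ["transform"], "transform": ["report"]}

print(tasks_to_skip(dag, "check", 1))  # ['load']
print(tasks_to_skip(dag, "check", 2))  # ['load', 'transform']
```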
>>>>> 
>>>>> I believe that should work. I didn't try that here, but you can test that
>>>>> and see what it does for you.
>>>>> 
>>>>> 
>>>>> If you want this as a UI capability... for example, have a human operator
>>>>> decide whether to skip this or not, then
>>>>> maybe the best way forward would be some kind of highly custom plugin with
>>>>> its own view. In the end, you'd basically
>>>>> do the same action in the backend, whether the Python condition triggers
>>>>> the skip or the button is clicked.
>>>>> 
>>>>> In the plugin case though, you'd have to keep the UI and the structure of
>>>>> the DAG in sync and aligned, otherwise
>>>>> it'd become a mess.... Airflow wasn't really developed for workflow/human
>>>>> interaction, but for workflows where only
>>>>> automated processes are involved. That doesn't mean that you can't do
>>>>> anything like that, but it may be costly resource
>>>>> wise to get this done. For example, on the basis of the BranchPythonOperator,
>>>>> you could call an external API to verify if a decision
>>>>> was taken on a case, then follow branch A or B if the decision is there or
>>>>> put the state back into UP_FOR_RETRY.
>>>>> At the moment though, there's no programmatic way to reschedule that task
>>>>> to some minutes or hours into the future before
>>>>> it's looked at again, unless you really dive into airflow, scheduling
>>>>> semantics (@once vs. other schedules) and how
>>>>> the scheduler works.
>>>>> 
>>>>> Rgds,
>>>>> 
>>>>> Gerard
>>>>> 
>>>>> 
>>>>> 
>>>>> 
>>>>> On Tue, Nov 8, 2016 at 5:30 PM, Maycock, Luke <
>>>>> luke.maycock@affiliate.oliverwyman.com> wrote:
>>>>> 
>>>>>> Hi All,
>>>>>> 
>>>>>> 
>>>>>> I am using Airflow 1.7.1.3 and have a particular requirement, which I
>>>>>> don't think is currently supported by Airflow but just wanted to check in
>>>>>> case I was missing something.
>>>>>> 
>>>>>> 
>>>>>> I occasionally wish to skip a particular task in a given DAG run such that
>>>>>> the task does not run for that DAG run. Is this functionality available in
>>>>>> Airflow?
>>>>>> 
>>>>>> 
>>>>>> I am aware of the BranchPythonOperator
>>>>>> (https://airflow.incubator.apache.org/concepts.html#branching) but I don't
>>>>>> believe this is exactly what I am looking for.
>>>>>> 
>>>>>> 
>>>>>> I am thinking that a button in the UI alongside the 'Mark Success' and
>>>>>> 'Run' buttons would be appropriate.
>>>>>> 
>>>>>> 
>>>>>> If the functionality does not exist, does anyone have any suggestions on
>>>>>> ways to implement this?
>>>>>> 
>>>>>> 
>>>>>> Cheers,
>>>>>> Luke Maycock
>>>>>> OLIVER WYMAN
>>>>>> luke.maycock@affiliate.oliverwyman.com<mailto:luke.
>>>>>> maycock@affiliate.oliverwyman.com>
>>>>>> www.oliverwyman.com<http://www.oliverwyman.com/>
>>>>>> 
>>>>>> 
>>>>>> ________________________________
>>>>>> This e-mail and any attachments may be confidential or legally
>>>>> privileged.
>>>>>> If you received this message in error or are not the intended
>>>> recipient,
>>>>>> you should destroy the e-mail message and any attachments or copies,
>>>> and
>>>>>> you are prohibited from retaining, distributing, disclosing or using
>>>> any
>>>>>> information contained herein. Please inform us of the erroneous
>>>> delivery
>>>>> by
>>>>>> return e-mail. Thank you for your cooperation.
>>>>>> 

