airflow-dev mailing list archives

From Ruslan Dautkhanov <dautkha...@gmail.com>
Subject Re: regression from a month-old airflow version
Date Sun, 07 May 2017 17:39:31 GMT
Filed https://issues.apache.org/jira/browse/AIRFLOW-1178 for @once being
scheduled twice.
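[Editor's note] The behaviour argued for in this thread can be sketched in plain Python. This is a hypothetical model of the scheduling decision, not Airflow's actual `create_dag_run` code: an `@once` DAG should yield exactly one run regardless of the `catchup` setting, while an interval schedule with `catchup=True` walks the schedule forward from `start_date`.

```python
from datetime import datetime, timedelta

def runs_to_create(schedule_interval, start_date, now, catchup):
    # Hypothetical helper (not Airflow's scheduler code): models the
    # behaviour the thread expects - '@once' yields exactly one run,
    # no matter what catchup is set to.
    if schedule_interval == '@once':
        return [start_date]                    # one run, ever
    runs, next_run = [], start_date
    while next_run <= now:                     # catchup walks the schedule forward
        runs.append(next_run)
        next_run += schedule_interval
    return runs if catchup else runs[-1:]      # catchup=False keeps only the latest

start, now = datetime(2017, 5, 1), datetime(2017, 5, 7)
print(len(runs_to_create('@once', start, now, catchup=True)))            # 1
print(len(runs_to_create(timedelta(days=1), start, now, catchup=True)))  # 7
```

Under this model, the second scheduled run reported in AIRFLOW-1178 (and the "catching up" seen with `catchup=True` below) should simply never be created for `@once`.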




-- 
Ruslan Dautkhanov

On Sat, May 6, 2017 at 9:30 PM, Ruslan Dautkhanov <dautkhanov@gmail.com>
wrote:

> Thanks for the follow-up, Chris.
> It used to work for me with catchup=False in a month-old version of
> Airflow. That's why I mentioned it as a regression.
>
> Tried catchup=True with @once today; it actually tries to "catch up",
> which makes no sense for an @once schedule.
> Notice there is one active run and one pending/"scheduled" run:
>        [image: Inline image 1]
>
> So we can't really use @once with catchup=True, and it's not a workaround
> for this problem.
>
> Thanks.
>
>
>
> --
> Ruslan Dautkhanov
>
> On Sat, May 6, 2017 at 10:47 AM, Chris Fei <cfei18@gmail.com> wrote:
>
>> I wonder if your issue is the same root cause as AIRFLOW-1013[1] (which
>> you seem to have reported) and AIRFLOW-1055[2]. I haven't tried it
>> myself, but that second ticket seems to indicate that a workaround
>> could be setting catchup = True on your DAG. Not sure if that's an
>> option for you.
>> On Sat, May 6, 2017, at 12:29 PM, Ruslan Dautkhanov wrote:
>> > I've upgraded Airflow to today's master branch.
>> >
>> > Got the following regression when attempting to start a DAG:
>> >
>> > Process DagFileProcessor209-Process:
>> >> Traceback (most recent call last):
>> >>   File "/opt/cloudera/parcels/Anaconda/lib/python2.7/multiprocessing/process.py", line 258, in _bootstrap
>> >>     self.run()
>> >>   File "/opt/cloudera/parcels/Anaconda/lib/python2.7/multiprocessing/process.py", line 114, in run
>> >>     self._target(*self._args, **self._kwargs)
>> >>   File "/opt/airflow/airflow-20170506/src/airflow/airflow/jobs.py", line 346, in helper
>> >>     pickle_dags)
>> >>   File "/opt/airflow/airflow-20170506/src/airflow/airflow/utils/db.py", line 48, in wrapper
>> >>     result = func(*args, **kwargs)
>> >>   File "/opt/airflow/airflow-20170506/src/airflow/airflow/jobs.py", line 1584, in process_file
>> >>     self._process_dags(dagbag, dags, ti_keys_to_schedule)
>> >>   File "/opt/airflow/airflow-20170506/src/airflow/airflow/jobs.py", line 1173, in _process_dags
>> >>     dag_run = self.create_dag_run(dag)
>> >>   File "/opt/airflow/airflow-20170506/src/airflow/airflow/utils/db.py", line 48, in wrapper
>> >>     result = func(*args, **kwargs)
>> >>   File "/opt/airflow/airflow-20170506/src/airflow/airflow/jobs.py", line 776, in create_dag_run
>> >>     if next_start <= now:
>> >> TypeError: can't compare datetime.datetime to NoneType
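[Editor's note] The failure is a plain Python one, reproducible without Airflow. An assumption consistent with the traceback: for a `'@once'` DAG the scheduler's computed next schedule (`next_start`) can be `None`, and comparing `None` against a `datetime` raises exactly this `TypeError`:

```python
from datetime import datetime

# Minimal illustration (pure Python, not Airflow code): create_dag_run
# evaluates roughly `if next_start <= now`, and for a '@once' DAG the
# computed next schedule can be None.
next_start = None           # what the next-schedule computation can yield for '@once'
now = datetime.now()

caught = None
try:
    next_start <= now       # comparing None to a datetime
except TypeError as exc:
    caught = exc

print(type(caught).__name__)  # TypeError
```

(The exact message differs between Python 2 and 3; the 2.7 interpreter in the traceback phrases it as "can't compare datetime.datetime to NoneType".)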
>> >
>> >
>> >
>> > DAG definition:
>> >
>> > main_dag = DAG(
>> >     dag_id              = 'DISCOVER-Oracle-Load-Mar2017-v1',
>> >     default_args        = default_args,        # default operators' arguments - see above
>> >     user_defined_macros = dag_macros,          # I don't get the difference between
>> >     ## params           = dag_macros,          ## user_defined_macros and params
>> >     start_date          = datetime.now(),      # or e.g. datetime(2015, 6, 1)
>> >     # 'end_date'        = datetime(2016, 1, 1),
>> >     catchup             = False,               # perform scheduler catchup (or only run latest)?
>> >                                                # - defaults to True
>> >     schedule_interval   = '@once',             # '@once'=None?
>> >                                                # doesn't create multiple dag runs automatically
>> >     concurrency         = 3,                   # task instances allowed to run concurrently
>> >     max_active_runs     = 1,                   # only one DAG run at a time
>> >     dagrun_timeout      = timedelta(days=4),   # no way this dag should run for 4 days
>> >     orientation         = 'TB',                # default graph view
>> > )
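[Editor's note] One detail of this DAG worth flagging, though the thread itself doesn't raise it: `start_date=datetime.now()` is re-evaluated on every scheduler parse of the DAG file, so the "start" keeps moving; a static datetime, like the `datetime(2015, 6, 1)` already suggested in the inline comment, is generally the safer choice. A trivial demonstration:

```python
from datetime import datetime
import time

# Each parse of the DAG file would evaluate datetime.now() afresh,
# so two parses see two different start dates:
first_parse = datetime.now()
time.sleep(0.01)
second_parse = datetime.now()
print(second_parse > first_parse)  # True - the "start" differs per parse
```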
>> >
>> >
>> > default_args:
>> >
>> > default_args = {
>> >     # Security:
>> >     'owner'                 : 'rdautkha',            # owner of the task; using the unix username is recommended
>> >     # 'run_as_user'         : None,                  # unix username to impersonate while running the task
>> >     # Scheduling:
>> >     'start_date'            : None,                  # don't confuse with the DAG's start_date
>> >     'depends_on_past'       : False,                 # True makes sense... but there are bugs around that code
>> >     'wait_for_downstream'   : False,                 # depends_on_past is forced to True if wait_for_downstream
>> >     'trigger_rule'          : 'all_success',         # all_success is the default anyway
>> >     # Retries:
>> >     'retries'               : 0,                     # no retries
>> >     # 'retry_delay'         : timedelta(minutes=5),  # check retry_exponential_backoff and max_retry_delay too
>> >     # Timeouts and SLAs:
>> >     # 'sla'                 : timedelta(hours=1),    # default tasks' SLA - normally don't run longer
>> >     'execution_timeout'     : timedelta(hours=3),    # no single task runs 3 hours or more
>> >     # 'sla_miss_callback'   :                        # function to call when reporting SLA timeouts
>> >     # Notifications:
>> >     'email'                 : ['rdautkhanov@epsilon.com'],
>> >     'email_on_failure'      : True,
>> >     'email_on_retry'        : True,
>> >     # Resource usage:
>> >     'pool'                  : 'DISCOVER-Prod',       # can increase this pool's concurrency
>> >     # 'queue'               : 'some_queue',
>> >     # 'priority_weight'     : 10,
>> >     # Miscellaneous:
>> >     # on_failure_callback=None, on_success_callback=None,
>> >     # on_retry_callback=None
>> > }
>> >
>> >
>> > The DAG itself has a bunch of Oracle operators.
>> >
>> > Any ideas?
>> >
>> > That's a regression from a month-old Airflow.
>> > No changes in the DAG.
>> >
>> >
>> >
>> > Thank you,
>> > Ruslan Dautkhanov
>>
>>
>> Links:
>>
>>   1. https://issues.apache.org/jira/browse/AIRFLOW-1013
>>   2. https://issues.apache.org/jira/browse/AIRFLOW-1055
>>
>
>
