airflow-dev mailing list archives

From Russell Jurney <russell.jur...@gmail.com>
Subject Re: Simple Airflow BashOperators run but can't be scheduled or un-paused
Date Thu, 18 May 2017 19:23:44 GMT
Our engineer Josh Watts has created a patch for 1.8.1 that adds UTC time
zones to the datetime.now()/datetime() calls and to the SQLAlchemy/Alembic
datetime handling. He'll submit a pull request here shortly.
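For reference, here is a minimal stdlib-only sketch of what such a patch substitutes: a plain datetime.now() is timezone-naive, while passing an explicit tz yields a UTC-aware value (the patch itself presumably uses pytz; the stdlib timezone class is equivalent for UTC):

```python
from datetime import datetime, timezone

# A bare datetime.now() is timezone-naive: tzinfo is None.
naive = datetime.now()

# Passing an explicit tz makes the result timezone-aware in UTC,
# which is what a UTC patch would substitute throughout.
aware = datetime.now(timezone.utc)

print(naive.tzinfo)  # None
print(aware.tzinfo)  # UTC
```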


Russell Jurney @rjurney <http://twitter.com/rjurney>
russell.jurney@gmail.com LI <http://linkedin.com/in/russelljurney> FB
<http://facebook.com/jurney> datasyndrome.com

On Wed, May 17, 2017 at 10:34 AM, Russell Jurney <russell.jurney@gmail.com>
wrote:

> I am running UTC, synchronized to Amazon time via NTP. At some point in
> the 3.x line, Python's datetime started returning values with a timezone
> set. This is breaking things.
>
> Bolke: Can you please direct me to that patch? I tried doing this myself,
> but it is complicated and time consuming to get it to work.
>
>
> On Wed, May 17, 2017 at 12:01 AM, Bolke de Bruin <bdbruin@gmail.com>
> wrote:
>
>> We probably need to use .utcnow() in most places. There was a patch for
>> that, but we held it off due to the operational implications it might
>> bring, and it is hard to test.
>>
>> B.
>>
>> > On 17 May 2017, at 02:20, Russell Jurney <russell.jurney@gmail.com>
>> wrote:
>> >
>> > It seems that this entire file needs to be patched so that the
>> > datetime.now() calls use tz=pytz.utc. At some point Python started
>> > including timezones in datetime.now(), and so this is broken.
>> >
>> > I think I can patch it. But one problem I am having is: how do I see
>> > the scheduler's log messages when I use, for example,
>> > self.logger.error(job.latest_heartbeat)?
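On the logging question: one way to surface self.logger.error() output while experimenting is to configure the root logger to write to stderr at ERROR level; a minimal stdlib sketch (the logger name below is hypothetical, and the scheduler also writes per-file logs under logs/scheduler/):

```python
import logging

# Route all records at ERROR and above to stderr with timestamps,
# in a format similar to Airflow's own log lines.
logging.basicConfig(
    level=logging.ERROR,
    format="[%(asctime)s] {%(filename)s:%(lineno)d} %(levelname)s - %(message)s",
)

logger = logging.getLogger("airflow.jobs")  # hypothetical logger name
logger.error("latest_heartbeat: %s", "2017-05-16 15:37:54")
```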
>> >
>> >
>> > On Tue, May 16, 2017 at 3:40 PM, Russell Jurney <russell.jurney@gmail.com> wrote:
>> >
>> >> I set up conda to run Python 3.4 and I still get this when a job is
>> >> scheduled and runs:
>> >>
>> >> [2017-05-16 15:37:54,339] {jobs.py:1171} DagFileProcessor1 INFO - Processing agl_p2p_api_worker_dag
>> >> [2017-05-16 15:37:55,601] {jobs.py:354} DagFileProcessor1 ERROR - Got an exception! Propagating...
>> >> Traceback (most recent call last):
>> >>   File "/Users/rjurney/Software/incubator-airflow/airflow/jobs.py", line 346, in helper
>> >>     pickle_dags)
>> >>   File "/Users/rjurney/Software/incubator-airflow/airflow/utils/db.py", line 48, in wrapper
>> >>     result = func(*args, **kwargs)
>> >>   File "/Users/rjurney/Software/incubator-airflow/airflow/jobs.py", line 1584, in process_file
>> >>     self._process_dags(dagbag, dags, ti_keys_to_schedule)
>> >>   File "/Users/rjurney/Software/incubator-airflow/airflow/jobs.py", line 1173, in _process_dags
>> >>     dag_run = self.create_dag_run(dag)
>> >>   File "/Users/rjurney/Software/incubator-airflow/airflow/utils/db.py", line 48, in wrapper
>> >>     result = func(*args, **kwargs)
>> >>   File "/Users/rjurney/Software/incubator-airflow/airflow/jobs.py", line 815, in create_dag_run
>> >>     if next_run_date > datetime.now():
>> >> TypeError: can't compare offset-naive and offset-aware datetimes
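The failing comparison can be reproduced and repaired outside Airflow; a sketch of the two standard fixes (attach a tzinfo to the naive side, or strip it from the aware side -- which fix is appropriate depends on whether the naive value really represents UTC):

```python
from datetime import datetime, timezone

aware = datetime.now(timezone.utc)  # offset-aware
naive = datetime.now()              # offset-naive

# Mixing the two raises TypeError, as in the traceback above.
try:
    aware > naive
except TypeError as e:
    print(e)  # can't compare offset-naive and offset-aware datetimes

# Fix 1: make the naive value aware (assumes it represents UTC).
comparable = naive.replace(tzinfo=timezone.utc)
_ = aware > comparable  # no longer raises

# Fix 2: make the aware value naive.
stripped = aware.replace(tzinfo=None)
_ = stripped > naive    # no longer raises
```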
>> >>
>> >>
>> >> What do I do?
>> >>
>> >>
>> >> On Tue, May 16, 2017 at 2:18 PM, Russell Jurney <russell.jurney@gmail.com> wrote:
>> >>
>> >>> Thanks, we're trying that now!
>> >>>
>> >>>
>> >>> On Tue, May 16, 2017 at 2:02 PM, Bolke de Bruin <bdbruin@gmail.com>
>> >>> wrote:
>> >>>
>> >>>> Did you try to run this on Py 2.7 / 3.4 as well? I notice you are
>> >>>> running on 3.6, which we are not testing against at the moment.
>> >>>>
>> >>>> Bolke.
>> >>>>
>> >>>>> On 16 May 2017, at 22:46, Russell Jurney <russell.jurney@gmail.com>
>> >>>> wrote:
>> >>>>>
>> >>>>> We have tasks that run, but we can't get them to run as scheduled
>> or to
>> >>>>> un-pause.
>> >>>>>
>> >>>>> The code for the task looks like this:
>> >>>>>
>> >>>>> # Run the API worker every 5 minutes
>> >>>>> api_worker_dag = DAG(
>> >>>>>   'agl_p2p_api_worker_dag',
>> >>>>>   default_args=default_args,
>> >>>>>   schedule_interval=timedelta(minutes=5)
>> >>>>> )
>> >>>>>
>> >>>>> # Run the API worker
>> >>>>> api_worker_task = BashOperator(
>> >>>>>   task_id="api_worker_task",
>> >>>>>   bash_command="""python {{ params.base_path
>> >>>>> }}/agl-p2p-api-worker/site/worker.py {{ ds }}""",
>> >>>>>   params={
>> >>>>>       "base_path": project_home
>> >>>>>   },
>> >>>>>   dag=api_worker_dag
>> >>>>> )
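For what it's worth, the arithmetic the scheduler does for this DAG is just start_date plus the schedule_interval; a stdlib sketch of that next-run computation (the dates here are illustrative):

```python
from datetime import datetime, timedelta

start_date = datetime(2017, 1, 1)        # naive, as in typical default_args
schedule_interval = timedelta(minutes=5)

# create_dag_run() effectively advances the previous run by the interval,
# then compares the result against "now" -- the comparison that fails
# when one side is timezone-aware and the other is not.
next_run_date = start_date + schedule_interval
print(next_run_date)  # 2017-01-01 00:05:00
```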
>> >>>>>
>> >>>>> We run this command: airflow unpause agl_p2p_api_worker_dag
>> >>>>>
>> >>>>> And we see this error:
>> >>>>>
>> >>>>> [2017-05-16 20:26:48,722] {jobs.py:1408} INFO - Heartbeating the process manager
>> >>>>> [2017-05-16 20:26:48,723] {dag_processing.py:559} INFO - Processor for /root/airflow/dags/setup.py finished
>> >>>>> [2017-05-16 20:26:48,723] {dag_processing.py:578} WARNING - Processor for /root/airflow/dags/setup.py exited with return code 1. See /root/airflow/logs/scheduler/2017-05-16/setup.py.log for details.
>> >>>>> [2017-05-16 20:26:48,726] {dag_processing.py:627} INFO - Started a process (PID: 110) to generate tasks for /root/airflow/dags/setup.py - logging into /root/airflow/logs/scheduler/2017-05-16/setup.py.log
>> >>>>> [2017-05-16 20:26:48,727] {jobs.py:1444} INFO - Heartbeating the executor
>> >>>>> [2017-05-16 20:26:48,727] {jobs.py:1454} INFO - Heartbeating the scheduler
>> >>>>> Process DagFileProcessor19-Process:
>> >>>>> Traceback (most recent call last):
>> >>>>>   File "/opt/conda/lib/python3.6/multiprocessing/process.py", line 249, in _bootstrap
>> >>>>>     self.run()
>> >>>>>   File "/opt/conda/lib/python3.6/multiprocessing/process.py", line 93, in run
>> >>>>>     self._target(*self._args, **self._kwargs)
>> >>>>>   File "/opt/conda/lib/python3.6/site-packages/airflow/jobs.py", line 346, in helper
>> >>>>>     pickle_dags)
>> >>>>>   File "/opt/conda/lib/python3.6/site-packages/airflow/utils/db.py", line 53, in wrapper
>> >>>>>     result = func(*args, **kwargs)
>> >>>>>   File "/opt/conda/lib/python3.6/site-packages/airflow/jobs.py", line 1585, in process_file
>> >>>>>     self._process_dags(dagbag, dags, ti_keys_to_schedule)
>> >>>>>   File "/opt/conda/lib/python3.6/site-packages/airflow/jobs.py", line 1174, in _process_dags
>> >>>>>     dag_run = self.create_dag_run(dag)
>> >>>>>   File "/opt/conda/lib/python3.6/site-packages/airflow/utils/db.py", line 53, in wrapper
>> >>>>>     result = func(*args, **kwargs)
>> >>>>>   File "/opt/conda/lib/python3.6/site-packages/airflow/jobs.py", line 807, in create_dag_run
>> >>>>>     else max(next_run_date, dag.start_date))
>> >>>>> TypeError: can't compare offset-naive and offset-aware datetimes
>> >>>>>
>> >>>>>
>> >>>>> Note that we can run/schedule the example DAGs from the CLI, and
>> >>>>> our DAGs are very closely derived from the examples, so we don't
>> >>>>> know what to do!
>> >>>>>
>> >>>>> For instance we can run:
>> >>>>>
>> >>>>> docker@airflow:/mnt/airflow$ airflow run agl_p2p_api_worker_dag api_worker_task 2017-01-01
>> >>>>> [2017-05-16 20:44:41,046] {__init__.py:57} INFO - Using executor SequentialExecutor
>> >>>>> Sending to executor.
>> >>>>> [2017-05-16 20:44:43,274] {__init__.py:57} INFO - Using executor SequentialExecutor
>> >>>>> Logging into:
>> >>>>> /root/airflow/logs/agl_p2p_api_worker_dag/api_worker_task/2017-01-01T00:00:00
>> >>>>>
>> >>>>>
>> >>>>> And there are no errors. We are sure the scheduler and the
>> >>>>> webserver are running. The airflow run/test commands work, but we
>> >>>>> get the same error above when we click the activate button in the
>> >>>>> web app or unpause the DAG from the CLI. We have wondered whether
>> >>>>> our start date has to be in the future, maybe? We don't know.
>> >>>>>
>> >>>>> Any help would be appreciated. Thanks!
>> >>>>>
>> >>>>
>> >>>>
>> >>>
>> >>
>>
>>
>
