airflow-dev mailing list archives

From Russell Jurney <russell.jur...@gmail.com>
Subject Re: Simple Airflow BashOperators run but can't be scheduled or un-paused
Date Wed, 17 May 2017 17:34:19 GMT
I am running UTC, synchronized to Amazon time via NTP. At some point in the
3.x line, Python's datetime started returning timezone-aware values, and
this is breaking things.
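For anyone hitting this, the TypeError in the tracebacks below comes from Python refusing to order a tz-naive datetime against a tz-aware one. A minimal stand-alone reproduction (not Airflow code):

```python
from datetime import datetime, timezone

naive = datetime(2017, 5, 16, 15, 37)                       # no tzinfo attached
aware = datetime(2017, 5, 16, 15, 37, tzinfo=timezone.utc)  # tz-aware

# Mixing the two in a comparison raises the same error seen in the scheduler logs.
try:
    naive < aware
except TypeError as e:
    print(e)  # can't compare offset-naive and offset-aware datetimes

# Comparisons work once both sides carry tzinfo (or both lack it).
assert aware <= datetime.now(timezone.utc)
```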

Bolke: Can you please direct me to that patch? I tried doing this myself,
but it is complicated and time-consuming to get it working.
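For context, the kind of normalization such a patch would presumably apply can be sketched like this. The `as_utc` helper is hypothetical, not actual Airflow code:

```python
from datetime import datetime, timezone

def as_utc(dt):
    """Return dt as a timezone-aware UTC datetime.

    Hypothetical helper for illustration only: naive values are assumed
    to already be in UTC; aware values are converted.
    """
    if dt.tzinfo is None:
        return dt.replace(tzinfo=timezone.utc)
    return dt.astimezone(timezone.utc)

# With both operands normalized, a comparison like the one in
# create_dag_run no longer raises.
next_run_date = datetime(2017, 1, 1)  # naive, e.g. a DAG start_date
assert as_utc(next_run_date) < datetime.now(timezone.utc)
```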

Russell Jurney @rjurney <http://twitter.com/rjurney>
russell.jurney@gmail.com LI <http://linkedin.com/in/russelljurney> FB
<http://facebook.com/jurney> datasyndrome.com

On Wed, May 17, 2017 at 12:01 AM, Bolke de Bruin <bdbruin@gmail.com> wrote:

> We probably need to use .utcnow() in most places. There was a patch for
> that, but we/I held it off due to the operational implications it might
> bring, and it is kind of hard to test.
>
> B.
>
> > On 17 May 2017, at 02:20, Russell Jurney <russell.jurney@gmail.com> wrote:
> >
> > It seems that this entire file needs to be patched so that the
> > datetime.now() calls use tz=pytz.utc. At some point Python started
> > including timezones in datetime.now(), so this is broken.
> >
> > I think I can patch it. But one problem I am having is: how do I see the
> > log messages of the scheduler when I use, for example,
> > self.logger.error(job.latest_heartbeat)?
> >
> > Russell Jurney @rjurney <http://twitter.com/rjurney>
> > russell.jurney@gmail.com LI <http://linkedin.com/in/russelljurney> FB
> > <http://facebook.com/jurney> datasyndrome.com
> >
> > On Tue, May 16, 2017 at 3:40 PM, Russell Jurney <russell.jurney@gmail.com>
> > wrote:
> >
> >> I set up conda to run Python 3.4 and I still get this when a job is
> >> scheduled and runs:
> >>
> >> [2017-05-16 15:37:54,339] {jobs.py:1171} DagFileProcessor1 INFO -
> >> Processing agl_p2p_api_worker_dag
> >> [2017-05-16 15:37:55,601] {jobs.py:354} DagFileProcessor1 ERROR - Got an
> >> exception! Propagating...
> >> Traceback (most recent call last):
> >>   File "/Users/rjurney/Software/incubator-airflow/airflow/jobs.py", line 346, in helper
> >>     pickle_dags)
> >>   File "/Users/rjurney/Software/incubator-airflow/airflow/utils/db.py", line 48, in wrapper
> >>     result = func(*args, **kwargs)
> >>   File "/Users/rjurney/Software/incubator-airflow/airflow/jobs.py", line 1584, in process_file
> >>     self._process_dags(dagbag, dags, ti_keys_to_schedule)
> >>   File "/Users/rjurney/Software/incubator-airflow/airflow/jobs.py", line 1173, in _process_dags
> >>     dag_run = self.create_dag_run(dag)
> >>   File "/Users/rjurney/Software/incubator-airflow/airflow/utils/db.py", line 48, in wrapper
> >>     result = func(*args, **kwargs)
> >>   File "/Users/rjurney/Software/incubator-airflow/airflow/jobs.py", line 815, in create_dag_run
> >>     if next_run_date > datetime.now():
> >> TypeError: can't compare offset-naive and offset-aware datetimes
> >>
> >>
> >> What do I do?
> >>
> >> Russell Jurney @rjurney <http://twitter.com/rjurney>
> >> russell.jurney@gmail.com LI <http://linkedin.com/in/russelljurney> FB
> >> <http://facebook.com/jurney> datasyndrome.com
> >>
> >> On Tue, May 16, 2017 at 2:18 PM, Russell Jurney <russell.jurney@gmail.com>
> >> wrote:
> >>
> >>> Thanks, we're trying that now!
> >>>
> >>> Russell Jurney @rjurney <http://twitter.com/rjurney>
> >>> russell.jurney@gmail.com LI <http://linkedin.com/in/russelljurney> FB
> >>> <http://facebook.com/jurney> datasyndrome.com
> >>>
> >>> On Tue, May 16, 2017 at 2:02 PM, Bolke de Bruin <bdbruin@gmail.com>
> >>> wrote:
> >>>
> >>>> Did you try to run this on Py 2.7 / 3.4 as well? I notice you are
> >>>> running on 3.6, which we are not testing against at the moment.
> >>>>
> >>>> Bolke.
> >>>>
> >>>>> On 16 May 2017, at 22:46, Russell Jurney <russell.jurney@gmail.com>
> >>>> wrote:
> >>>>>
> >>>>> We have tasks that run, but we can't get them to run as scheduled or
> >>>>> to un-pause.
> >>>>>
> >>>>> The code for the task looks like this:
> >>>>>
> >>>>> # Run the API worker every 5 minutes
> >>>>> api_worker_dag = DAG(
> >>>>>   'agl_p2p_api_worker_dag',
> >>>>>   default_args=default_args,
> >>>>>   schedule_interval=timedelta(minutes=5)
> >>>>> )
> >>>>>
> >>>>> # Run the API worker
> >>>>> api_worker_task = BashOperator(
> >>>>>   task_id="api_worker_task",
> >>>>>   bash_command="""python {{ params.base_path
> >>>>> }}/agl-p2p-api-worker/site/worker.py {{ ds }}""",
> >>>>>   params={
> >>>>>       "base_path": project_home
> >>>>>   },
> >>>>>   dag=api_worker_dag
> >>>>> )
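One way a DAG like the one above can trigger this error: if default_args (not shown in the thread, so the contents below are an assumption) carries a timezone-aware start_date while the scheduler compares against a naive datetime.now(). A hedged stand-alone sketch:

```python
from datetime import datetime, timedelta, timezone

# Hypothetical default_args; the thread does not show them. A tz-aware
# start_date mixed with a naive datetime.now() reproduces the TypeError.
default_args = {"start_date": datetime(2017, 1, 1, tzinfo=timezone.utc)}

next_run_date = default_args["start_date"] + timedelta(minutes=5)
try:
    next_run_date > datetime.now()  # naive, as in jobs.py create_dag_run
except TypeError as e:
    print(e)  # can't compare offset-naive and offset-aware datetimes
```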
> >>>>>
> >>>>> We run this command: airflow unpause agl_p2p_api_worker_dag
> >>>>>
> >>>>> And we see this error:
> >>>>>
> >>>>> [2017-05-16 20:26:48,722] {jobs.py:1408} INFO - Heartbeating the process manager
> >>>>> [2017-05-16 20:26:48,723] {dag_processing.py:559} INFO - Processor for /root/airflow/dags/setup.py finished
> >>>>> [2017-05-16 20:26:48,723] {dag_processing.py:578} WARNING - Processor for /root/airflow/dags/setup.py exited with return code 1. See /root/airflow/logs/scheduler/2017-05-16/setup.py.log for details.
> >>>>> [2017-05-16 20:26:48,726] {dag_processing.py:627} INFO - Started a process (PID: 110) to generate tasks for /root/airflow/dags/setup.py - logging into /root/airflow/logs/scheduler/2017-05-16/setup.py.log
> >>>>> [2017-05-16 20:26:48,727] {jobs.py:1444} INFO - Heartbeating the executor
> >>>>> [2017-05-16 20:26:48,727] {jobs.py:1454} INFO - Heartbeating the scheduler
> >>>>> Process DagFileProcessor19-Process:
> >>>>> Traceback (most recent call last):
> >>>>>   File "/opt/conda/lib/python3.6/multiprocessing/process.py", line 249, in _bootstrap
> >>>>>     self.run()
> >>>>>   File "/opt/conda/lib/python3.6/multiprocessing/process.py", line 93, in run
> >>>>>     self._target(*self._args, **self._kwargs)
> >>>>>   File "/opt/conda/lib/python3.6/site-packages/airflow/jobs.py", line 346, in helper
> >>>>>     pickle_dags)
> >>>>>   File "/opt/conda/lib/python3.6/site-packages/airflow/utils/db.py", line 53, in wrapper
> >>>>>     result = func(*args, **kwargs)
> >>>>>   File "/opt/conda/lib/python3.6/site-packages/airflow/jobs.py", line 1585, in process_file
> >>>>>     self._process_dags(dagbag, dags, ti_keys_to_schedule)
> >>>>>   File "/opt/conda/lib/python3.6/site-packages/airflow/jobs.py", line 1174, in _process_dags
> >>>>>     dag_run = self.create_dag_run(dag)
> >>>>>   File "/opt/conda/lib/python3.6/site-packages/airflow/utils/db.py", line 53, in wrapper
> >>>>>     result = func(*args, **kwargs)
> >>>>>   File "/opt/conda/lib/python3.6/site-packages/airflow/jobs.py", line 807, in create_dag_run
> >>>>>     else max(next_run_date, dag.start_date))
> >>>>> TypeError: can't compare offset-naive and offset-aware datetimes
> >>>>>
> >>>>>
> >>>>> Note that we can run/schedule the example DAGs from the CLI, and our
> >>>>> DAGs are very closely derived from the examples, so we don't know
> >>>>> what to do!
> >>>>>
> >>>>> For instance we can run:
> >>>>>
> >>>>> docker@airflow:/mnt/airflow$ airflow run agl_p2p_api_worker_dag api_worker_task 2017-01-01
> >>>>> [2017-05-16 20:44:41,046] {__init__.py:57} INFO - Using executor SequentialExecutor
> >>>>> Sending to executor.
> >>>>> [2017-05-16 20:44:43,274] {__init__.py:57} INFO - Using executor SequentialExecutor
> >>>>> Logging into:
> >>>>> /root/airflow/logs/agl_p2p_api_worker_dag/api_worker_task/2017-01-01T00:00:00
> >>>>>
> >>>>>
> >>>>> And there are no errors. We are sure the scheduler is running and the
> >>>>> webserver is running. The airflow run/test commands work, but we get
> >>>>> this same error above when we click the activate button on the web app
> >>>>> or unpause the DAG from the CLI. We have wondered if our start date
> >>>>> maybe has to be in the future? We don't know.
> >>>>>
> >>>>> Any help would be appreciated. Thanks!
> >>>>>
> >>>>> Russell Jurney @rjurney <http://twitter.com/rjurney>
> >>>>> russell.jurney@gmail.com LI <http://linkedin.com/in/russelljurney> FB
> >>>>> <http://facebook.com/jurney> datasyndrome.com
> >>>>
> >>>>
> >>>
> >>
>
>
