airflow-dev mailing list archives

From Russell Jurney <russell.jur...@gmail.com>
Subject Simple Airflow BashOperators run but can't be scheduled or un-paused
Date Tue, 16 May 2017 20:46:34 GMT
We have tasks that run when triggered manually, but we can't get them to run
on their schedule or get the DAG to unpause.

The code for the task looks like this:

from datetime import timedelta

from airflow import DAG
from airflow.operators.bash_operator import BashOperator

# default_args and project_home are defined earlier in the file (not shown)

# Run the API worker every 5 minutes
api_worker_dag = DAG(
    'agl_p2p_api_worker_dag',
    default_args=default_args,
    schedule_interval=timedelta(minutes=5)
)

# Run the API worker
api_worker_task = BashOperator(
    task_id="api_worker_task",
    bash_command="python {{ params.base_path }}/agl-p2p-api-worker/site/worker.py {{ ds }}",
    params={
        "base_path": project_home
    },
    dag=api_worker_dag
)

We run this command: airflow unpause agl_p2p_api_worker_dag

And then we see this error in the scheduler output:

[2017-05-16 20:26:48,722] {jobs.py:1408} INFO - Heartbeating the process manager
[2017-05-16 20:26:48,723] {dag_processing.py:559} INFO - Processor for /root/airflow/dags/setup.py finished
[2017-05-16 20:26:48,723] {dag_processing.py:578} WARNING - Processor for /root/airflow/dags/setup.py exited with return code 1. See /root/airflow/logs/scheduler/2017-05-16/setup.py.log for details.
[2017-05-16 20:26:48,726] {dag_processing.py:627} INFO - Started a process (PID: 110) to generate tasks for /root/airflow/dags/setup.py - logging into /root/airflow/logs/scheduler/2017-05-16/setup.py.log
[2017-05-16 20:26:48,727] {jobs.py:1444} INFO - Heartbeating the executor
[2017-05-16 20:26:48,727] {jobs.py:1454} INFO - Heartbeating the scheduler
Process DagFileProcessor19-Process:
Traceback (most recent call last):
  File "/opt/conda/lib/python3.6/multiprocessing/process.py", line 249, in _bootstrap
    self.run()
  File "/opt/conda/lib/python3.6/multiprocessing/process.py", line 93, in run
    self._target(*self._args, **self._kwargs)
  File "/opt/conda/lib/python3.6/site-packages/airflow/jobs.py", line 346, in helper
    pickle_dags)
  File "/opt/conda/lib/python3.6/site-packages/airflow/utils/db.py", line 53, in wrapper
    result = func(*args, **kwargs)
  File "/opt/conda/lib/python3.6/site-packages/airflow/jobs.py", line 1585, in process_file
    self._process_dags(dagbag, dags, ti_keys_to_schedule)
  File "/opt/conda/lib/python3.6/site-packages/airflow/jobs.py", line 1174, in _process_dags
    dag_run = self.create_dag_run(dag)
  File "/opt/conda/lib/python3.6/site-packages/airflow/utils/db.py", line 53, in wrapper
    result = func(*args, **kwargs)
  File "/opt/conda/lib/python3.6/site-packages/airflow/jobs.py", line 807, in create_dag_run
    else max(next_run_date, dag.start_date))
TypeError: can't compare offset-naive and offset-aware datetimes

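As far as we can tell, the failing max() in create_dag_run compares a
timezone-naive datetime with a timezone-aware one. The same TypeError can be
reproduced in plain Python, independent of Airflow (the values below are
illustrative, not the scheduler's actual datetimes):

from datetime import datetime, timezone

naive = datetime(2017, 1, 1)                       # no tzinfo: offset-naive
aware = datetime(2017, 1, 1, tzinfo=timezone.utc)  # tzinfo set: offset-aware

# max() compares its arguments, so this raises:
# TypeError: can't compare offset-naive and offset-aware datetimes
max(naive, aware)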

Note that we can run/schedule the example DAGs from the CLI, and our DAGs
are very closely derived from the examples, so we don't know what to do!

For instance we can run:

docker@airflow:/mnt/airflow$ airflow run agl_p2p_api_worker_dag api_worker_task 2017-01-01
[2017-05-16 20:44:41,046] {__init__.py:57} INFO - Using executor SequentialExecutor
Sending to executor.
[2017-05-16 20:44:43,274] {__init__.py:57} INFO - Using executor SequentialExecutor
Logging into: /root/airflow/logs/agl_p2p_api_worker_dag/api_worker_task/2017-01-01T00:00:00


And there are no errors. We are sure the scheduler and the webserver are both
running. The airflow run and airflow test commands work, but we get the same
error above when we toggle the DAG on in the web UI or unpause it from the
CLI. We have wondered whether our start_date has to be in the future; we
don't know.
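
For reference, our default_args are along these lines (the exact values here
are illustrative assumptions; the real block is defined earlier in our file
and not shown above):

from datetime import datetime, timedelta

# Illustrative default_args modeled on the Airflow example DAGs.
# If start_date carries a tzinfo while the scheduler's computed
# next_run_date does not (or vice versa), the max() in create_dag_run
# raises the TypeError above.
default_args = {
    'owner': 'airflow',
    'start_date': datetime(2017, 1, 1),  # timezone-naive as written
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}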

Any help would be appreciated. Thanks!

Russell Jurney @rjurney <http://twitter.com/rjurney>
russell.jurney@gmail.com LI <http://linkedin.com/in/russelljurney> FB
<http://facebook.com/jurney> datasyndrome.com
