airflow-users mailing list archives

From Peter Lane <Peter.L...@ama-assn.org>
Subject Task fails with no indication of why
Date Tue, 09 Feb 2021 04:40:14 GMT
I am running Airflow 2.0.1 with the KubernetesExecutor on OpenShift. Last night I finally
got a simple PythonOperator test DAG to run, but now the DAG tasks mysteriously fail with
no indication of why.
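
For reference, the DEBUG level in the output below is the logging_level option from the
[logging] section of airflow.cfg. One way to set it, as a sketch of the standard
environment-variable mapping (not necessarily how my cluster sets it):

----------
$ export AIRFLOW__LOGGING__LOGGING_LEVEL=DEBUG
----------

Here is what happens when I run the task with DEBUG level logging: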

----------
$ airflow tasks run test_python do-it 2021-02-09T04:14:25.310295+00:00 --local --pool default_pool
--subdir /home/airflow/repo/hs-datalabs/Source/Python/datalabs/airflow/dag/test_python.py
[2021-02-09 04:28:12,070] {settings.py:210} DEBUG - Setting up DB connection pool (PID 17)
[2021-02-09 04:28:12,071] {settings.py:281} DEBUG - settings.prepare_engine_args(): Using
pool settings. pool_size=5, max_overflow=10, pool_recycle=1800, pid=17
[2021-02-09 04:28:12,334] {cli_action_loggers.py:40} DEBUG - Adding <function default_action_log
at 0x7f053b740c20> to pre execution callback
[2021-02-09 04:28:15,235] {cli_action_loggers.py:66} DEBUG - Calling callbacks: [<function
default_action_log at 0x7f053b740c20>]
[2021-02-09 04:28:15,321] {settings.py:210} DEBUG - Setting up DB connection pool (PID 17)
[2021-02-09 04:28:15,322] {settings.py:243} DEBUG - settings.prepare_engine_args(): Using
NullPool
[2021-02-09 04:28:15,323] {dagbag.py:448} INFO - Filling up the DagBag from /home/airflow/repo/hs-datalabs/Source/Python/datalabs/airflow/dag/test_python.py
[2021-02-09 04:28:15,426] {dagbag.py:287} DEBUG - Importing /home/airflow/repo/hs-datalabs/Source/Python/datalabs/airflow/dag/test_python.py
[2021-02-09 04:28:15,466] {dagbag.py:413} DEBUG - Loaded DAG <DAG: test_python>
[2021-02-09 04:28:15,467] {dagbag.py:287} DEBUG - Importing /usr/local/lib/python3.7/site-packages/airflow/example_dags/example_bash_operator.py
[2021-02-09 04:28:15,472] {dagbag.py:413} DEBUG - Loaded DAG <DAG: example_bash_operator>
[2021-02-09 04:28:15,472] {dagbag.py:287} DEBUG - Importing /usr/local/lib/python3.7/site-packages/airflow/example_dags/example_branch_operator.py
[2021-02-09 04:28:15,477] {dagbag.py:413} DEBUG - Loaded DAG <DAG: example_branch_operator>
. . .
[2021-02-09 04:28:15,764] {dagbag.py:287} DEBUG - Importing /usr/local/lib/python3.7/site-packages/airflow/example_dags/tutorial_taskflow_api_etl.py
[2021-02-09 04:28:15,766] {dagbag.py:413} DEBUG - Loaded DAG <DAG: tutorial_taskflow_api_etl>
[2021-02-09 04:28:15,766] {dagbag.py:287} DEBUG - Importing /usr/local/lib/python3.7/site-packages/airflow/example_dags/subdags/subdag.py
[2021-02-09 04:28:15,801] {plugins_manager.py:270} DEBUG - Loading plugins
[2021-02-09 04:28:15,801] {plugins_manager.py:207} DEBUG - Loading plugins from directory:
/home/airflow/airflow/plugins
[2021-02-09 04:28:15,801] {plugins_manager.py:184} DEBUG - Loading plugins from entrypoints
[2021-02-09 04:28:16,128] {plugins_manager.py:414} DEBUG - Integrate DAG plugins
Running <TaskInstance: test_python.do-it 2021-02-09T04:14:25.310295+00:00 [failed]>
on host testpythondoit.3c93d77d6b8141348718e2c6467e55a9-debug
[2021-02-09 04:28:16,442] {cli_action_loggers.py:84} DEBUG - Calling callbacks: []
[2021-02-09 04:28:16,442] {settings.py:292} DEBUG - Disposing DB connection pool (PID 17)
----------
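
Note that the task instance is already shown as [failed] by the time the local task runner
picks it up ("Running <TaskInstance: ... [failed]>" above), so whatever marks it failed
appears to happen before the callable ever runs. A quick way to inspect the recorded states,
assuming a Python shell with access to the metadata DB (a sketch using the ORM session
helper; I have not verified this on OpenShift):

----------
from airflow.models import TaskInstance
from airflow.utils.session import create_session

# Print the recorded state and host of every task instance
# in the test_python DAG, straight from the metadata DB.
with create_session() as session:
    for ti in session.query(TaskInstance).filter(
        TaskInstance.dag_id == "test_python"
    ):
        print(ti.task_id, ti.execution_date, ti.state, ti.hostname)
----------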

Here is my DAG. Note that neither of the callbacks is running either:

----------
from pprint import pprint

from kubernetes.client import models as k8s

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.utils.dates import days_ago


def print_that(datestamp, **kwargs):
    pprint(kwargs)
    print(datestamp)

    return "Foobiddy Doobiddy"


def on_failure_callback(context):
    dag_run = context.get('dag_run')
    task_instances = dag_run.get_task_instances()
    print(f'Failure: {task_instances}')


def on_success_callback(context):
    dag_run = context.get('dag_run')
    task_instances = dag_run.get_task_instances()
    print(f'Success: {task_instances}')


with DAG(
    dag_id='test_python',
    default_args={'owner': 'airflow'},
    schedule_interval=None,
    start_date=days_ago(2),
    tags=['testing'],
    on_failure_callback=on_failure_callback,
    on_success_callback=on_success_callback,
) as dag:
    do_it = PythonOperator(
        task_id="do-it",
       python_callable=print_that,
        executor_config={
            "pod_override": k8s.V1Pod(
                spec=k8s.V1PodSpec(
                    containers=[
                        k8s.V1Container(
                            name="base",
                            image="docker-registry.default.svc:5000/hsg-data-labs-dev/airflow-worker:1.0.1"
                        )
                    ]
                )
            ),
        },
    )


    do_it_again = PythonOperator(
        task_id="do-it-again",
        python_callable=print_that,
        executor_config={
            "pod_override": k8s.V1Pod(
                spec=k8s.V1PodSpec(
                    containers=[
                        k8s.V1Container(
                            name="base",
                            image="docker-registry.default.svc:5000/hsg-data-labs-dev/airflow-worker:1.0.1"
                        )
                    ]
                )
            ),
        },
    )


do_it >> do_it_again
----------
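
For what it's worth, I believe "airflow tasks test" runs the callable in-process and skips
the executor (and therefore the pod_override) entirely, so it should separate a problem in
the task itself from a problem on the Kubernetes side:

----------
$ airflow tasks test test_python do-it 2021-02-09T04:14:25.310295+00:00
----------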

I would really appreciate some ideas on how to debug this, or on what might be wrong.
Thanks,
Peter