airflow-commits mailing list archives

From "Sam Danbury (Jira)" <j...@apache.org>
Subject [jira] [Commented] (AIRFLOW-5559) Kubernetes Executor operator-level executor_config override applies to all operators in a DAG
Date Fri, 27 Sep 2019 10:26:00 GMT

    [ https://issues.apache.org/jira/browse/AIRFLOW-5559?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16939309#comment-16939309 ]

Sam Danbury commented on AIRFLOW-5559:
--------------------------------------

It turns out the policy in airflow_local_settings.py was relevant after all. Yes, I can confirm that removing the policy fixes the issue, so I will close this.

> Kubernetes Executor operator-level executor_config override applies to all operators in a DAG
> ---------------------------------------------------------------------------------------------
>
>                 Key: AIRFLOW-5559
>                 URL: https://issues.apache.org/jira/browse/AIRFLOW-5559
>             Project: Apache Airflow
>          Issue Type: Bug
>          Components: executor-kubernetes
>    Affects Versions: 1.10.4
>            Reporter: Sam Danbury
>            Assignee: Daniel Imberman
>            Priority: Minor
>
> When applying requests and limits to the worker pods spun up by the Kubernetes Executor (using the executor_config argument), the last operator to set the resources wins and *all* worker pods in the DAG get those settings applied, not just the pod for that single operator.
>  
> The problematic DAG:
> {code:python}
> from datetime import datetime, timedelta
>
> from airflow.models import DAG
> from airflow.operators.bash_operator import BashOperator
>
> default_args = {
>     'owner': 'airflow',
>     'depends_on_past': False,
>     'email_on_failure': False,
>     'email_on_retry': False,
>     'retries': 2,
>     'retry_delay': timedelta(hours=1),
> }
>
> dag = DAG(
>     'two_tasked_dag',
>     start_date=datetime(2018, 8, 21),
>     default_args=default_args,
>     catchup=False,
> )
>
> a = BashOperator(
>     task_id="A",
>     bash_command="echo hello",
>     dag=dag,
>     executor_config={
>         "KubernetesExecutor": {
>             "request_memory": "700Mi",
>             "request_cpu": "300m",
>             "limit_memory": "900Mi",
>             "limit_cpu": "500m",
>         }
>     },
> )
>
> b = BashOperator(
>     task_id="B",
>     bash_command="echo hello",
>     dag=dag,
>     executor_config={
>         "KubernetesExecutor": {
>             "request_memory": "800Mi",
>             "request_cpu": "200m",
>             "limit_memory": "1Gi",
>             "limit_cpu": "600m",
>         }
>     },
> )
> {code}
>  
> Pod descriptions showing the applied resource requests and limits:
> {code:bash}
> $ kubectl get pods -n airflow
> NAME                                             READY   STATUS    RESTARTS   AGE
> airflow-postgresql-5bf5b6ddf5-f86sz              1/1     Running   0          22h
> airflow-scheduler-6496dfd5db-cbhkc               2/2     Running   2          52m
> airflow-web-7d76c9c4c-f7cpz                      3/3     Running   3          52m
> twotaskeddaga-350ad7a98c4b4908a8bfb02478e346a6   1/1     Running   0          1m
> twotaskeddagb-faa8831f58b94b2fa771b62cb6bb952e   1/1     Running   0          1m
> $ kubectl describe pod -n airflow twotaskeddaga-350ad7a98c4b4908a8bfb02478e346a6
> ...
>     Limits:
>       cpu:     600m
>       memory:  1Gi
>     Requests:
>       cpu:     200m
>       memory:  800Mi
> ...
> $ kubectl describe pod -n airflow twotaskeddagb-faa8831f58b94b2fa771b62cb6bb952e
> ...
>     Limits:
>       cpu:     600m
>       memory:  1Gi
>     Requests:
>       cpu:     200m
>       memory:  800Mi
> ...
> {code}
>  
> The above shows that the last task in the DAG sets the resource requests and limits for the first task in the DAG, even though each task has its own executor_config. This also happens if task A has no executor_config set at all.
>  
> It is also worth mentioning (although maybe not relevant) that I am setting default pod resources using an airflow_local_settings.py file:
>  
> {code:python}
> from airflow.utils.log.logging_mixin import LoggingMixin
> from deepmerge import Merger
>
> DEFAULT_EXECUTOR_CONFIG = {
>     "KubernetesExecutor": {
>         "request_memory": "250Mi",
>         "request_cpu": "250m",
>         "limit_memory": "500Mi",
>         "limit_cpu": "500m",
>     }
> }
>
> merger = Merger(
>     type_strategies=[(list, ["append"]), (dict, ["merge"])],
>     fallback_strategies=["override"],
>     type_conflict_strategies=["override"],
> )
>
> log = LoggingMixin().log
>
> def policy(ti):
>     """Add the default executor config to all task instances."""
>     executor_config = getattr(ti, "executor_config", {})
>     if executor_config:
>         log.debug(f"Merging {ti.task_id} executor config with default executor config")
>         ti.executor_config = merger.merge(DEFAULT_EXECUTOR_CONFIG, executor_config)
>         log.debug(ti.executor_config)
>     else:
>         log.debug(f"Applying default executor config to {ti.task_id}")
>         ti.executor_config = DEFAULT_EXECUTOR_CONFIG
>         log.debug(ti.executor_config)
> {code}
>  
>  
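The follow-up comment confirms the policy was the cause. One plausible mechanism, assuming deepmerge's `Merger.merge(base, nxt)` merges `nxt` into `base` in place and returns `base` itself (worth checking against the deepmerge version in use): every task that passes through `policy` ends up holding a reference to the single module-level `DEFAULT_EXECUTOR_CONFIG` dict, so whichever task is processed last overwrites the resources seen by all of them. This also explains why a task with no executor_config is affected, since the `else` branch assigns that same shared dict. The sketch below reproduces the effect without Airflow or deepmerge; `merge_into`, `FakeTask`, `buggy_policy`, `fixed_policy` and `run` are hypothetical names invented for illustration, and the deep-copy fix is a suggestion, not the resolution recorded on this ticket.

```python
import copy


# Hypothetical stand-in for deepmerge's Merger with (dict, ["merge"]) and an
# "override" fallback. Assumption: like that Merger, it merges `nxt` INTO
# `base`, mutating `base` in place, and returns `base` itself.
def merge_into(base, nxt):
    for key, value in nxt.items():
        if isinstance(base.get(key), dict) and isinstance(value, dict):
            merge_into(base[key], value)  # recurse into nested dicts
        else:
            base[key] = value  # scalars and type conflicts are overridden
    return base


DEFAULT_EXECUTOR_CONFIG = {
    "KubernetesExecutor": {
        "request_memory": "250Mi",
        "request_cpu": "250m",
        "limit_memory": "500Mi",
        "limit_cpu": "500m",
    }
}


class FakeTask:
    """Hypothetical minimal stand-in for an operator carrying executor_config."""

    def __init__(self, task_id, executor_config=None):
        self.task_id = task_id
        self.executor_config = executor_config or {}


def buggy_policy(task, defaults):
    # Same shape as the quoted policy: the merge result IS the shared
    # defaults dict, and tasks without a config point at that same dict.
    if task.executor_config:
        task.executor_config = merge_into(defaults, task.executor_config)
    else:
        task.executor_config = defaults


def fixed_policy(task, defaults):
    # Merge into a fresh deep copy so every task owns its own dict and the
    # shared defaults are never modified.
    task.executor_config = merge_into(copy.deepcopy(defaults), task.executor_config or {})


def run(policy):
    """Apply `policy` to tasks A and B from the DAG above, in DAG order."""
    defaults = copy.deepcopy(DEFAULT_EXECUTOR_CONFIG)
    a = FakeTask("A", {"KubernetesExecutor": {
        "request_memory": "700Mi", "request_cpu": "300m",
        "limit_memory": "900Mi", "limit_cpu": "500m"}})
    b = FakeTask("B", {"KubernetesExecutor": {
        "request_memory": "800Mi", "request_cpu": "200m",
        "limit_memory": "1Gi", "limit_cpu": "600m"}})
    for task in (a, b):
        policy(task, defaults)
    return a, b, defaults


if __name__ == "__main__":
    for policy in (buggy_policy, fixed_policy):
        a, b, _ = run(policy)
        print(policy.__name__,
              a.executor_config["KubernetesExecutor"]["limit_memory"],
              b.executor_config["KubernetesExecutor"]["limit_memory"])
```

Running it prints `buggy_policy 1Gi 1Gi` followed by `fixed_policy 900Mi 1Gi`; the buggy case matches the 1Gi memory limit reported on both pods above. Applied to the real airflow_local_settings.py, the equivalent (untested) change would be to merge into `copy.deepcopy(DEFAULT_EXECUTOR_CONFIG)` rather than `DEFAULT_EXECUTOR_CONFIG`, and to assign a deep copy in the `else` branch, so that no task shares the module-level dict.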



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
