Date: Sun, 8 May 2016 15:25:13 +0000 (UTC)
From: "Steven Yvinec-Kruyk (JIRA)"
To: commits@airflow.incubator.apache.org
Subject: [jira] [Created] (AIRFLOW-74) SubdagOperators can consume all celeryd worker processes

Steven Yvinec-Kruyk created AIRFLOW-74:
------------------------------------------

             Summary: SubdagOperators can consume all celeryd worker processes
                 Key: AIRFLOW-74
                 URL: https://issues.apache.org/jira/browse/AIRFLOW-74
             Project: Apache Airflow
          Issue Type: Bug
          Components: celery
    Affects Versions: Airflow 1.7.1, Airflow 1.7.0, Airflow 1.6.2
         Environment: Airflow 1.7.1rc3 with CeleryExecutor
                      1 webserver
                      1 scheduler
                      2 workers
            Reporter: Steven Yvinec-Kruyk

If the number of concurrently running ```SubdagOperator``` tasks is greater than or equal to the number of celeryd worker processes, no task can make progress and all SubDagOperators come to a complete halt.
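To make the failure mode concrete, here is a minimal back-of-the-envelope sketch (plain Python, not Airflow code; the process counts are assumptions for illustration): each running ```SubdagOperator``` occupies one celeryd worker process while it waits for its subdag's tasks, and those inner tasks themselves need worker processes to run.

```
# Minimal sketch of the deadlock arithmetic -- the numbers below are
# assumptions for illustration, not values from a real deployment.
worker_processes = 32            # total celeryd worker processes across all workers
running_subdag_operators = 32    # each one holds a process while it waits

# Processes left over for the tasks *inside* the subdags:
free_processes = worker_processes - running_subdag_operators
print(free_processes)  # 0 -> the inner tasks can never be picked up,
                       # so every SubDagOperator waits forever: a deadlock
```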
Furthermore, DAG performance is drastically reduced even before the workers are fully saturated, because progressively fewer worker processes remain available for actual tasks. A workaround is to specify that the ```SubdagOperator``` use the ```SequentialExecutor```, so the subdag's tasks run inside the worker slot the operator already holds:

```
from datetime import timedelta, datetime
from airflow.models import DAG, Pool
from airflow.operators import BashOperator, SubDagOperator, DummyOperator
from airflow.executors import SequentialExecutor
import airflow

# -----------------------------------------------------------------\
# DEFINE THE POOLS
# -----------------------------------------------------------------/
session = airflow.settings.Session()
for p in ['test_pool_1', 'test_pool_2', 'test_pool_3']:
    pool = (
        session.query(Pool)
        .filter(Pool.pool == p)
        .first())
    if not pool:
        session.add(Pool(pool=p, slots=8))
        session.commit()

# -----------------------------------------------------------------\
# DEFINE THE DAG
# -----------------------------------------------------------------/

# Define the Dag Name. This must be unique.
dag_name = 'hanging_subdags_n16_sqe'

# Default args are passed to each task
default_args = {
    'owner': 'Airflow',
    'depends_on_past': False,
    'start_date': datetime(2016, 4, 10),
    'retries': 0,
    'retry_delay': timedelta(minutes=5),
    'email': ['your@email.com'],
    'email_on_failure': True,
    'email_on_retry': True,
    'wait_for_downstream': False,
}

# Create the dag object
dag = DAG(dag_name,
          default_args=default_args,
          schedule_interval='0 0 * * *'
          )

# -----------------------------------------------------------------\
# DEFINE THE TASKS
# -----------------------------------------------------------------/


def get_subdag(dag, sd_id, pool=None):
    subdag = DAG(
        dag_id='{parent_dag}.{sd_id}'.format(
            parent_dag=dag.dag_id,
            sd_id=sd_id),
        params=dag.params,
        default_args=dag.default_args,
        template_searchpath=dag.template_searchpath,
        user_defined_macros=dag.user_defined_macros,
    )

    t1 = BashOperator(
        task_id='{sd_id}_step_1'.format(
            sd_id=sd_id
        ),
        bash_command='echo "hello" && sleep 60',
        dag=subdag,
        pool=pool,
    )

    t2 = BashOperator(
        task_id='{sd_id}_step_two'.format(
            sd_id=sd_id
        ),
        bash_command='echo "hello" && sleep 15',
        dag=subdag,
        pool=pool,
    )

    t2.set_upstream(t1)

    sdo = SubDagOperator(
        task_id=sd_id,
        subdag=subdag,
        retries=0,
        retry_delay=timedelta(seconds=5),
        dag=dag,
        depends_on_past=True,
        # The workaround: run this subdag's tasks sequentially inside the
        # worker slot this operator already occupies, instead of handing
        # them to celeryd.
        executor=SequentialExecutor(),
    )

    return sdo


start_task = DummyOperator(
    task_id='start',
    dag=dag
)

for n in range(1, 17):
    sd_i = get_subdag(dag=dag, sd_id='level_1_{n}'.format(n=n),
                      pool='test_pool_1')
    sd_ii = get_subdag(dag=dag, sd_id='level_2_{n}'.format(n=n),
                       pool='test_pool_2')
    sd_iii = get_subdag(dag=dag, sd_id='level_3_{n}'.format(n=n),
                        pool='test_pool_3')

    sd_i.set_upstream(start_task)
    sd_ii.set_upstream(sd_i)
    sd_iii.set_upstream(sd_ii)
```

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)