airflow-commits mailing list archives

From "Erik Cederstrand (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (AIRFLOW-342) exception in 'airflow scheduler' : Connection reset by peer
Date Wed, 15 Feb 2017 18:30:41 GMT

    [ https://issues.apache.org/jira/browse/AIRFLOW-342?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15868309#comment-15868309 ]

Erik Cederstrand commented on AIRFLOW-342:
------------------------------------------

I'm also running into this issue, but on 1.8.0-rc3 installed from GitHub. I have Airflow installed with Python 3, RabbitMQ and Celery in a Docker setup, and I start with one very simple DAG:

{code}
from datetime import datetime

from airflow import DAG
from airflow.operators.python_operator import PythonOperator

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    "start_date": datetime(2017, 2, 12),
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 0,
}

def test_callable(msg, *args, **kwargs):
    print(msg)

dag = DAG('test_dag', default_args=default_args, schedule_interval='@daily')
test_task = PythonOperator(
    task_id='test_task', 
    python_callable=test_callable,
    op_kwargs={
        'msg': 'Hello Airflow',
    },
    provide_context=True,
    dag=dag, 
    retries=0
)
{code}

When I start up the scheduler with {airflow scheduler}, it crashes with this output:
{code}
$ airflow scheduler
[2017-02-15 17:56:12,397] {__init__.py:57} INFO - Using executor CeleryExecutor
[2017-02-15 17:56:12,465] {driver.py:120} INFO - Generating grammar tables from /usr/lib/python3.4/lib2to3/Grammar.txt
[2017-02-15 17:56:12,482] {driver.py:120} INFO - Generating grammar tables from /usr/lib/python3.4/lib2to3/PatternGrammar.txt
  ____________       _____________
 ____    |__( )_________  __/__  /________      __
____  /| |_  /__  ___/_  /_ __  /_  __ \_ | /| / /
___  ___ |  / _  /   _  __/ _  / / /_/ /_ |/ |/ /
 _/_/  |_/_/  /_/    /_/    /_/  \____/____/|__/
 
[2017-02-15 17:56:12,708] {jobs.py:1262} INFO - Starting the scheduler
[2017-02-15 17:56:12,708] {jobs.py:1278} INFO - Processing files using up to 2 processes at a time
[2017-02-15 17:56:12,708] {jobs.py:1280} INFO - Running execute loop for -1 seconds
[2017-02-15 17:56:12,708] {jobs.py:1282} INFO - Processing each file at most -1 times
[2017-02-15 17:56:12,708] {jobs.py:1284} INFO - Process each file at most once every 0 seconds
[2017-02-15 17:56:12,708] {jobs.py:1286} INFO - Checking for new files in /opt/bbd/src/workflows/dags every 300 seconds
[2017-02-15 17:56:12,708] {jobs.py:1289} INFO - Searching for files in /opt/bbd/src/workflows/dags
[2017-02-15 17:56:12,709] {jobs.py:1292} INFO - There are 1 files in /opt/bbd/src/workflows/dags
[2017-02-15 17:56:12,709] {jobs.py:1354} INFO - Resetting state for orphaned tasks
[2017-02-15 17:56:12,712] {jobs.py:1363} INFO - Resetting test_dag 2017-02-13 00:00:00
[2017-02-15 17:56:12,718] {jobs.py:1404} INFO - Heartbeating the process manager
[2017-02-15 17:56:12,731] {dag_processing.py:627} INFO - Started a process (PID: 35) to generate tasks for /opt/bbd/src/workflows/dags/test_dag.py - logging into /opt/bbd/src/workflows/logs/scheduler/2017-02-15/test_dag.py.log
[2017-02-15 17:56:12,731] {jobs.py:1440} INFO - Heartbeating the executor
[2017-02-15 17:56:12,732] {jobs.py:1237} INFO - 
================================================================================
DAG File Processing Stats

File Path                                  PID  Runtime    Last Runtime    Last Run
---------------------------------------  -----  ---------  --------------  ----------
/opt/bbd/src/workflows/dags/test_dag.py     35  0.00s
================================================================================
[2017-02-15 17:56:13,733] {jobs.py:1404} INFO - Heartbeating the process manager
[2017-02-15 17:56:13,734] {dag_processing.py:559} INFO - Processor for /opt/bbd/src/workflows/dags/test_dag.py finished
[2017-02-15 17:56:13,738] {dag_processing.py:627} INFO - Started a process (PID: 37) to generate tasks for /opt/bbd/src/workflows/dags/test_dag.py - logging into /opt/bbd/src/workflows/logs/scheduler/2017-02-15/test_dag.py.log
[2017-02-15 17:56:13,774] {jobs.py:985} INFO - Tasks up for execution:
	<TaskInstance: test_dag.test_task 2017-02-14 00:00:00 [scheduled]>
[2017-02-15 17:56:13,779] {jobs.py:1008} INFO - Figuring out tasks to run in Pool(name=None)
with 128 open slots and 1 task instances in queue
[2017-02-15 17:56:13,787] {jobs.py:1056} INFO - DAG test_dag has 0/2 running tasks
[2017-02-15 17:56:13,788] {jobs.py:1083} INFO - Sending to executor ('test_dag', 'test_task', datetime.datetime(2017, 2, 14, 0, 0)) with priority 1 and queue default
[2017-02-15 17:56:13,791] {jobs.py:1094} INFO - Setting state of ('test_dag', 'test_task', datetime.datetime(2017, 2, 14, 0, 0)) to queued
[2017-02-15 17:56:13,806] {base_executor.py:50} INFO - Adding to queue: airflow run test_dag test_task 2017-02-14T00:00:00 --local -sd /opt/bbd/src/workflows/dags/test_dag.py
[2017-02-15 17:56:13,807] {jobs.py:1440} INFO - Heartbeating the executor
[2017-02-15 17:56:13,812] {celery_executor.py:78} INFO - [celery] queuing ('test_dag', 'test_task', datetime.datetime(2017, 2, 14, 0, 0)) through celery, queue=default
[2017-02-15 17:56:14,891] {jobs.py:1404} INFO - Heartbeating the process manager
[2017-02-15 17:56:14,891] {dag_processing.py:559} INFO - Processor for /opt/bbd/src/workflows/dags/test_dag.py finished
[2017-02-15 17:56:14,893] {dag_processing.py:627} INFO - Started a process (PID: 39) to generate tasks for /opt/bbd/src/workflows/dags/test_dag.py - logging into /opt/bbd/src/workflows/logs/scheduler/2017-02-15/test_dag.py.log
[2017-02-15 17:56:14,913] {jobs.py:980} INFO - No tasks to send to the executor
[2017-02-15 17:56:14,913] {jobs.py:1440} INFO - Heartbeating the executor
[2017-02-15 17:56:14,914] {jobs.py:1311} INFO - Exited execute loop
[2017-02-15 17:56:14,914] {jobs.py:1325} INFO - Terminating child PID: 39
[2017-02-15 17:56:14,915] {jobs.py:1329} INFO - Waiting up to 5s for processes to exit...
Traceback (most recent call last):
  File "/usr/local/bin/airflow", line 28, in <module>
    args.func(args)
  File "/usr/local/lib/python3.4/dist-packages/airflow/bin/cli.py", line 844, in scheduler
    job.run()
  File "/usr/local/lib/python3.4/dist-packages/airflow/jobs.py", line 200, in run
    self._execute()
  File "/usr/local/lib/python3.4/dist-packages/airflow/jobs.py", line 1309, in _execute
    self._execute_helper(processor_manager)
  File "/usr/local/lib/python3.4/dist-packages/airflow/jobs.py", line 1441, in _execute_helper
    self.executor.heartbeat()
  File "/usr/local/lib/python3.4/dist-packages/airflow/executors/base_executor.py", line 132,
in heartbeat
    self.sync()
  File "/usr/local/lib/python3.4/dist-packages/airflow/executors/celery_executor.py", line
88, in sync
    state = async.state
  File "/usr/local/lib/python3.4/dist-packages/celery/result.py", line 431, in state
    return self._get_task_meta()['status']
  File "/usr/local/lib/python3.4/dist-packages/celery/result.py", line 370, in _get_task_meta
    return self._maybe_set_cache(self.backend.get_task_meta(self.id))
  File "/usr/local/lib/python3.4/dist-packages/celery/backends/amqp.py", line 156, in get_task_meta
    binding.declare()
  File "/usr/local/lib/python3.4/dist-packages/kombu/entity.py", line 604, in declare
    self._create_exchange(nowait=nowait, channel=channel)
  File "/usr/local/lib/python3.4/dist-packages/kombu/entity.py", line 611, in _create_exchange
    self.exchange.declare(nowait=nowait, channel=channel)
  File "/usr/local/lib/python3.4/dist-packages/kombu/entity.py", line 185, in declare
    nowait=nowait, passive=passive,
  File "/usr/local/lib/python3.4/dist-packages/amqp/channel.py", line 630, in exchange_declare
    wait=None if nowait else spec.Exchange.DeclareOk,
  File "/usr/local/lib/python3.4/dist-packages/amqp/abstract_channel.py", line 64, in send_method
    conn.frame_writer(1, self.channel_id, sig, args, content)
  File "/usr/local/lib/python3.4/dist-packages/amqp/method_framing.py", line 174, in write_frame
    write(view[:offset])
  File "/usr/local/lib/python3.4/dist-packages/amqp/transport.py", line 269, in write
    self._write(s)
ConnectionResetError: [Errno 104] Connection reset by peer
{code}
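
To isolate it from Airflow, the same code path can be exercised with a bare Celery client. This is only a probe sketch (the broker URL and the 'rabbitmq' hostname are assumptions from my Docker setup, and the task id is arbitrary), but reading the result state should go through the same AsyncResult.state / amqp result-backend calls that appear in the traceback:

{code}
# Probe sketch: exercise the Celery amqp result backend directly, outside Airflow.
# The broker URL and hostname are from my Docker setup; adjust as needed.
from celery import Celery

app = Celery('probe',
             broker='amqp://guest:guest@rabbitmq:5672//',
             backend='amqp://guest:guest@rabbitmq:5672//')

# Any task id will do; reading .state forces a round trip to the amqp result
# backend, which is where the ConnectionResetError above is raised.
result = app.AsyncResult('00000000-0000-0000-0000-000000000000')
print(result.state)  # expected: PENDING if the broker connection is healthy
{code}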

If I comment out {self.executor.heartbeat()} at jobs.py:1441, the scheduler run completes, but the job then does not appear to be scheduled properly.
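
Rather than removing the heartbeat entirely, what I have in mind is tolerating a transient broker disconnect while polling task state. This is purely a sketch of the idea, not the actual celery_executor code, and untested:

{code}
# Sketch only (hypothetical helper, not the 1.8.0 code): treat a broker
# disconnect while polling a task's state as "unknown for now" instead of
# letting the exception kill the scheduler loop.
import logging

log = logging.getLogger(__name__)

def poll_states(async_results):
    """async_results: dict mapping task key -> celery.result.AsyncResult."""
    states = {}
    for key, result in list(async_results.items()):
        try:
            states[key] = result.state
        except ConnectionResetError as exc:
            log.warning("Broker connection reset while polling %s: %s", key, exc)
            # leave this task for the next heartbeat instead of raising
    return states
{code}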

The setup is very simple, with one Docker container per service (airflow webserver, airflow scheduler, airflow worker, flower, postgres, rabbitmq). I can provide additional context if needed.
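
In case it helps, this is the kind of connectivity probe I can run from inside the scheduler container to rule out basic networking; the 'rabbitmq' hostname and guest credentials are just my Docker defaults:

{code}
# Connectivity probe from inside the scheduler container (hostname and
# credentials are my Docker defaults, adjust as needed).
from kombu import Connection

with Connection('amqp://guest:guest@rabbitmq:5672//') as conn:
    conn.ensure_connection(max_retries=3)  # raises if the broker is unreachable
    print('broker reachable:', conn.connected)
{code}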

>  exception in 'airflow scheduler' : Connection reset by peer
> ------------------------------------------------------------
>
>                 Key: AIRFLOW-342
>                 URL: https://issues.apache.org/jira/browse/AIRFLOW-342
>             Project: Apache Airflow
>          Issue Type: Bug
>          Components: celery, scheduler
>    Affects Versions: Airflow 1.7.1.3
>         Environment: OS: Red Hat Enterprise Linux Server 7.2 (Maipo)
> Python: 2.7.5
> Airflow: 1.7.1.3
>            Reporter: Hila Visan
>
> 'airflow scheduler' command throws an exception when running it. 
> Despite the exception, the workers run the tasks from the queues as expected.
> Error details:
>  
> [2016-06-30 19:00:10,130] {jobs.py:758} ERROR - [Errno 104] Connection reset by peer
> Traceback (most recent call last):
>   File "/usr/lib/python2.7/site-packages/airflow/jobs.py", line 755, in _execute
>     executor.heartbeat()
>   File "/usr/lib/python2.7/site-packages/airflow/executors/base_executor.py", line 107,
in heartbeat
>     self.sync()
>   File "/usr/lib/python2.7/site-packages/airflow/executors/celery_executor.py", line
74, in sync
>     state = async.state
>   File "/usr/lib/python2.7/site-packages/celery/result.py", line 394, in state
>     return self._get_task_meta()['status']
>   File "/usr/lib/python2.7/site-packages/celery/result.py", line 339, in _get_task_meta
>     return self._maybe_set_cache(self.backend.get_task_meta(self.id))
>   File "/usr/lib/python2.7/site-packages/celery/backends/amqp.py", line 163, in get_task_meta
>     binding.declare()
>   File "/usr/lib/python2.7/site-packages/kombu/entity.py", line 521, in declare
>    self.exchange.declare(nowait)
>   File "/usr/lib/python2.7/site-packages/kombu/entity.py", line 174, in declare
>     nowait=nowait, passive=passive,
>   File "/usr/lib/python2.7/site-packages/amqp/channel.py", line 615, in exchange_declare
>     self._send_method((40, 10), args)
>   File "/usr/lib/python2.7/site-packages/amqp/abstract_channel.py", line 56, in _send_method
>     self.channel_id, method_sig, args, content,
>   File "/usr/lib/python2.7/site-packages/amqp/method_framing.py", line 221, in write_method
>     write_frame(1, channel, payload)
>   File "/usr/lib/python2.7/site-packages/amqp/transport.py", line 182, in write_frame
>     frame_type, channel, size, payload, 0xce,
>   File "/usr/lib64/python2.7/socket.py", line 224, in meth
>     return getattr(self._sock,name)(*args)
> error: [Errno 104] Connection reset by peer
> [2016-06-30 19:00:10,131] {jobs.py:759} ERROR - Tachycardia!



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
