airflow-commits mailing list archives

From "Erik Cederstrand (JIRA)" <j...@apache.org>
Subject [jira] [Comment Edited] (AIRFLOW-342) exception in 'airflow scheduler' : Connection reset by peer
Date Thu, 16 Feb 2017 13:29:41 GMT

    [ https://issues.apache.org/jira/browse/AIRFLOW-342?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15869870#comment-15869870 ]

Erik Cederstrand edited comment on AIRFLOW-342 at 2/16/17 1:29 PM:
-------------------------------------------------------------------

Hmm. Executor heartbeats still throw {{ConnectionResetError}}, but much less frequently. Previously,
the scheduler would die every ~10 seconds; now it survives for several minutes. Here's a modified
patch that simply ignores {{ConnectionResetError}}. It has been running for ~1 hour now:

{code}
--- /usr/local/lib/python3.4/dist-packages/airflow/jobs.py.orig	2017-02-16 11:58:55.057991344 +0000
+++ /usr/local/lib/python3.4/dist-packages/airflow/jobs.py	2017-02-16 11:57:07.060060262 +0000
@@ -1371,6 +1371,8 @@
         last_stat_print_time = datetime(2000, 1, 1)
         # Last time that self.heartbeat() was called.
         last_self_heartbeat_time = datetime.now()
+        # Last time that self.executor.heartbeat() was called.
+        last_executor_heartbeat_time = datetime.now()
         # Last time that the DAG dir was traversed to look for files
         last_dag_dir_refresh_time = datetime.now()

@@ -1436,9 +1438,14 @@
                 self._execute_task_instances(simple_dag_bag,
                                              (State.SCHEDULED,))

-            # Call hearbeats
-            self.logger.info("Heartbeating the executor")
-            self.executor.heartbeat()
+            # Heartbeat the executor periodically
+            time_since_last_heartbeat = (datetime.now() -
+                                         last_executor_heartbeat_time).total_seconds()
+            if time_since_last_heartbeat > self.heartrate:
+                self.logger.info("Heartbeating the executor")
+                try: self.executor.heartbeat()
+                except ConnectionResetError: pass  # RabbitMQ sometimes resets the socket connection
+                last_executor_heartbeat_time = datetime.now()

             # Process events from the executor
             self._process_executor_events()
{code}
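
For anyone who wants the pattern outside the diff context, here is a standalone sketch of the same throttle-and-ignore logic. This is an illustration, not the actual jobs.py code; {{scheduler_loop}}, {{HEARTRATE}} and the {{executor}} argument are stand-ins for the real scheduler attributes:

{code}
import time
from datetime import datetime

HEARTRATE = 5  # stand-in for self.heartrate (seconds between executor heartbeats)

def scheduler_loop(executor):
    last_executor_heartbeat_time = datetime.now()
    while True:
        # ... schedule and execute task instances here ...

        # Only heartbeat the executor once per HEARTRATE seconds
        elapsed = (datetime.now() - last_executor_heartbeat_time).total_seconds()
        if elapsed > HEARTRATE:
            try:
                executor.heartbeat()
            except ConnectionResetError:
                # RabbitMQ sometimes resets the socket; skip this beat and retry next time
                pass
            last_executor_heartbeat_time = datetime.now()
        time.sleep(1)
{code}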

As to the justifiability of this patch: our scheduler in production often dies so early in the
scheduling process that tasks never progress, leaving tasks in the Celery queue indefinitely
and Celery workers idle. Wrapping the scheduler in a {{while True}} loop, as suggested
elsewhere, therefore does nothing for us.
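
(Side note, since the original report runs Python 2.7: {{ConnectionResetError}} only exists on Python 3.3+. On Python 2 the same failure surfaces as {{socket.error}} with errno 104, as in the traceback below, so a rough, untested equivalent of the {{try/except}} block in the patch above would be:)

{code}
import errno
import socket

try:
    self.executor.heartbeat()
except socket.error as e:
    # Only swallow "Connection reset by peer"; re-raise anything else
    if e.errno != errno.ECONNRESET:
        raise
{code}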



>  exception in 'airflow scheduler' : Connection reset by peer
> ------------------------------------------------------------
>
>                 Key: AIRFLOW-342
>                 URL: https://issues.apache.org/jira/browse/AIRFLOW-342
>             Project: Apache Airflow
>          Issue Type: Bug
>          Components: celery, scheduler
>    Affects Versions: Airflow 1.7.1.3
>         Environment: OS: Red Hat Enterprise Linux Server 7.2 (Maipo)
> Python: 2.7.5
> Airflow: 1.7.1.3
>            Reporter: Hila Visan
>
> 'airflow scheduler' command throws an exception when running it. 
> Despite the exception, the workers run the tasks from the queues as expected.
> Error details:
>  
> [2016-06-30 19:00:10,130] {jobs.py:758} ERROR - [Errno 104] Connection reset by peer
> Traceback (most recent call last):
>   File "/usr/lib/python2.7/site-packages/airflow/jobs.py", line 755, in _execute
>     executor.heartbeat()
>   File "/usr/lib/python2.7/site-packages/airflow/executors/base_executor.py", line 107,
in heartbeat
>     self.sync()
>   File "/usr/lib/python2.7/site-packages/airflow/executors/celery_executor.py", line
74, in sync
>     state = async.state
>   File "/usr/lib/python2.7/site-packages/celery/result.py", line 394, in state
>     return self._get_task_meta()['status']
>   File "/usr/lib/python2.7/site-packages/celery/result.py", line 339, in _get_task_meta
>     return self._maybe_set_cache(self.backend.get_task_meta(self.id))
>   File "/usr/lib/python2.7/site-packages/celery/backends/amqp.py", line 163, in get_task_meta
>     binding.declare()
>   File "/usr/lib/python2.7/site-packages/kombu/entity.py", line 521, in declare
>     self.exchange.declare(nowait)
>   File "/usr/lib/python2.7/site-packages/kombu/entity.py", line 174, in declare
>     nowait=nowait, passive=passive,
>   File "/usr/lib/python2.7/site-packages/amqp/channel.py", line 615, in exchange_declare
>     self._send_method((40, 10), args)
>   File "/usr/lib/python2.7/site-packages/amqp/abstract_channel.py", line 56, in _send_method
>     self.channel_id, method_sig, args, content,
>   File "/usr/lib/python2.7/site-packages/amqp/method_framing.py", line 221, in write_method
>     write_frame(1, channel, payload)
>   File "/usr/lib/python2.7/site-packages/amqp/transport.py", line 182, in write_frame
>     frame_type, channel, size, payload, 0xce,
>   File "/usr/lib64/python2.7/socket.py", line 224, in meth
>     return getattr(self._sock,name)(*args)
> error: [Errno 104] Connection reset by peer
> [2016-06-30 19:00:10,131] {jobs.py:759} ERROR - Tachycardia!



