airflow-commits mailing list archives

From "ASF subversion and git services (Jira)" <j...@apache.org>
Subject [jira] [Commented] (AIRFLOW-6452) scheduler_job.py - remove excess sleep/log/duration calls
Date Thu, 09 Jan 2020 12:32:00 GMT

    [ https://issues.apache.org/jira/browse/AIRFLOW-6452?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17011767#comment-17011767 ]

ASF subversion and git services commented on AIRFLOW-6452:
----------------------------------------------------------

Commit 70af35827545adbf4f9d5cde1fd58027f18561fd in airflow's branch refs/heads/master from tooptoop4
[ https://gitbox.apache.org/repos/asf?p=airflow.git;h=70af358 ]

[AIRFLOW-6452] scheduler_job.py - remove excess sleep/log/duration calls (#7089)

No need for two sleeps: the loop already sleeps for _processor_poll_interval on every iteration.
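In the loop quoted in the issue, loop_duration is measured before the first sleep, so outside unit tests an iteration shorter than one second slept for the poll interval and then again for the padding up to one second. The sketch below compares the per-iteration idle time before and after the change. The function names are hypothetical illustrations, not Airflow code, and the values in the usage are example numbers rather than Airflow defaults.

```python
def idle_time_before(loop_duration: float, poll_interval: float) -> float:
    """Seconds slept per iteration by the old loop (outside unit tests)."""
    # First sleep: always wait for the processor poll interval.
    total = poll_interval
    # Second sleep: pad iterations shorter than 1 s up to 1 s. Because
    # loop_duration was measured before the first sleep, this padding is
    # added on top of the poll-interval sleep.
    if loop_duration < 1:
        total += 1 - loop_duration
    return total


def idle_time_after(loop_duration: float, poll_interval: float) -> float:
    """Seconds slept per iteration once the second sleep is removed."""
    # Only the poll-interval sleep remains; loop_duration no longer matters.
    return poll_interval


if __name__ == "__main__":
    print(idle_time_before(0.25, 1.0))  # 1.75
    print(idle_time_after(0.25, 1.0))   # 1.0
```

With a 0.25 s iteration and a 1 s poll interval, the old loop idled 1.75 s per iteration versus 1.0 s after the change; iterations of 1 s or longer were unaffected.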

> scheduler_job.py - remove excess sleep/log/duration calls
> ---------------------------------------------------------
>
>                 Key: AIRFLOW-6452
>                 URL: https://issues.apache.org/jira/browse/AIRFLOW-6452
>             Project: Apache Airflow
>          Issue Type: Improvement
>          Components: scheduler
>    Affects Versions: 1.10.7
>            Reporter: t oo
>            Assignee: t oo
>            Priority: Minor
>             Fix For: 1.10.8
>
>
> Remove many of these debug calls, wrap some in a check of the log level, and remove the second sleep
> along with the code that computes the loop start/end time and duration:
>             self.log.debug("Starting Loop...")
>             loop_start_time = time.time()
>             if self.using_sqlite:
>                 self.processor_agent.heartbeat()
>                 # For the sqlite case w/ 1 thread, wait until the processor
>                 # is finished to avoid concurrent access to the DB.
>                 self.log.debug(
>                     "Waiting for processors to finish since we're using sqlite")
>                 self.processor_agent.wait_until_finished()
>             self.log.debug("Harvesting DAG parsing results")
>             simple_dags = self._get_simple_dags()
>             self.log.debug("Harvested {} SimpleDAGs".format(len(simple_dags)))
>             # Send tasks for execution if available
>             simple_dag_bag = SimpleDagBag(simple_dags)
>             if not self._validate_and_run_task_instances(simple_dag_bag=simple_dag_bag):
>                 continue
>             # Heartbeat the scheduler periodically
>             time_since_last_heartbeat = (timezone.utcnow() -
>                                          last_self_heartbeat_time).total_seconds()
>             if time_since_last_heartbeat > self.heartrate:
>                 self.log.debug("Heartbeating the scheduler")
>                 self.heartbeat()
>                 last_self_heartbeat_time = timezone.utcnow()
>             loop_end_time = time.time()
>             loop_duration = loop_end_time - loop_start_time
>             self.log.debug(
>                 "Ran scheduling loop in %.2f seconds",
>                 loop_duration)
>             if not is_unit_test:
>                 self.log.debug("Sleeping for %.2f seconds", self._processor_poll_interval)
>                 time.sleep(self._processor_poll_interval)
>             if self.processor_agent.done:
>                 self.log.info("Exiting scheduler loop as all files"
>                               " have been processed {} times".format(self.num_runs))
>                 break
>             if loop_duration < 1 and not is_unit_test:
>                 sleep_length = 1 - loop_duration
>                 self.log.debug(
>                     "Sleeping for {0:.2f} seconds to prevent excessive logging"
>                     .format(sleep_length))
>                 sleep(sleep_length)
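The excerpt mixes eager `"...{}".format(...)` calls with lazy `%`-style logging arguments. The sketch below shows what "wrap some in a check of the log level" could look like with the standard library's `Logger.isEnabledFor`. The logger name, the `harvest` function and the dict-shaped DAG records are hypothetical, not Airflow code.

```python
import logging

# Hypothetical logger name, used only for this sketch.
log = logging.getLogger("scheduler_sketch")


def harvest(simple_dags):
    # Lazy %-style arguments: the string is only interpolated if the record
    # is actually emitted, unlike "...{}".format(...), which always runs.
    log.debug("Harvested %d SimpleDAGs", len(simple_dags))

    # For debug output that is expensive to build, check the level first so
    # the work is skipped entirely when DEBUG is disabled.
    if log.isEnabledFor(logging.DEBUG):
        dag_ids = sorted(d["dag_id"] for d in simple_dags)
        log.debug("DAG ids: %s", dag_ids)
        return dag_ids
    return None
```

At INFO level `harvest` skips building the sorted id list and returns `None`; at DEBUG level it builds and logs the list.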



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
