airflow-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From bo...@apache.org
Subject incubator-airflow git commit: [AIRFLOW-1658] Kill Druid task on timeout
Date Mon, 02 Oct 2017 15:09:32 GMT
Repository: incubator-airflow
Updated Branches:
  refs/heads/v1-9-test d42d231ee -> cbf7add7a


[AIRFLOW-1658] Kill Druid task on timeout

If the total execution time of a Druid task exceeds the defined max
timeout, the Airflow task fails, but the Druid task may still keep
running. This can cause undesired behaviour if Airflow retries the
task. This patch calls the shutdown endpoint on the Druid task to
kill any still-running Druid task.

This commit also adds tests to ensure that all mocked requests in
the Druid hook are actually called.

Closes #2644 from danielvdende/kill_druid_task_on_timeout_exceeded

(cherry picked from commit c61726288dcdb093c55a38faaf60aef020d0d3e0)
Signed-off-by: Bolke de Bruin <bolke@xs4all.nl>


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/cbf7add7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/cbf7add7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/cbf7add7

Branch: refs/heads/v1-9-test
Commit: cbf7add7aa2e61d1bfe511d6a8250b63485068bb
Parents: d42d231
Author: Daniel van der Ende <daniel.vanderende@gmail.com>
Authored: Mon Oct 2 17:09:07 2017 +0200
Committer: Bolke de Bruin <bolke@xs4all.nl>
Committed: Mon Oct 2 17:09:25 2017 +0200

----------------------------------------------------------------------
 airflow/hooks/druid_hook.py    |  2 ++
 tests/hooks/test_druid_hook.py | 33 +++++++++++++++++++++++++--------
 2 files changed, 27 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/cbf7add7/airflow/hooks/druid_hook.py
----------------------------------------------------------------------
diff --git a/airflow/hooks/druid_hook.py b/airflow/hooks/druid_hook.py
index 0b13670..655f666 100644
--- a/airflow/hooks/druid_hook.py
+++ b/airflow/hooks/druid_hook.py
@@ -73,6 +73,8 @@ class DruidHook(BaseHook):
             sec = sec + 1
 
             if sec > self.max_ingestion_time:
+                # ensure that the job gets killed if the max ingestion time is exceeded
+                requests.post("{0}/{1}/shutdown".format(url, druid_task_id))
                 raise AirflowException('Druid ingestion took more than %s seconds', self.max_ingestion_time)
 
             time.sleep(self.timeout)

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/cbf7add7/tests/hooks/test_druid_hook.py
----------------------------------------------------------------------
diff --git a/tests/hooks/test_druid_hook.py b/tests/hooks/test_druid_hook.py
index c049cb2..ddab369 100644
--- a/tests/hooks/test_druid_hook.py
+++ b/tests/hooks/test_druid_hook.py
@@ -33,11 +33,11 @@ class TestDruidHook(unittest.TestCase):
     @requests_mock.mock()
     def test_submit_gone_wrong(self, m):
         hook = DruidHook()
-        m.post(
+        task_post = m.post(
             'http://druid-overlord:8081/druid/indexer/v1/task',
             text='{"task":"9f8a7359-77d4-4612-b0cd-cc2f6a3c28de"}'
         )
-        m.get(
+        status_check = m.get(
             'http://druid-overlord:8081/druid/indexer/v1/task/9f8a7359-77d4-4612-b0cd-cc2f6a3c28de/status',
             text='{"status":{"status": "FAILED"}}'
         )
@@ -46,14 +46,17 @@ class TestDruidHook(unittest.TestCase):
         with self.assertRaises(AirflowException):
             hook.submit_indexing_job('Long json file')
 
+        self.assertTrue(task_post.called_once)
+        self.assertTrue(status_check.called_once)
+
     @requests_mock.mock()
     def test_submit_ok(self, m):
         hook = DruidHook()
-        m.post(
+        task_post = m.post(
             'http://druid-overlord:8081/druid/indexer/v1/task',
             text='{"task":"9f8a7359-77d4-4612-b0cd-cc2f6a3c28de"}'
         )
-        m.get(
+        status_check = m.get(
             'http://druid-overlord:8081/druid/indexer/v1/task/9f8a7359-77d4-4612-b0cd-cc2f6a3c28de/status',
             text='{"status":{"status": "SUCCESS"}}'
         )
@@ -61,14 +64,17 @@ class TestDruidHook(unittest.TestCase):
         # Exists just as it should
         hook.submit_indexing_job('Long json file')
 
+        self.assertTrue(task_post.called_once)
+        self.assertTrue(status_check.called_once)
+
     @requests_mock.mock()
     def test_submit_unknown_response(self, m):
         hook = DruidHook()
-        m.post(
+        task_post = m.post(
             'http://druid-overlord:8081/druid/indexer/v1/task',
             text='{"task":"9f8a7359-77d4-4612-b0cd-cc2f6a3c28de"}'
         )
-        m.get(
+        status_check = m.get(
             'http://druid-overlord:8081/druid/indexer/v1/task/9f8a7359-77d4-4612-b0cd-cc2f6a3c28de/status',
             text='{"status":{"status": "UNKNOWN"}}'
         )
@@ -77,22 +83,33 @@ class TestDruidHook(unittest.TestCase):
         with self.assertRaises(AirflowException):
             hook.submit_indexing_job('Long json file')
 
+        self.assertTrue(task_post.called_once)
+        self.assertTrue(status_check.called_once)
+
     @requests_mock.mock()
     def test_submit_timeout(self, m):
         hook = DruidHook(timeout=0, max_ingestion_time=5)
-        m.post(
+        task_post = m.post(
             'http://druid-overlord:8081/druid/indexer/v1/task',
             text='{"task":"9f8a7359-77d4-4612-b0cd-cc2f6a3c28de"}'
         )
-        m.get(
+        status_check = m.get(
             'http://druid-overlord:8081/druid/indexer/v1/task/9f8a7359-77d4-4612-b0cd-cc2f6a3c28de/status',
             text='{"status":{"status": "RUNNING"}}'
         )
+        shutdown_post = m.post(
+            'http://druid-overlord:8081/druid/indexer/v1/task/9f8a7359-77d4-4612-b0cd-cc2f6a3c28de/shutdown',
+            text='{"task":"9f8a7359-77d4-4612-b0cd-cc2f6a3c28de"}'
+        )
 
         # Because the jobs keeps running
         with self.assertRaises(AirflowException):
             hook.submit_indexing_job('Long json file')
 
+        self.assertTrue(task_post.called_once)
+        self.assertTrue(status_check.called)
+        self.assertTrue(shutdown_post.called_once)
+
 
 
 


Mime
View raw message