airflow-commits mailing list archives

From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (AIRFLOW-3378) KubernetesPodOperator does not delete on timeout failure
Date Mon, 26 Nov 2018 09:03:00 GMT

    [ https://issues.apache.org/jira/browse/AIRFLOW-3378?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16698638#comment-16698638 ]

ASF GitHub Bot commented on AIRFLOW-3378:
-----------------------------------------

Fokko closed pull request #4218: [AIRFLOW-3378] KubernetesPodOperator does not delete on timeout failure
URL: https://github.com/apache/incubator-airflow/pull/4218

This is a PR merged from a forked repository. As GitHub hides the original
diff on merge, it is displayed below for the sake of provenance:

diff --git a/airflow/contrib/operators/kubernetes_pod_operator.py b/airflow/contrib/operators/kubernetes_pod_operator.py
index 99c6da11b3..f6c1f9d45e 100644
--- a/airflow/contrib/operators/kubernetes_pod_operator.py
+++ b/airflow/contrib/operators/kubernetes_pod_operator.py
@@ -124,13 +124,14 @@ def execute(self, context):
 
             launcher = pod_launcher.PodLauncher(kube_client=client,
                                                 extract_xcom=self.xcom_push)
-            (final_state, result) = launcher.run_pod(
-                pod,
-                startup_timeout=self.startup_timeout_seconds,
-                get_logs=self.get_logs)
-
-            if self.is_delete_operator_pod:
-                launcher.delete_pod(pod)
+            try:
+                (final_state, result) = launcher.run_pod(
+                    pod,
+                    startup_timeout=self.startup_timeout_seconds,
+                    get_logs=self.get_logs)
+            finally:
+                if self.is_delete_operator_pod:
+                    launcher.delete_pod(pod)
 
             if final_state != State.SUCCESS:
                 raise AirflowException(
diff --git a/tests/contrib/minikube/test_kubernetes_pod_operator.py b/tests/contrib/minikube/test_kubernetes_pod_operator.py
index f808a2f47f..e1dfce28c4 100644
--- a/tests/contrib/minikube/test_kubernetes_pod_operator.py
+++ b/tests/contrib/minikube/test_kubernetes_pod_operator.py
@@ -107,6 +107,27 @@ def test_image_pull_secrets_correctly_set(self, client_mock, launcher_mock):
         self.assertEqual(launcher_mock.call_args[0][0].image_pull_secrets,
                          fake_pull_secrets)
 
+    @mock.patch("airflow.contrib.kubernetes.pod_launcher.PodLauncher.run_pod")
+    @mock.patch("airflow.contrib.kubernetes.pod_launcher.PodLauncher.delete_pod")
+    @mock.patch("airflow.contrib.kubernetes.kube_client.get_kube_client")
+    def test_pod_delete_even_on_launcher_error(self, client_mock, delete_pod_mock, run_pod_mock):
+        k = KubernetesPodOperator(
+            namespace='default',
+            image="ubuntu:16.04",
+            cmds=["bash", "-cx"],
+            arguments=["echo 10"],
+            labels={"foo": "bar"},
+            name="test",
+            task_id="task",
+            in_cluster=False,
+            cluster_context='default',
+            is_delete_operator_pod=True
+        )
+        run_pod_mock.side_effect = AirflowException('fake failure')
+        with self.assertRaises(AirflowException):
+            k.execute(None)
+        delete_pod_mock.assert_called()
+
     @staticmethod
     def test_working_pod():
         k = KubernetesPodOperator(
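
The substance of the change is the try/finally above: delete_pod now runs whether or not run_pod raises. A minimal, self-contained Python sketch of that control flow (FakeLauncher and the hard-coded values are illustrative stand-ins, not Airflow APIs):

class FakeLauncher:
    """Illustrative stand-in for pod_launcher.PodLauncher, not the real Airflow class."""

    def run_pod(self, pod, startup_timeout, get_logs):
        # Simulate the failure mode from AIRFLOW-3378: the pod never leaves Pending.
        raise RuntimeError("Pod took too long to start")

    def delete_pod(self, pod):
        print("delete_pod called for %r" % pod)


def execute(launcher, pod, is_delete_operator_pod=True):
    # Mirrors the patched control flow: cleanup runs even when the launch fails.
    try:
        final_state, result = launcher.run_pod(
            pod, startup_timeout=120, get_logs=True)
    finally:
        # Before the patch this ran only after a successful run_pod;
        # with try/finally it also runs when run_pod raises.
        if is_delete_operator_pod:
            launcher.delete_pod(pod)
    return final_state, result


if __name__ == "__main__":
    try:
        execute(FakeLauncher(), pod="datapipe-a5591f0c")
    except RuntimeError as exc:
        print("launch still fails, but the pod was cleaned up: %s" % exc)

Note that the exception from run_pod still propagates after the finally block, so the task still fails; only the pod cleanup behaviour changes.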


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


> KubernetesPodOperator does not delete on timeout failure
> --------------------------------------------------------
>
>                 Key: AIRFLOW-3378
>                 URL: https://issues.apache.org/jira/browse/AIRFLOW-3378
>             Project: Apache Airflow
>          Issue Type: Bug
>          Components: kubernetes
>    Affects Versions: 1.10.1
>            Reporter: Victor
>            Assignee: Victor
>            Priority: Major
>             Fix For: 2.0.0
>
>
> I am running Airflow 1.10.1rc2 on Kubernetes with the LocalExecutor and KubernetesPodOperator to run pods.
>  
> When a failure happens because the pod can't be created (see logs below), the whole operator fails and, even when is_delete_operator_pod is set to True, the pod is NOT deleted as expected.
>  
> Logs:
> {noformat}
> *** Reading local file: /airflow/logs/demo/datapipe/2018-11-21T08:29:57.206456+00:00/1.log
> [2018-11-21 08:30:02,027] {models.py:1361} INFO - Dependencies all met for <TaskInstance: demo.datapipe 2018-11-21T08:29:57.206456+00:00 [queued]>
> [2018-11-21 08:30:02,040] {models.py:1361} INFO - Dependencies all met for <TaskInstance: demo.datapipe 2018-11-21T08:29:57.206456+00:00 [queued]>
> [2018-11-21 08:30:02,041] {models.py:1573} INFO -
> --------------------------------------------------------------------------------
> Starting attempt 1 of 1
> --------------------------------------------------------------------------------
> [2018-11-21 08:30:02,066] {models.py:1595} INFO - Executing <Task(KubernetesPodOperator): datapipe> on 2018-11-21T08:29:57.206456+00:00
> [2018-11-21 08:30:02,067] {base_task_runner.py:118} INFO - Running: ['bash', '-c', 'airflow run demo datapipe 2018-11-21T08:29:57.206456+00:00 --job_id 4 --raw -sd /usr/local/airflow/dags_volume/..data/demo.py --cfg_path /tmp/tmp5qa5qyhs']
> [2018-11-21 08:30:03,065] {base_task_runner.py:101} INFO - Job 4: Subtask datapipe [2018-11-21 08:30:03,065] {settings.py:174} INFO - setting.configure_orm(): Using pool settings. pool_size=5, pool_recycle=1800
> [2018-11-21 08:30:03,734] {base_task_runner.py:101} INFO - Job 4: Subtask datapipe [2018-11-21 08:30:03,732] {__init__.py:51} INFO - Using executor LocalExecutor
> [2018-11-21 08:30:04,038] {base_task_runner.py:101} INFO - Job 4: Subtask datapipe [2018-11-21 08:30:04,038] {models.py:271} INFO - Filling up the DagBag from /usr/local/airflow/dags_volume/..data/demo.py
> [2018-11-21 08:30:04,590] {base_task_runner.py:101} INFO - Job 4: Subtask datapipe [2018-11-21 08:30:04,590] {cli.py:484} INFO - Running <TaskInstance: demo.datapipe 2018-11-21T08:29:57.206456+00:00 [running]> on host infra-airflow-6d78c56489-r9trl
> [2018-11-21 08:30:04,684] {logging_mixin.py:95} INFO - [2018-11-21 08:30:04,676] {pod_launcher.py:121} INFO - Event: datapipe-a5591f0c had an event of type Pending
> [2018-11-21 08:30:05,698] {logging_mixin.py:95} INFO - [2018-11-21 08:30:05,698] {pod_launcher.py:121} INFO - Event: datapipe-a5591f0c had an event of type Pending
> [2018-11-21 08:30:06,706] {logging_mixin.py:95} INFO - [2018-11-21 08:30:06,706] {pod_launcher.py:121} INFO - Event: datapipe-a5591f0c had an event of type Pending
> [2018-11-21 08:30:07,715] {logging_mixin.py:95} INFO - [2018-11-21 08:30:07,714] {pod_launcher.py:121} INFO - Event: datapipe-a5591f0c had an event of type Pending
> [2018-11-21 08:30:08,723] {logging_mixin.py:95} INFO - [2018-11-21 08:30:08,723] {pod_launcher.py:121} INFO - Event: datapipe-a5591f0c had an event of type Pending
> [2018-11-21 08:30:09,730] {logging_mixin.py:95} INFO - [2018-11-21 08:30:09,730] {pod_launcher.py:121} INFO - Event: datapipe-a5591f0c had an event of type Pending
> [2018-11-21 08:30:10,736] {logging_mixin.py:95} INFO - [2018-11-21 08:30:10,736] {pod_launcher.py:121} INFO - Event: datapipe-a5591f0c had an event of type Pending
> [2018-11-21 08:30:11,742] {logging_mixin.py:95} INFO - [2018-11-21 08:30:11,742] {pod_launcher.py:121} INFO - Event: datapipe-a5591f0c had an event of type Pending
> [2018-11-21 08:30:12,748] {logging_mixin.py:95} INFO - [2018-11-21 08:30:12,748] {pod_launcher.py:121} INFO - Event: datapipe-a5591f0c had an event of type Pending
> [2018-11-21 08:30:13,754] {logging_mixin.py:95} INFO - [2018-11-21 08:30:13,754] {pod_launcher.py:121} INFO - Event: datapipe-a5591f0c had an event of type Pending
> [2018-11-21 08:30:14,764] {logging_mixin.py:95} INFO - [2018-11-21 08:30:14,764] {pod_launcher.py:121} INFO - Event: datapipe-a5591f0c had an event of type Pending
> [2018-11-21 08:30:15,769] {logging_mixin.py:95} INFO - [2018-11-21 08:30:15,769] {pod_launcher.py:121} INFO - Event: datapipe-a5591f0c had an event of type Pending
> [2018-11-21 08:30:16,777] {logging_mixin.py:95} INFO - [2018-11-21 08:30:16,777] {pod_launcher.py:121} INFO - Event: datapipe-a5591f0c had an event of type Pending
> [2018-11-21 08:30:17,783] {logging_mixin.py:95} INFO - [2018-11-21 08:30:17,783] {pod_launcher.py:121} INFO - Event: datapipe-a5591f0c had an event of type Pending
> [2018-11-21 08:30:18,790] {logging_mixin.py:95} INFO - [2018-11-21 08:30:18,790] {pod_launcher.py:121} INFO - Event: datapipe-a5591f0c had an event of type Pending
> [2018-11-21 08:30:19,796] {logging_mixin.py:95} INFO - [2018-11-21 08:30:19,796] {pod_launcher.py:121} INFO - Event: datapipe-a5591f0c had an event of type Pending
> [2018-11-21 08:30:20,802] {logging_mixin.py:95} INFO - [2018-11-21 08:30:20,802] {pod_launcher.py:121} INFO - Event: datapipe-a5591f0c had an event of type Pending
> [2018-11-21 08:30:21,808] {logging_mixin.py:95} INFO - [2018-11-21 08:30:21,808] {pod_launcher.py:121} INFO - Event: datapipe-a5591f0c had an event of type Pending
> [2018-11-21 08:30:22,815] {logging_mixin.py:95} INFO - [2018-11-21 08:30:22,815] {pod_launcher.py:121} INFO - Event: datapipe-a5591f0c had an event of type Pending
> [2018-11-21 08:30:23,822] {logging_mixin.py:95} INFO - [2018-11-21 08:30:23,822] {pod_launcher.py:121} INFO - Event: datapipe-a5591f0c had an event of type Pending
> [2018-11-21 08:30:24,833] {logging_mixin.py:95} INFO - [2018-11-21 08:30:24,833] {pod_launcher.py:121} INFO - Event: datapipe-a5591f0c had an event of type Pending
> [2018-11-21 08:30:24,841] {models.py:1760} ERROR - Pod Launching failed: Pod took too long to start
> Traceback (most recent call last):
>   File "/usr/local/lib/python3.6/site-packages/airflow/contrib/operators/kubernetes_pod_operator.py", line 123, in execute
>     get_logs=self.get_logs)
>   File "/usr/local/lib/python3.6/site-packages/airflow/contrib/kubernetes/pod_launcher.py", line 86, in run_pod
>     raise AirflowException("Pod took too long to start")
> airflow.exceptions.AirflowException: Pod took too long to start
> During handling of the above exception, another exception occurred:
> Traceback (most recent call last):
>   File "/usr/local/lib/python3.6/site-packages/airflow/models.py", line 1659, in _run_raw_task
>     result = task_copy.execute(context=context)
>   File "/usr/local/lib/python3.6/site-packages/airflow/contrib/operators/kubernetes_pod_operator.py", line 135, in execute
>     raise AirflowException('Pod Launching failed: {error}'.format(error=ex))
> airflow.exceptions.AirflowException: Pod Launching failed: Pod took too long to start
> [2018-11-21 08:30:24,844] {models.py:1791} INFO - Marking task as FAILED.
> [2018-11-21 08:30:25,001] {logging_mixin.py:95} INFO - [2018-11-21 08:30:25,001] {email.py:127} INFO - Sent an alert email to ['victor.noel@xxx.com']
> [2018-11-21 08:30:26,646] {base_task_runner.py:101} INFO - Job 4: Subtask datapipe Traceback (most recent call last):
> [2018-11-21 08:30:26,646] {base_task_runner.py:101} INFO - Job 4: Subtask datapipe   File "/usr/local/lib/python3.6/site-packages/airflow/contrib/operators/kubernetes_pod_operator.py", line 123, in execute
> [2018-11-21 08:30:26,646] {base_task_runner.py:101} INFO - Job 4: Subtask datapipe     get_logs=self.get_logs)
> [2018-11-21 08:30:26,647] {base_task_runner.py:101} INFO - Job 4: Subtask datapipe   File "/usr/local/lib/python3.6/site-packages/airflow/contrib/kubernetes/pod_launcher.py", line 86, in run_pod
> [2018-11-21 08:30:26,647] {base_task_runner.py:101} INFO - Job 4: Subtask datapipe     raise AirflowException("Pod took too long to start")
> [2018-11-21 08:30:26,647] {base_task_runner.py:101} INFO - Job 4: Subtask datapipe airflow.exceptions.AirflowException: Pod took too long to start
> [2018-11-21 08:30:26,647] {base_task_runner.py:101} INFO - Job 4: Subtask datapipe
> [2018-11-21 08:30:26,647] {base_task_runner.py:101} INFO - Job 4: Subtask datapipe During handling of the above exception, another exception occurred:
> [2018-11-21 08:30:26,647] {base_task_runner.py:101} INFO - Job 4: Subtask datapipe
> [2018-11-21 08:30:26,647] {base_task_runner.py:101} INFO - Job 4: Subtask datapipe Traceback (most recent call last):
> [2018-11-21 08:30:26,647] {base_task_runner.py:101} INFO - Job 4: Subtask datapipe   File "/usr/local/bin/airflow", line 32, in <module>
> [2018-11-21 08:30:26,647] {base_task_runner.py:101} INFO - Job 4: Subtask datapipe     args.func(args)
> [2018-11-21 08:30:26,647] {base_task_runner.py:101} INFO - Job 4: Subtask datapipe   File "/usr/local/lib/python3.6/site-packages/airflow/utils/cli.py", line 74, in wrapper
> [2018-11-21 08:30:26,647] {base_task_runner.py:101} INFO - Job 4: Subtask datapipe     return f(*args, **kwargs)
> [2018-11-21 08:30:26,647] {base_task_runner.py:101} INFO - Job 4: Subtask datapipe   File "/usr/local/lib/python3.6/site-packages/airflow/bin/cli.py", line 490, in run
> [2018-11-21 08:30:26,647] {base_task_runner.py:101} INFO - Job 4: Subtask datapipe     _run(args, dag, ti)
> [2018-11-21 08:30:26,647] {base_task_runner.py:101} INFO - Job 4: Subtask datapipe   File "/usr/local/lib/python3.6/site-packages/airflow/bin/cli.py", line 406, in _run
> [2018-11-21 08:30:26,647] {base_task_runner.py:101} INFO - Job 4: Subtask datapipe     pool=args.pool,
> [2018-11-21 08:30:26,647] {base_task_runner.py:101} INFO - Job 4: Subtask datapipe   File "/usr/local/lib/python3.6/site-packages/airflow/utils/db.py", line 74, in wrapper
> [2018-11-21 08:30:26,647] {base_task_runner.py:101} INFO - Job 4: Subtask datapipe     return func(*args, **kwargs)
> [2018-11-21 08:30:26,647] {base_task_runner.py:101} INFO - Job 4: Subtask datapipe   File "/usr/local/lib/python3.6/site-packages/airflow/models.py", line 1659, in _run_raw_task
> [2018-11-21 08:30:26,647] {base_task_runner.py:101} INFO - Job 4: Subtask datapipe     result = task_copy.execute(context=context)
> [2018-11-21 08:30:26,647] {base_task_runner.py:101} INFO - Job 4: Subtask datapipe   File "/usr/local/lib/python3.6/site-packages/airflow/contrib/operators/kubernetes_pod_operator.py", line 135, in execute
> [2018-11-21 08:30:26,647] {base_task_runner.py:101} INFO - Job 4: Subtask datapipe     raise AirflowException('Pod Launching failed: {error}'.format(error=ex))
> [2018-11-21 08:30:26,647] {base_task_runner.py:101} INFO - Job 4: Subtask datapipe airflow.exceptions.AirflowException: Pod Launching failed: Pod took too long to start
> [2018-11-21 08:30:27,054] {logging_mixin.py:95} INFO - [2018-11-21 08:30:27,053] {jobs.py:2627} INFO - Task exited with return code 1
> {noformat}
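
For readers who want to reproduce the behaviour described above, the trigger is a KubernetesPodOperator task with is_delete_operator_pod=True whose pod stays Pending past the startup timeout (for example because it cannot be scheduled or its image cannot be pulled). A rough sketch of such a task, using only parameters that appear in this thread; the DAG scaffolding, schedule and timeout value are illustrative assumptions, not the reporter's actual configuration:

from datetime import datetime

from airflow import DAG
from airflow.contrib.operators.kubernetes_pod_operator import KubernetesPodOperator

# Illustrative DAG scaffolding (not taken from the reporter's setup).
dag = DAG(
    dag_id="demo",
    start_date=datetime(2018, 11, 1),
    schedule_interval=None,
)

# If this pod never leaves Pending, run_pod raises "Pod took too long to start".
# Before PR #4218, that failure path skipped delete_pod, so the Pending pod was
# left behind even with is_delete_operator_pod=True.
datapipe = KubernetesPodOperator(
    namespace="default",
    image="ubuntu:16.04",
    cmds=["bash", "-cx"],
    arguments=["echo 10"],
    labels={"foo": "bar"},
    name="datapipe",
    task_id="datapipe",
    startup_timeout_seconds=120,  # assumed value, chosen for illustration
    is_delete_operator_pod=True,
    get_logs=True,
    dag=dag,
)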



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
