airflow-commits mailing list archives

From fo...@apache.org
Subject [11/20] incubator-airflow git commit: [AIRFLOW-1314] Improve error handling
Date Sun, 22 Apr 2018 08:32:44 GMT
[AIRFLOW-1314] Improve error handling

Handle too-old resource versions and raise exceptions on API errors

- K8s API errors now raise Airflow exceptions
- Add the scheduler's uuid to worker pod labels so the scheduler can be matched to the pods it launched
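
For context: a Kubernetes watch stream reports a compacted (too old)
resource version as an event of type ERROR carrying an HTTP-style code
410 ("Gone"), and the only safe recovery is to restart the watch from
resource_version '0'. A minimal sketch of that pattern, separate from
the Airflow classes changed below:

    def handle_watch_event(event):
        """Return the next resource_version to watch from."""
        if event['type'] == 'ERROR':
            raw = event['raw_object']
            if raw['code'] == 410:
                # The requested resource_version has been compacted away
                # server-side; re-list everything from '0'.
                return '0'
            raise RuntimeError('watch failed: {reason} ({code}): {message}'
                               .format(**raw))
        return event['object'].metadata.resource_version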


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/317b6c7b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/317b6c7b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/317b6c7b

Branch: refs/heads/master
Commit: 317b6c7bd54099ac3f38ce895fd6ec571635312c
Parents: cdb43cb
Author: Jordan Zucker <jordan.zucker@gmail.com>
Authored: Tue Apr 10 21:56:14 2018 -0700
Committer: Fokko Driesprong <fokkodriesprong@godatadriven.com>
Committed: Sun Apr 22 10:23:06 2018 +0200

----------------------------------------------------------------------
 .../contrib/executors/kubernetes_executor.py    | 58 +++++++++++++++-----
 .../pod_request_factory.py                      |  2 +-
 airflow/contrib/kubernetes/pod_generator.py     |  7 ++-
 .../contrib/kubernetes/worker_configuration.py  |  4 +-
 ...215c0_add_kubernetes_scheduler_uniqueness.py | 49 +++++++++++++++++
 airflow/models.py                               | 34 ++++++++++++
 .../ci/kubernetes/kube/airflow.yaml.template    | 17 +++++-
 .../ci/kubernetes/minikube/start_minikube.sh    |  2 +-
 8 files changed, 150 insertions(+), 23 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/317b6c7b/airflow/contrib/executors/kubernetes_executor.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/executors/kubernetes_executor.py b/airflow/contrib/executors/kubernetes_executor.py
index b497387..1a50d85 100644
--- a/airflow/contrib/executors/kubernetes_executor.py
+++ b/airflow/contrib/executors/kubernetes_executor.py
@@ -28,10 +28,10 @@ from airflow.contrib.kubernetes.kube_client import get_kube_client
 from airflow.contrib.kubernetes.worker_configuration import WorkerConfiguration
 from airflow.executors.base_executor import BaseExecutor
 from airflow.executors import Executors
-from airflow.models import TaskInstance, KubeResourceVersion
+from airflow.models import TaskInstance, KubeResourceVersion, KubeWorkerIdentifier
 from airflow.utils.state import State
 from airflow import configuration, settings
-from airflow.exceptions import AirflowConfigException
+from airflow.exceptions import AirflowConfigException, AirflowException
 from airflow.utils.log.logging_mixin import LoggingMixin
 
 
@@ -197,9 +197,10 @@ class KubeConfig:
 
 
 class KubernetesJobWatcher(multiprocessing.Process, LoggingMixin, object):
-    def __init__(self, namespace, watcher_queue, resource_version):
+    def __init__(self, namespace, watcher_queue, resource_version, worker_uuid):
         multiprocessing.Process.__init__(self)
         self.namespace = namespace
+        self.worker_uuid = worker_uuid
         self.watcher_queue = watcher_queue
         self.resource_version = resource_version
 
@@ -207,7 +208,8 @@ class KubernetesJobWatcher(multiprocessing.Process, LoggingMixin, object):
         kube_client = get_kube_client()
         while True:
             try:
-                self.resource_version = self._run(kube_client, self.resource_version)
+                self.resource_version = self._run(kube_client, self.resource_version,
+                                                  self.worker_uuid)
             except Exception:
                 self.log.exception("Unknown error in KubernetesJobWatcher. Failing")
                 raise
@@ -215,13 +217,13 @@ class KubernetesJobWatcher(multiprocessing.Process, LoggingMixin, object):
                 self.log.warn("Watch died gracefully, starting back up with: "
                               "last resource_version: {}".format(self.resource_version))
 
-    def _run(self, kube_client, resource_version):
+    def _run(self, kube_client, resource_version, worker_uuid):
         self.log.info(
             "Event: and now my watch begins starting at resource_version: {}"
                 .format(resource_version))
         watcher = watch.Watch()
 
-        kwargs = {"label_selector": "airflow-slave"}
+        kwargs = {"label_selector": "airflow-worker={}".format(worker_uuid)}
         if resource_version:
             kwargs["resource_version"] = resource_version
 
@@ -232,6 +234,8 @@ class KubernetesJobWatcher(multiprocessing.Process, LoggingMixin, object):
             self.log.info(
                 "Event: {} had an event of type {}".format(task.metadata.name,
                                                            event['type']))
+            if event['type'] == 'ERROR':
+                return self.process_error(event)
             self.process_status(
                 task.metadata.name, task.status.phase, task.metadata.labels,
                 task.metadata.resource_version
@@ -240,6 +244,19 @@ class KubernetesJobWatcher(multiprocessing.Process, LoggingMixin, object):
 
         return last_resource_version
 
+    def process_error(self, event):
+        self.log.error("Encountered Error response from k8s list namespaced pod "
+                       "stream => {}".format(event))
+        raw_object = event['raw_object']
+        if raw_object['code'] == 410:
+            self.log.info('Kubernetes resource version is too old, must '
+                          'reset to 0 => {}'.format(raw_object['message']))
+            # Return resource version 0
+            return '0'
+        raise AirflowException(
+            'Kubernetes failure for {} with code {} and message: {}'
+            .format(raw_object['reason'], raw_object['code'], raw_object['message']))
+
     def process_status(self, pod_id, status, labels, resource_version):
         if status == 'Pending':
             self.log.info("Event: {} Pending".format(pod_id))
@@ -258,7 +275,8 @@ class KubernetesJobWatcher(multiprocessing.Process, LoggingMixin, object):
 
 
 class AirflowKubernetesScheduler(LoggingMixin, object):
-    def __init__(self, kube_config, task_queue, result_queue, session, kube_client):
+    def __init__(self, kube_config, task_queue, result_queue,
+                 session, kube_client, worker_uuid):
         self.log.debug("creating kubernetes executor")
         self.kube_config = kube_config
         self.task_queue = task_queue
@@ -270,12 +288,13 @@ class AirflowKubernetesScheduler(LoggingMixin, object):
         self.worker_configuration = WorkerConfiguration(kube_config=self.kube_config)
         self.watcher_queue = multiprocessing.Queue()
         self._session = session
+        self.worker_uuid = worker_uuid
         self.kube_watcher = self._make_kube_watcher()
 
     def _make_kube_watcher(self):
         resource_version = KubeResourceVersion.get_current_resource_version(self._session)
         watcher = KubernetesJobWatcher(self.namespace, self.watcher_queue,
-                                       resource_version)
+                                       resource_version, self.worker_uuid)
         watcher.start()
         return watcher
 
@@ -302,7 +321,8 @@ class AirflowKubernetesScheduler(LoggingMixin, object):
         self.log.debug("k8s: running for command {}".format(command))
         self.log.debug("k8s: launching image {}".format(self.kube_config.kube_image))
         pod = self.worker_configuration.make_pod(
-            namespace=self.namespace, pod_id=self._create_pod_id(dag_id, task_id),
+            namespace=self.namespace, worker_uuid=self.worker_uuid,
+            pod_id=self._create_pod_id(dag_id, task_id),
             dag_id=dag_id, task_id=task_id,
             execution_date=self._datetime_to_label_safe_datestring(execution_date),
             airflow_command=command, kube_executor_config=kube_executor_config
@@ -436,6 +456,7 @@ class KubernetesExecutor(BaseExecutor, LoggingMixin):
         self.result_queue = None
         self.kube_scheduler = None
         self.kube_client = None
+        self.worker_uuid = None
         super(KubernetesExecutor, self).__init__(parallelism=self.kube_config.parallelism)
 
     def clear_not_launched_queued_tasks(self):
@@ -461,11 +482,11 @@ class KubernetesExecutor(BaseExecutor, LoggingMixin):
                 len(queued_tasks)))
 
         for t in queued_tasks:
-            kwargs = dict(label_selector="dag_id={},task_id={},execution_date={}".format(
-                t.dag_id, t.task_id,
-                AirflowKubernetesScheduler._datetime_to_label_safe_datestring(
-                    t.execution_date)
-            ))
+            dict_string = "dag_id={},task_id={},execution_date={},airflow-worker={}"\
+                .format(t.dag_id, t.task_id,
+                        AirflowKubernetesScheduler._datetime_to_label_safe_datestring(
+                            t.execution_date), self.worker_uuid)
+            kwargs = dict(label_selector=dict_string)
             pod_list = self.kube_client.list_namespaced_pod(
                 self.kube_config.kube_namespace, **kwargs)
             if len(pod_list.items) == 0:
@@ -516,12 +537,19 @@ class KubernetesExecutor(BaseExecutor, LoggingMixin):
     def start(self):
         self.log.info('k8s: starting kubernetes executor')
         self._session = settings.Session()
+        self.worker_uuid = KubeWorkerIdentifier.get_or_create_current_kube_worker_uuid(
+            self._session)
+        self.log.debug('k8s: starting with worker_uuid: {}'.format(self.worker_uuid))
+        # always need to reset resource version since we don't know
+        # when we last started, note for behavior below
+        # https://github.com/kubernetes-client/python/blob/master/kubernetes/docs/CoreV1Api.md#list_namespaced_pod
+        KubeResourceVersion.reset_resource_version(self._session)
         self.task_queue = Queue()
         self.result_queue = Queue()
         self.kube_client = get_kube_client()
         self.kube_scheduler = AirflowKubernetesScheduler(
             self.kube_config, self.task_queue, self.result_queue, self._session,
-            self.kube_client
+            self.kube_client, self.worker_uuid
         )
         self._inject_secrets()
         self.clear_not_launched_queued_tasks()
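
The airflow-worker label selector above is what scopes each scheduler's
watcher to the pods that scheduler launched. A hypothetical stand-alone
query with the official Python client (the uuid value is a placeholder
for whatever is stored in the metadata database):

    from kubernetes import client, config

    config.load_kube_config()  # or load_incluster_config() inside a pod
    v1 = client.CoreV1Api()
    worker_uuid = '00000000-0000-0000-0000-000000000000'  # placeholder
    pods = v1.list_namespaced_pod(
        'default', label_selector='airflow-worker={}'.format(worker_uuid))
    for pod in pods.items:
        print(pod.metadata.name, pod.status.phase)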

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/317b6c7b/airflow/contrib/kubernetes/kubernetes_request_factory/pod_request_factory.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/kubernetes/kubernetes_request_factory/pod_request_factory.py b/airflow/contrib/kubernetes/kubernetes_request_factory/pod_request_factory.py
index 106a6be..0f06d49 100644
--- a/airflow/contrib/kubernetes/kubernetes_request_factory/pod_request_factory.py
+++ b/airflow/contrib/kubernetes/kubernetes_request_factory/pod_request_factory.py
@@ -31,7 +31,7 @@ metadata:
 spec:
   containers:
     - name: base
-      image: airflow-slave:latest
+      image: airflow-worker:latest
       command: ["/usr/local/airflow/entrypoint.sh", "/bin/bash sleep 25"]
   restartPolicy: Never
     """

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/317b6c7b/airflow/contrib/kubernetes/pod_generator.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/kubernetes/pod_generator.py b/airflow/contrib/kubernetes/pod_generator.py
index cf85092..82ac7c4 100644
--- a/airflow/contrib/kubernetes/pod_generator.py
+++ b/airflow/contrib/kubernetes/pod_generator.py
@@ -201,9 +201,9 @@ class WorkerGenerator(PodGenerator):
             volumes[0]["emptyDir"] = {}
         return volumes, volume_mounts
 
-    def _init_labels(self, dag_id, task_id, execution_date):
+    def _init_labels(self, dag_id, task_id, execution_date, worker_uuid):
         return {
-            "airflow-slave": "",
+            "airflow-worker": worker_uuid,
             "dag_id": dag_id,
             "task_id": task_id,
             "execution_date": execution_date
@@ -264,6 +264,7 @@ class WorkerGenerator(PodGenerator):
 
     def make_worker_pod(self,
                         namespace,
+                        worker_uuid,
                         pod_id,
                         dag_id,
                         task_id,
@@ -271,7 +272,7 @@ class WorkerGenerator(PodGenerator):
                         airflow_command,
                         kube_executor_config):
         cmds = ["bash", "-cx", "--"]
-        labels = self._init_labels(dag_id, task_id, execution_date)
+        labels = self._init_labels(dag_id, task_id, execution_date, worker_uuid)
         PodGenerator.make_pod(self,
                               namespace=namespace,
                               pod_id=pod_id,
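
To illustrate the relabeling (values made up): the old anonymous marker
label matched every worker pod in the namespace, while the new label is
keyed by the launching scheduler's uuid, so two schedulers sharing a
namespace no longer see each other's pods:

    # Before: any scheduler's selector "airflow-slave" matched this pod.
    old_labels = {'airflow-slave': '', 'dag_id': 'example_dag',
                  'task_id': 'example_task', 'execution_date': '...'}
    # After: only the scheduler whose uuid matches selects this pod.
    new_labels = {'airflow-worker': '7d3b9a52-0000-0000-0000-000000000000',
                  'dag_id': 'example_dag', 'task_id': 'example_task',
                  'execution_date': '...'}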

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/317b6c7b/airflow/contrib/kubernetes/worker_configuration.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/kubernetes/worker_configuration.py b/airflow/contrib/kubernetes/worker_configuration.py
index 988f4a5..e97f5a3 100644
--- a/airflow/contrib/kubernetes/worker_configuration.py
+++ b/airflow/contrib/kubernetes/worker_configuration.py
@@ -139,7 +139,7 @@ class WorkerConfiguration:
             return []
         return self.kube_config.image_pull_secrets.split(',')
 
-    def make_pod(self, namespace, pod_id, dag_id, task_id, execution_date,
+    def make_pod(self, namespace, worker_uuid, pod_id, dag_id, task_id, execution_date,
                  airflow_command, kube_executor_config):
         volumes, volume_mounts = self._get_volumes_and_mounts()
         worker_init_container_spec = self._get_init_containers(
@@ -162,7 +162,7 @@ class WorkerConfiguration:
             cmds=["bash", "-cx", "--"],
             args=[airflow_command],
             labels={
-                "airflow-slave": "",
+                "airflow-worker": worker_uuid,
                 "dag_id": dag_id,
                 "task_id": task_id,
                 "execution_date": execution_date

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/317b6c7b/airflow/migrations/versions/86770d1215c0_add_kubernetes_scheduler_uniqueness.py
----------------------------------------------------------------------
diff --git a/airflow/migrations/versions/86770d1215c0_add_kubernetes_scheduler_uniqueness.py b/airflow/migrations/versions/86770d1215c0_add_kubernetes_scheduler_uniqueness.py
new file mode 100644
index 0000000..6bc48f1
--- /dev/null
+++ b/airflow/migrations/versions/86770d1215c0_add_kubernetes_scheduler_uniqueness.py
@@ -0,0 +1,49 @@
+# flake8: noqa
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""add kubernetes scheduler uniqueness
+
+Revision ID: 86770d1215c0
+Revises: 27c6a30d7c24
+Create Date: 2018-04-03 15:31:20.814328
+
+"""
+
+# revision identifiers, used by Alembic.
+revision = '86770d1215c0'
+down_revision = '27c6a30d7c24'
+branch_labels = None
+depends_on = None
+
+from alembic import op
+import sqlalchemy as sa
+
+
+RESOURCE_TABLE = "kube_worker_uuid"
+
+
+def upgrade():
+    table = op.create_table(
+        RESOURCE_TABLE,
+        sa.Column("one_row_id", sa.Boolean, server_default=sa.true(), primary_key=True),
+        sa.Column("worker_uuid", sa.String(255)),
+        sa.CheckConstraint("one_row_id", name="kube_worker_one_row_id")
+    )
+    op.bulk_insert(table, [
+        {"worker_uuid": ""}
+    ])
+
+
+def downgrade():
+    op.drop_table(RESOURCE_TABLE)
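
The new table can only ever hold one row: one_row_id is a Boolean
primary key whose server default is TRUE and whose CHECK constraint
rejects FALSE, so every insert collides with the seeded row. A runnable
sketch of the same pattern against an in-memory database (SQLAlchemy
1.x style, matching the codebase of the time):

    import sqlalchemy as sa

    metadata = sa.MetaData()
    singleton = sa.Table(
        'kube_worker_uuid', metadata,
        sa.Column('one_row_id', sa.Boolean, server_default=sa.true(),
                  primary_key=True),
        sa.Column('worker_uuid', sa.String(255)),
        sa.CheckConstraint('one_row_id', name='kube_worker_one_row_id'))

    engine = sa.create_engine('sqlite://')
    metadata.create_all(engine)
    engine.execute(singleton.insert(), {'worker_uuid': ''})  # seed row
    # A second plain insert would raise IntegrityError: one_row_id is
    # always TRUE, so any new row collides with the seed row on the
    # primary key.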

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/317b6c7b/airflow/models.py
----------------------------------------------------------------------
diff --git a/airflow/models.py b/airflow/models.py
index 2de1ade..18e9e26 100755
--- a/airflow/models.py
+++ b/airflow/models.py
@@ -50,6 +50,7 @@ import traceback
 import warnings
 import hashlib
 
+import uuid
 from datetime import datetime
 from urllib.parse import urlparse, quote
 
@@ -5137,3 +5138,36 @@ class KubeResourceVersion(Base):
                 KubeResourceVersion.resource_version: resource_version
             })
             session.commit()
+
+    @staticmethod
+    @provide_session
+    def reset_resource_version(session=None):
+        session.query(KubeResourceVersion).update({
+            KubeResourceVersion.resource_version: '0'
+        })
+        session.commit()
+        return '0'
+
+
+class KubeWorkerIdentifier(Base):
+    __tablename__ = "kube_worker_uuid"
+    one_row_id = Column(Boolean, server_default=sqltrue(), primary_key=True)
+    worker_uuid = Column(String(255))
+
+    @staticmethod
+    @provide_session
+    def get_or_create_current_kube_worker_uuid(session=None):
+        (worker_uuid,) = session.query(KubeWorkerIdentifier.worker_uuid).one()
+        if worker_uuid == '':
+            worker_uuid = str(uuid.uuid4())
+            KubeWorkerIdentifier.checkpoint_kube_worker_uuid(worker_uuid, session)
+        return worker_uuid
+
+    @staticmethod
+    @provide_session
+    def checkpoint_kube_worker_uuid(worker_uuid, session=None):
+        if worker_uuid:
+            session.query(KubeWorkerIdentifier).update({
+                KubeWorkerIdentifier.worker_uuid: worker_uuid
+            })
+            session.commit()
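
The intended call pattern mirrors KubernetesExecutor.start above: the
first start against a fresh database finds the seeded empty string,
generates a uuid4 and checkpoints it; every later start reads the same
value back, so worker pod labels stay stable across scheduler restarts:

    from airflow import settings
    from airflow.models import KubeWorkerIdentifier

    session = settings.Session()
    worker_uuid = KubeWorkerIdentifier.get_or_create_current_kube_worker_uuid(
        session)
    # worker_uuid is now fixed for the lifetime of this metadata database.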

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/317b6c7b/scripts/ci/kubernetes/kube/airflow.yaml.template
----------------------------------------------------------------------
diff --git a/scripts/ci/kubernetes/kube/airflow.yaml.template b/scripts/ci/kubernetes/kube/airflow.yaml.template
index ae00983..e16cc08 100644
--- a/scripts/ci/kubernetes/kube/airflow.yaml.template
+++ b/scripts/ci/kubernetes/kube/airflow.yaml.template
@@ -42,6 +42,21 @@ spec:
     requests:
       storage: 10Gi
 ---
+apiVersion: rbac.authorization.k8s.io/v1beta1
+kind: ClusterRoleBinding
+metadata:
+  name: admin-rbac
+subjects:
+  - kind: ServiceAccount
+    # Reference to upper's `metadata.name`
+    name: default
+    # Reference to upper's `metadata.namespace`
+    namespace: default
+roleRef:
+  kind: ClusterRole
+  name: cluster-admin
+  apiGroup: rbac.authorization.k8s.io
+---
 apiVersion: extensions/v1beta1
 kind: Deployment
 metadata:
@@ -173,7 +188,7 @@ data:
     dags_folder = /root/airflow/dags
     base_log_folder = /root/airflow/logs
     logging_level = INFO
-    executor = KubernetesExecutor 
+    executor = KubernetesExecutor
     parallelism = 32
     plugins_folder = /root/airflow/plugins
     sql_alchemy_conn = $SQL_ALCHEMY_CONN
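
The ClusterRoleBinding added above grants the default service account
cluster-admin so the executor can list and watch pods; that is suitable
for this CI template but far broader than a production deployment needs.
A rough Python-client equivalent of the YAML (model names assumed from
the v1 RBAC API of the official client):

    from kubernetes import client, config

    config.load_kube_config()
    binding = client.V1ClusterRoleBinding(
        metadata=client.V1ObjectMeta(name='admin-rbac'),
        subjects=[client.V1Subject(kind='ServiceAccount', name='default',
                                   namespace='default')],
        role_ref=client.V1RoleRef(kind='ClusterRole', name='cluster-admin',
                                  api_group='rbac.authorization.k8s.io'))
    client.RbacAuthorizationV1Api().create_cluster_role_binding(binding)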

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/317b6c7b/scripts/ci/kubernetes/minikube/start_minikube.sh
----------------------------------------------------------------------
diff --git a/scripts/ci/kubernetes/minikube/start_minikube.sh b/scripts/ci/kubernetes/minikube/start_minikube.sh
index be370cf..525529b 100755
--- a/scripts/ci/kubernetes/minikube/start_minikube.sh
+++ b/scripts/ci/kubernetes/minikube/start_minikube.sh
@@ -23,7 +23,7 @@ if [ $? -eq 0 ]; then
 fi
 #
 
-curl -Lo minikube https://storage.googleapis.com/minikube/releases/v0.24.1/minikube-linux-amd64 && chmod +x minikube
+curl -Lo minikube https://storage.googleapis.com/minikube/releases/v0.26.0/minikube-linux-amd64 && chmod +x minikube
 curl -Lo kubectl  https://storage.googleapis.com/kubernetes-release/release/${KUBERNETES_VERSION}/bin/linux/amd64/kubectl && chmod +x kubectl
 
 sudo mkdir -p /usr/local/bin

