airflow-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From fo...@apache.org
Subject [18/20] incubator-airflow git commit: [AIRFLOW-1999] Add per-task GCP service account support
Date Sun, 22 Apr 2018 08:32:51 GMT
[AIRFLOW-1999] Add per-task GCP service account support


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/cdb43cb8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/cdb43cb8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/cdb43cb8

Branch: refs/heads/master
Commit: cdb43cb87c9bd6db1cb0317cef861faf9c7b0e86
Parents: b9a87a0
Author: fenglu-g <fenglu@google.com>
Authored: Wed Mar 21 01:24:00 2018 -0400
Committer: Fokko Driesprong <fokkodriesprong@godatadriven.com>
Committed: Sun Apr 22 10:23:06 2018 +0200

----------------------------------------------------------------------
 airflow/contrib/executors/kubernetes_executor.py      | 14 +++++++++-----
 .../kubernetes_request_factory.py                     |  6 ++++++
 .../kubernetes_request_factory/pod_request_factory.py |  1 +
 airflow/contrib/kubernetes/pod.py                     |  4 +++-
 airflow/contrib/kubernetes/worker_configuration.py    |  7 ++++++-
 5 files changed, 25 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/cdb43cb8/airflow/contrib/executors/kubernetes_executor.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/executors/kubernetes_executor.py b/airflow/contrib/executors/kubernetes_executor.py
index 49993a8..b497387 100644
--- a/airflow/contrib/executors/kubernetes_executor.py
+++ b/airflow/contrib/executors/kubernetes_executor.py
@@ -37,23 +37,25 @@ from airflow.utils.log.logging_mixin import LoggingMixin
 
 class KubernetesExecutorConfig:
     def __init__(self, image=None, request_memory=None, request_cpu=None,
-                 limit_memory=None, limit_cpu=None):
+                 limit_memory=None, limit_cpu=None, gcp_service_account_key=None):
         self.image = image
         self.request_memory = request_memory
         self.request_cpu = request_cpu
         self.limit_memory = limit_memory
         self.limit_cpu = limit_cpu
+        self.gcp_service_account_key = gcp_service_account_key
 
     def __repr__(self):
         return "{}(image={}, request_memory={} ,request_cpu={}, limit_memory={}, " \
-               "limit_cpu={})"\
+            "limit_cpu={}, gcp_service_account_key={})"\
             .format(
                 KubernetesExecutorConfig.__name__,
                 self.image,
                 self.request_memory,
                 self.request_cpu,
                 self.limit_memory,
-                self.limit_cpu
+                self.limit_cpu,
+                self.gcp_service_account_key
             )
 
     @staticmethod
@@ -72,7 +74,8 @@ class KubernetesExecutorConfig:
             request_memory=namespaced.get("request_memory", None),
             request_cpu=namespaced.get("request_cpu", None),
             limit_memory=namespaced.get("limit_memory", None),
-            limit_cpu=namespaced.get("limit_cpu", None)
+            limit_cpu=namespaced.get("limit_cpu", None),
+            gcp_service_account_key=namespaced.get("gcp_service_account_key", None)
         )
 
     def as_dict(self):
@@ -81,7 +84,8 @@ class KubernetesExecutorConfig:
             "request_memory": self.request_memory,
             "request_cpu": self.request_cpu,
             "limit_memory": self.limit_memory,
-            "limit_cpu": self.limit_cpu
+            "limit_cpu": self.limit_cpu,
+            "gcp_service_account_key": self.gcp_service_account_key
         }
 
 

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/cdb43cb8/airflow/contrib/kubernetes/kubernetes_request_factory/kubernetes_request_factory.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/kubernetes/kubernetes_request_factory/kubernetes_request_factory.py
b/airflow/contrib/kubernetes/kubernetes_request_factory/kubernetes_request_factory.py
index cbf3fce..6e8632f 100644
--- a/airflow/contrib/kubernetes/kubernetes_request_factory/kubernetes_request_factory.py
+++ b/airflow/contrib/kubernetes/kubernetes_request_factory/kubernetes_request_factory.py
@@ -64,6 +64,12 @@ class KubernetesRequestFactory:
             req['metadata']['labels'][k] = v
 
     @staticmethod
+    def extract_annotations(pod, req):
+        req['metadata']['annotations'] = req['metadata'].get('annotations', {})
+        for k, v in six.iteritems(pod.annotations):
+            req['metadata']['annotations'][k] = v
+
+    @staticmethod
     def extract_cmds(pod, req):
         req['spec']['containers'][0]['command'] = pod.cmds
 

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/cdb43cb8/airflow/contrib/kubernetes/kubernetes_request_factory/pod_request_factory.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/kubernetes/kubernetes_request_factory/pod_request_factory.py
b/airflow/contrib/kubernetes/kubernetes_request_factory/pod_request_factory.py
index 44b05dd..106a6be 100644
--- a/airflow/contrib/kubernetes/kubernetes_request_factory/pod_request_factory.py
+++ b/airflow/contrib/kubernetes/kubernetes_request_factory/pod_request_factory.py
@@ -57,4 +57,5 @@ spec:
         self.extract_service_account_name(pod, req)
         self.extract_init_containers(pod, req)
         self.extract_image_pull_secrets(pod, req)
+        self.extract_annotations(pod, req)
         return req

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/cdb43cb8/airflow/contrib/kubernetes/pod.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/kubernetes/pod.py b/airflow/contrib/kubernetes/pod.py
index 01d6760..8dbc947 100644
--- a/airflow/contrib/kubernetes/pod.py
+++ b/airflow/contrib/kubernetes/pod.py
@@ -72,7 +72,8 @@ class Pod:
             image_pull_secrets=None,
             init_containers=None,
             service_account_name=None,
-            resources=None
+            resources=None,
+            annotations=None
     ):
         self.image = image
         self.envs = envs or {}
@@ -91,3 +92,4 @@ class Pod:
         self.init_containers = init_containers
         self.service_account_name = service_account_name
         self.resources = resources or Resources()
+        self.annotations = annotations or {}

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/cdb43cb8/airflow/contrib/kubernetes/worker_configuration.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/kubernetes/worker_configuration.py b/airflow/contrib/kubernetes/worker_configuration.py
index 5cb92ef..988f4a5 100644
--- a/airflow/contrib/kubernetes/worker_configuration.py
+++ b/airflow/contrib/kubernetes/worker_configuration.py
@@ -150,6 +150,10 @@ class WorkerConfiguration:
             limit_memory=kube_executor_config.limit_memory,
             limit_cpu=kube_executor_config.limit_cpu
         )
+        gcp_sa_key = kube_executor_config.gcp_service_account_key
+        annotations = {
+            "iam.cloud.google.com/service-account": gcp_sa_key
+        } if gcp_sa_key else {}
 
         return Pod(
             namespace=namespace,
@@ -170,5 +174,6 @@ class WorkerConfiguration:
             init_containers=worker_init_container_spec,
             volumes=volumes,
             volume_mounts=volume_mounts,
-            resources=resources
+            resources=resources,
+            annotations=annotations
         )


Mime
View raw message