airflow-commits mailing list archives

From fo...@apache.org
Subject [10/20] incubator-airflow git commit: [AIRFLOW-1314] Add executor_config and tests
Date Sun, 22 Apr 2018 08:32:43 GMT
[AIRFLOW-1314] Add executor_config and tests

* Added executor_config to the task_instance table and to BaseOperator

* Fix test; bump up number of examples

* Fix up comments from PR

* Exclude the kubernetes example dag from a test

* Fix dict -> KubernetesExecutorConfig

* Fixed up executor_config comment and type hint
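
For DAG authors, the new parameter looks like this (lifted from the example
DAG added in this commit; the image name is illustrative):

    from airflow.operators.python_operator import PythonOperator

    one_task = PythonOperator(
        task_id="one_task", python_callable=print_stuff, dag=dag,
        # Settings are namespaced by executor name; executors that do not
        # understand the namespace simply ignore the parameter.
        executor_config={"KubernetesExecutor": {"image": "airflow/ci:latest"}}
    )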


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/c0920efc
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/c0920efc
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/c0920efc

Branch: refs/heads/master
Commit: c0920efc012468681cff3d3c9cfe25c7381dc976
Parents: ad4e67c
Author: Grant Nicholas <grantnicholas2015@u.northwestern.edu>
Authored: Fri Oct 27 12:13:27 2017 -0500
Committer: Fokko Driesprong <fokkodriesprong@godatadriven.com>
Committed: Sun Apr 22 10:23:06 2018 +0200

----------------------------------------------------------------------
 .../contrib/executors/kubernetes_executor.py    | 67 +++++++++++++++++---
 airflow/contrib/executors/mesos_executor.py     |  2 +-
 .../kubernetes_request_factory.py               | 21 ++++++
 .../pod_request_factory.py                      |  1 +
 airflow/contrib/kubernetes/pod.py               | 22 ++++++-
 .../contrib/kubernetes/worker_configuration.py  | 16 +++--
 .../example_dags/example_kubernetes_executor.py | 66 +++++++++++++++++++
 airflow/executors/__init__.py                   | 20 ++++--
 airflow/executors/base_executor.py              |  4 +-
 airflow/executors/celery_executor.py            |  2 +-
 airflow/executors/dask_executor.py              |  2 +-
 airflow/executors/local_executor.py             |  4 +-
 airflow/executors/sequential_executor.py        |  2 +-
 ...7c24_add_executor_config_to_task_instance.py | 44 +++++++++++++
 airflow/models.py                               | 10 +++
 scripts/ci/kubernetes/docker/Dockerfile_zip     | 20 ++++++
 scripts/ci/kubernetes/docker/build.sh           |  4 +-
 .../test_kubernetes_executor_integration.py     |  8 +++
 tests/jobs.py                                   |  2 +-
 19 files changed, 288 insertions(+), 29 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/c0920efc/airflow/contrib/executors/kubernetes_executor.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/executors/kubernetes_executor.py b/airflow/contrib/executors/kubernetes_executor.py
index 9675e81..1e3e319 100644
--- a/airflow/contrib/executors/kubernetes_executor.py
+++ b/airflow/contrib/executors/kubernetes_executor.py
@@ -19,18 +19,65 @@ import six
 from queue import Queue
 from dateutil import parser
 from uuid import uuid4
+import kubernetes
 from kubernetes import watch, client
 from kubernetes.client.rest import ApiException
 from airflow.contrib.kubernetes.pod_launcher import PodLauncher
 from airflow.contrib.kubernetes.kube_client import get_kube_client
 from airflow.contrib.kubernetes.worker_configuration import WorkerConfiguration
 from airflow.executors.base_executor import BaseExecutor
+from airflow.executors import Executors
 from airflow.models import TaskInstance, KubeResourceVersion
 from airflow.utils.state import State
 from airflow import configuration, settings
 from airflow.exceptions import AirflowConfigException
+from airflow.contrib.kubernetes.pod import Pod, Resources
 from airflow.utils.log.logging_mixin import LoggingMixin
 
+
+class KubernetesExecutorConfig:
+
+    def __init__(self, image=None, request_memory=None, request_cpu=None, limit_memory=None, limit_cpu=None):
+        self.image = image
+        self.request_memory = request_memory
+        self.request_cpu = request_cpu
+        self.limit_memory = limit_memory
+        self.limit_cpu = limit_cpu
+
+    def __repr__(self):
+        return "{}(image={}, request_memory={} ,request_cpu={}, limit_memory={}, limit_cpu={})".format(
+            KubernetesExecutorConfig.__name__,
+            self.image, self.request_memory, self.request_cpu, self.limit_memory,self.limit_cpu
+        )
+
+    @staticmethod
+    def from_dict(obj):
+        if obj is None:
+            return KubernetesExecutorConfig()
+
+        if not isinstance(obj, dict):
+            raise TypeError("Cannot convert a non-dictionary object into a KubernetesExecutorConfig")
+
+        namespaced = obj.get(Executors.KubernetesExecutor, {})
+
+        return KubernetesExecutorConfig(
+            image=namespaced.get("image", None),
+            request_memory=namespaced.get("request_memory", None),
+            request_cpu=namespaced.get("request_cpu", None),
+            limit_memory=namespaced.get("limit_memory", None),
+            limit_cpu=namespaced.get("limit_cpu", None)
+        )
+
+    def as_dict(self):
+        return {
+            "image": self.image,
+            "request_memory": self.request_memory,
+            "request_cpu": self.request_cpu,
+            "limit_memory": self.limit_memory,
+            "limit_cpu": self.limit_cpu
+        }
+
+
 class KubeConfig:
     core_section = "core"
     kubernetes_section = "kubernetes"
@@ -219,15 +266,15 @@ class AirflowKubernetesScheduler(LoggingMixin, object):
         :return: 
 
         """
-        self.log.debug('k8s: job is {}'.format(str(next_job)))
-        key, command = next_job
+        self.log.info('k8s: job is {}'.format(str(next_job)))
+        key, command, kube_executor_config = next_job
         dag_id, task_id, execution_date = key
         self.log.debug("k8s: running for command {}".format(command))
         self.log.debug("k8s: launching image {}".format(self.kube_config.kube_image))
         pod = self.worker_configuration.make_pod(
             namespace=self.namespace, pod_id=self._create_pod_id(dag_id, task_id),
             dag_id=dag_id, task_id=task_id, execution_date=self._datetime_to_label_safe_datestring(execution_date),
-            airflow_command=command
+            airflow_command=command, kube_executor_config=kube_executor_config
         )
         # the watcher will monitor pods, so we do not block.
         self.launcher.run_pod_async(pod)
@@ -405,9 +452,13 @@ class KubernetesExecutor(BaseExecutor, LoggingMixin):
         self._inject_secrets()
         self.clear_not_launched_queued_tasks()
 
-    def execute_async(self, key, command, queue=None):
-        self.log.info("k8s: adding task {} with command {}".format(key, command))
-        self.task_queue.put((key, command))
+
+    def execute_async(self, key, command, queue=None, executor_config=None):
+        self.log.info("k8s: adding task {} with command {} with executor_config {}".format(
+            key, command, executor_config
+        ))
+        kube_executor_config = KubernetesExecutorConfig.from_dict(executor_config)
+        self.task_queue.put((key, command, kube_executor_config))
 
     def sync(self):
         self.log.info("self.running: {}".format(self.running))
@@ -425,8 +476,8 @@ class KubernetesExecutor(BaseExecutor, LoggingMixin):
         KubeResourceVersion.checkpoint_resource_version(last_resource_version, session=self._session)
 
         if not self.task_queue.empty():
-            key, command = self.task_queue.get()
-            self.kube_scheduler.run_next((key, command))
+            key, command, kube_executor_config = self.task_queue.get()
+            self.kube_scheduler.run_next((key, command, kube_executor_config))
 
     def _change_state(self, key, state, pod_id):
         if state != State.RUNNING:
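
A quick sanity check of the new config class (a sketch against the code in
the hunk above, not part of the commit):

    from airflow.contrib.executors.kubernetes_executor import KubernetesExecutorConfig

    cfg = KubernetesExecutorConfig.from_dict(
        {"KubernetesExecutor": {"image": "airflow/ci:latest",
                                "request_memory": "128Mi"}})
    print(cfg)            # KubernetesExecutorConfig(image=airflow/ci:latest, ...)
    print(cfg.as_dict())  # flat dict with image/request_*/limit_* keys
    print(KubernetesExecutorConfig.from_dict(None))  # missing config -> empty config
    try:
        KubernetesExecutorConfig.from_dict([])       # non-dict input
    except TypeError:
        pass                                         # rejected, as expected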

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/c0920efc/airflow/contrib/executors/mesos_executor.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/executors/mesos_executor.py b/airflow/contrib/executors/mesos_executor.py
index c022f6a..e1919fa 100644
--- a/airflow/contrib/executors/mesos_executor.py
+++ b/airflow/contrib/executors/mesos_executor.py
@@ -285,7 +285,7 @@ class MesosExecutor(BaseExecutor, LoginMixin):
         self.mesos_driver = driver
         self.mesos_driver.start()
 
-    def execute_async(self, key, command, queue=None):
+    def execute_async(self, key, command, queue=None, executor_config=None):
         self.task_queue.put((key, command))
 
     def sync(self):

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/c0920efc/airflow/contrib/kubernetes/kubernetes_request_factory/kubernetes_request_factory.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/kubernetes/kubernetes_request_factory/kubernetes_request_factory.py b/airflow/contrib/kubernetes/kubernetes_request_factory/kubernetes_request_factory.py
index 9cfd77f..67ff15c 100644
--- a/airflow/contrib/kubernetes/kubernetes_request_factory/kubernetes_request_factory.py
+++ b/airflow/contrib/kubernetes/kubernetes_request_factory/kubernetes_request_factory.py
@@ -127,6 +127,27 @@ class KubernetesRequestFactoryHelper(object):
             req['spec']['containers'][0]['env'] = env
 
     @staticmethod
+    def extract_resources(pod, req):
+        if not pod.resources or pod.resources.is_empty_resource_request():
+            return
+
+        req['spec']['containers'][0]['resources'] = {}
+
+        if pod.resources.has_requests():
+            req['spec']['containers'][0]['resources']['requests'] = {}
+            if pod.resources.request_memory:
+                req['spec']['containers'][0]['resources']['requests']['memory'] = pod.resources.request_memory
+            if pod.resources.request_cpu:
+                req['spec']['containers'][0]['resources']['requests']['cpu'] = pod.resources.request_cpu
+
+        if pod.resources.has_limits():
+            req['spec']['containers'][0]['resources']['limits'] = {}
+            if pod.resources.limit_memory:
+                req['spec']['containers'][0]['resources']['limits']['memory'] = pod.resources.limit_memory
+            if pod.resources.limit_cpu:
+                req['spec']['containers'][0]['resources']['limits']['cpu'] = pod.resources.limit_cpu
+
+    @staticmethod
     def extract_init_containers(pod, req):
         if pod.init_containers:
             req['spec']['initContainers'] = pod.init_containers
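
For reference, a sketch of what extract_resources above writes into the
request dict for a pod whose Resources carry request_memory="128Mi" and
limit_memory="128Mi" (no cpu settings; unset keys are skipped):

    # req['spec']['containers'][0]['resources'] ends up as:
    {
        'requests': {'memory': '128Mi'},
        'limits': {'memory': '128Mi'},
    }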

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/c0920efc/airflow/contrib/kubernetes/kubernetes_request_factory/pod_request_factory.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/kubernetes/kubernetes_request_factory/pod_request_factory.py b/airflow/contrib/kubernetes/kubernetes_request_factory/pod_request_factory.py
index dfa247f..2b1756a 100644
--- a/airflow/contrib/kubernetes/kubernetes_request_factory/pod_request_factory.py
+++ b/airflow/contrib/kubernetes/kubernetes_request_factory/pod_request_factory.py
@@ -52,6 +52,7 @@ spec:
         self.extract_volume_secrets(pod, req)
         self.attach_volumes(pod, req)
         self.attach_volume_mounts(pod, req)
+        self.extract_resources(pod, req)
         self.extract_service_account_name(pod, req)
         self.extract_init_containers(pod, req)
         self.extract_image_pull_secrets(pod, req)

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/c0920efc/airflow/contrib/kubernetes/pod.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/kubernetes/pod.py b/airflow/contrib/kubernetes/pod.py
index be99bbf..56a3114 100644
--- a/airflow/contrib/kubernetes/pod.py
+++ b/airflow/contrib/kubernetes/pod.py
@@ -13,6 +13,23 @@
 # limitations under the License.
 
 
+class Resources:
+    def __init__(self, request_memory=None, request_cpu=None, limit_memory=None, limit_cpu=None):
+        self.request_memory = request_memory
+        self.request_cpu = request_cpu
+        self.limit_memory = limit_memory
+        self.limit_cpu = limit_cpu
+
+    def is_empty_resource_request(self):
+        return not self.has_limits() and not self.has_requests()
+
+    def has_limits(self):
+        return self.limit_cpu is not None or self.limit_memory is not None
+
+    def has_requests(self):
+        return self.request_cpu is not None or self.request_memory is not None
+
+
 class Pod:
     """
         Represents a kubernetes pod and manages execution of a single pod.
@@ -46,7 +63,9 @@ class Pod:
             image_pull_policy="IfNotPresent",
             image_pull_secrets=None,
             init_containers=None,
-            service_account_name=None):
+            service_account_name=None,
+            resources=None
+    ):
         self.image = image
         self.envs = envs if envs else {}
         self.cmds = cmds
@@ -61,3 +80,4 @@ class Pod:
         self.image_pull_secrets = image_pull_secrets
         self.init_containers = init_containers
         self.service_account_name = service_account_name
+        self.resources = resources or Resources()
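
The predicates decide whether a 'resources' section is emitted at all; a
minimal sketch of their behaviour (values illustrative):

    from airflow.contrib.kubernetes.pod import Resources

    r = Resources(request_cpu="500m", limit_memory="1Gi")
    assert r.has_requests()                         # request_cpu is set
    assert r.has_limits()                           # limit_memory is set
    assert Resources().is_empty_resource_request()  # nothing set, so the
                                                    # request factory skips it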

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/c0920efc/airflow/contrib/kubernetes/worker_configuration.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/kubernetes/worker_configuration.py b/airflow/contrib/kubernetes/worker_configuration.py
index 5e87941..f59576a 100644
--- a/airflow/contrib/kubernetes/worker_configuration.py
+++ b/airflow/contrib/kubernetes/worker_configuration.py
@@ -16,7 +16,7 @@ import copy
 import os
 import six
 
-from airflow.contrib.kubernetes.pod import Pod
+from airflow.contrib.kubernetes.pod import Pod, Resources
 from airflow.contrib.kubernetes.secret import Secret
 
 
@@ -133,13 +133,20 @@ class WorkerConfiguration:
             return []
         return self.kube_config.image_pull_secrets.split(',')
 
-    def make_pod(self, namespace, pod_id, dag_id, task_id, execution_date, airflow_command):
+    def make_pod(self, namespace, pod_id, dag_id, task_id, execution_date, airflow_command, kube_executor_config):
         volumes, volume_mounts = self._get_volumes_and_mounts()
         worker_init_container_spec = self._get_init_containers(copy.deepcopy(volume_mounts))
+        resources = Resources(
+            request_memory=kube_executor_config.request_memory,
+            request_cpu=kube_executor_config.request_cpu,
+            limit_memory=kube_executor_config.limit_memory,
+            limit_cpu=kube_executor_config.limit_cpu
+        )
+
         return Pod(
             namespace=namespace,
             name=pod_id,
-            image=self.kube_config.kube_image,
+            image=kube_executor_config.image or self.kube_config.kube_image,
             cmds=["bash", "-cx", "--"],
             args=[airflow_command],
             labels={
@@ -154,5 +161,6 @@ class WorkerConfiguration:
             image_pull_secrets=self.kube_config.image_pull_secrets,
             init_containers=worker_init_container_spec,
             volumes=volumes,
-            volume_mounts=volume_mounts
+            volume_mounts=volume_mounts,
+            resources=resources
         )
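
Note the image precedence above: a per-task image from kube_executor_config
wins over the cluster-wide kube_image default. A hypothetical call (the
worker_config instance and argument values are assumed for illustration):

    pod = worker_config.make_pod(
        namespace="default", pod_id="mydag-mytask-1",
        dag_id="mydag", task_id="mytask",
        execution_date="2018-01-01T00_00_00",
        airflow_command="airflow run mydag mytask 2018-01-01",
        kube_executor_config=KubernetesExecutorConfig.from_dict(
            {"KubernetesExecutor": {"image": "airflow/ci_zip:latest"}}))
    # pod.image == "airflow/ci_zip:latest"; without a per-task image the
    # pod would fall back to kube_config.kube_image.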

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/c0920efc/airflow/example_dags/example_kubernetes_executor.py
----------------------------------------------------------------------
diff --git a/airflow/example_dags/example_kubernetes_executor.py b/airflow/example_dags/example_kubernetes_executor.py
new file mode 100644
index 0000000..31bc2fb
--- /dev/null
+++ b/airflow/example_dags/example_kubernetes_executor.py
@@ -0,0 +1,66 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+from __future__ import print_function
+import airflow
+from airflow.operators.python_operator import PythonOperator
+from airflow.models import DAG
+from airflow.contrib.executors.kubernetes_executor import KubernetesExecutorConfig
+import os
+
+
+args = {
+    'owner': 'airflow',
+    'start_date': airflow.utils.dates.days_ago(2)
+}
+
+dag = DAG(
+    dag_id='example_kubernetes_executor', default_args=args,
+    schedule_interval=None
+)
+
+
+def print_stuff():
+    print("stuff!")
+
+
+def use_zip_binary():
+    rc = os.system("zip")
+    assert rc == 0
+
+
+
+# You don't have to use any special KubernetesExecutor configuration if you don't want to
+start_task = PythonOperator(
+    task_id="start_task", python_callable=print_stuff, dag=dag
+)
+
+# But you can if you want to
+one_task = PythonOperator(
+    task_id="one_task", python_callable=print_stuff, dag=dag,
+    executor_config={"KubernetesExecutor": {"image": "airflow/ci:latest"}}
+)
+
+# Use the zip binary, which is only found in this special docker image
+two_task = PythonOperator(
+    task_id="two_task", python_callable=use_zip_binary, dag=dag,
+    executor_config={"KubernetesExecutor": {"image": "airflow/ci_zip:latest"}}
+)
+
+# Limit resources on this operator/task
+three_task = PythonOperator(
+    task_id="three_task", python_callable=print_stuff, dag=dag,
+    executor_config={"KubernetesExecutor": {"request_memory": "128Mi", "limit_memory": "128Mi"}}
+)
+
+start_task.set_downstream([one_task, two_task, three_task])

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/c0920efc/airflow/executors/__init__.py
----------------------------------------------------------------------
diff --git a/airflow/executors/__init__.py b/airflow/executors/__init__.py
index e8d1c32..047da6f 100644
--- a/airflow/executors/__init__.py
+++ b/airflow/executors/__init__.py
@@ -50,6 +50,14 @@ def GetDefaultExecutor():
     return DEFAULT_EXECUTOR
 
 
+class Executors:
+    LocalExecutor = "LocalExecutor"
+    SequentialExecutor = "SequentialExecutor"
+    CeleryExecutor = "CeleryExecutor"
+    DaskExecutor = "DaskExecutor"
+    MesosExecutor = "MesosExecutor"
+    KubernetesExecutor = "KubernetesExecutor"
+
 
 
 def _get_executor(executor_name):
@@ -57,20 +65,20 @@ def _get_executor(executor_name):
     Creates a new instance of the named executor. In case the executor name is not know in airflow,
     look for it in the plugins
     """
-    if executor_name == 'LocalExecutor':
+    if executor_name == Executors.LocalExecutor:
         return LocalExecutor()
-    elif executor_name == 'SequentialExecutor':
+    elif executor_name == Executors.SequentialExecutor:
         return SequentialExecutor()
-    elif executor_name == 'CeleryExecutor':
+    elif executor_name == Executors.CeleryExecutor:
         from airflow.executors.celery_executor import CeleryExecutor
         return CeleryExecutor()
-    elif executor_name == 'DaskExecutor':
+    elif executor_name == Executors.DaskExecutor:
         from airflow.executors.dask_executor import DaskExecutor
         return DaskExecutor()
-    elif executor_name == 'MesosExecutor':
+    elif executor_name == Executors.MesosExecutor:
         from airflow.contrib.executors.mesos_executor import MesosExecutor
         return MesosExecutor()
-    elif executor_name == 'KubernetesExecutor':
+    elif executor_name == Executors.KubernetesExecutor:
         from airflow.contrib.executors.kubernetes_executor import KubernetesExecutor
         return KubernetesExecutor()
     else:
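
The same string constants double as executor_config namespace keys, so the
name that selects an executor is also the name a task uses to address it
(sketch):

    from airflow.executors import Executors

    executor_config = {"KubernetesExecutor": {"image": "airflow/ci:latest"}}
    namespaced = executor_config.get(Executors.KubernetesExecutor, {})
    # namespaced == {"image": "airflow/ci:latest"} -- exactly the lookup
    # KubernetesExecutorConfig.from_dict performs.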

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/c0920efc/airflow/executors/base_executor.py
----------------------------------------------------------------------
diff --git a/airflow/executors/base_executor.py b/airflow/executors/base_executor.py
index 4515dac..1ff4c21 100644
--- a/airflow/executors/base_executor.py
+++ b/airflow/executors/base_executor.py
@@ -133,7 +133,7 @@ class BaseExecutor(LoggingMixin):
             ti.refresh_from_db()
             if ti.state != State.RUNNING:
                 self.running[key] = command
-                self.execute_async(key, command=command, queue=queue)
+                self.execute_async(key, command=command, queue=queue, executor_config=ti.executor_config)
             else:
                 self.logger.info(
                     'Task is already running, not sending to '
@@ -174,7 +174,7 @@ class BaseExecutor(LoggingMixin):
 
         return cleared_events
 
-    def execute_async(self, key, command, queue=None):  # pragma: no cover
+    def execute_async(self, key, command, queue=None, executor_config=None):  # pragma: no cover
         """
         This method will execute the command asynchronously.
         """

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/c0920efc/airflow/executors/celery_executor.py
----------------------------------------------------------------------
diff --git a/airflow/executors/celery_executor.py b/airflow/executors/celery_executor.py
index efabca5..70d0088 100644
--- a/airflow/executors/celery_executor.py
+++ b/airflow/executors/celery_executor.py
@@ -79,7 +79,7 @@ class CeleryExecutor(BaseExecutor):
         self.last_state = {}
 
     def execute_async(self, key, command,
-                      queue=DEFAULT_CELERY_CONFIG['task_default_queue']):
+                      queue=DEFAULT_CELERY_CONFIG['task_default_queue'], executor_config=None):
         self.log.info( "[celery] queuing {key} through celery, "
                        "queue={queue}".format(**locals()))
         self.tasks[key] = execute_command.apply_async(

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/c0920efc/airflow/executors/dask_executor.py
----------------------------------------------------------------------
diff --git a/airflow/executors/dask_executor.py b/airflow/executors/dask_executor.py
index 17ace55..42716ee 100644
--- a/airflow/executors/dask_executor.py
+++ b/airflow/executors/dask_executor.py
@@ -56,7 +56,7 @@ class DaskExecutor(BaseExecutor):
         self.client = distributed.Client(self.cluster_address, security=security)
         self.futures = {}
 
-    def execute_async(self, key, command, queue=None):
+    def execute_async(self, key, command, queue=None, executor_config=None):
         if queue is not None:
             warnings.warn(
                'DaskExecutor does not support queues. All tasks will be run in the same cluster'

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/c0920efc/airflow/executors/local_executor.py
----------------------------------------------------------------------
diff --git a/airflow/executors/local_executor.py b/airflow/executors/local_executor.py
index ed03980..4ac25f5 100644
--- a/airflow/executors/local_executor.py
+++ b/airflow/executors/local_executor.py
@@ -221,8 +221,8 @@ class LocalExecutor(BaseExecutor):
 
         self.impl.start()
 
-    def execute_async(self, key, command, queue=None):
-        self.impl.execute_async(key=key, command=command)
+    def execute_async(self, key, command, queue=None, executor_config=None):
+        self.queue.put((key, command))
 
     def sync(self):
         self.impl.sync()

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/c0920efc/airflow/executors/sequential_executor.py
----------------------------------------------------------------------
diff --git a/airflow/executors/sequential_executor.py b/airflow/executors/sequential_executor.py
index ed27109..39153b8 100644
--- a/airflow/executors/sequential_executor.py
+++ b/airflow/executors/sequential_executor.py
@@ -37,7 +37,7 @@ class SequentialExecutor(BaseExecutor):
         super(SequentialExecutor, self).__init__()
         self.commands_to_run = []
 
-    def execute_async(self, key, command, queue=None):
+    def execute_async(self, key, command, queue=None, executor_config=None):
         self.commands_to_run.append((key, command,))
 
     def sync(self):

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/c0920efc/airflow/migrations/versions/27c6a30d7c24_add_executor_config_to_task_instance.py
----------------------------------------------------------------------
diff --git a/airflow/migrations/versions/27c6a30d7c24_add_executor_config_to_task_instance.py b/airflow/migrations/versions/27c6a30d7c24_add_executor_config_to_task_instance.py
new file mode 100644
index 0000000..84c41ec
--- /dev/null
+++ b/airflow/migrations/versions/27c6a30d7c24_add_executor_config_to_task_instance.py
@@ -0,0 +1,44 @@
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""kubernetes_resource_checkpointing
+
+Revision ID: 33ae817a1ff4
+Revises: 947454bf1dff
+Create Date: 2017-09-11 15:26:47.598494
+
+"""
+
+# revision identifiers, used by Alembic.
+revision = '27c6a30d7c24'
+down_revision = '33ae817a1ff4'
+branch_labels = None
+depends_on = None
+
+
+from alembic import op
+import sqlalchemy as sa
+import dill
+
+
+TASK_INSTANCE_TABLE = "task_instance"
+NEW_COLUMN = "executor_config"
+
+
+def upgrade():
+    op.add_column(TASK_INSTANCE_TABLE, sa.Column(NEW_COLUMN, sa.PickleType(pickler=dill)))
+
+
+def downgrade():
+    op.drop_column(TASK_INSTANCE_TABLE, NEW_COLUMN)
+
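
The column uses SQLAlchemy's PickleType with dill (rather than the stdlib
pickle) as the pickler, so any picklable config survives the round-trip to
the database; a minimal sketch:

    import dill

    blob = dill.dumps({"KubernetesExecutor": {"request_memory": "128Mi"}})
    assert dill.loads(blob) == {"KubernetesExecutor": {"request_memory": "128Mi"}}

Applying the migration to an existing metadata database goes through the
usual Alembic upgrade path (wrapped by "airflow upgradedb" in this era of
Airflow).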

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/c0920efc/airflow/models.py
----------------------------------------------------------------------
diff --git a/airflow/models.py b/airflow/models.py
index d03c363..ae387b6 100755
--- a/airflow/models.py
+++ b/airflow/models.py
@@ -824,6 +824,7 @@ class TaskInstance(Base, LoggingMixin):
     operator = Column(String(1000))
     queued_dttm = Column(UtcDateTime)
     pid = Column(Integer)
+    executor_config = Column(PickleType(pickler=dill))
 
     __table_args__ = (
         Index('ti_dag_state', dag_id, state),
@@ -862,6 +863,7 @@ class TaskInstance(Base, LoggingMixin):
         if state:
             self.state = state
         self.hostname = ''
+        self.executor_config = task.executor_config
         self.init_on_load()
         # Is this TaskInstance being currently running within `airflow run --raw`.
         # Not persisted to the database so only valid for the current process
@@ -1147,6 +1149,7 @@ class TaskInstance(Base, LoggingMixin):
             self.max_tries = ti.max_tries
             self.hostname = ti.hostname
             self.pid = ti.pid
+            self.executor_config = ti.executor_config
         else:
             self.state = None
 
@@ -2220,6 +2223,11 @@ class BaseOperator(LoggingMixin):
     :param task_concurrency: When set, a task will be able to limit the concurrent
         runs across execution_dates
     :type task_concurrency: int
+    :param executor_config: Additional task-level configuration parameters that are
+        interpreted by a specific executor. Parameters are namespaced by the name of
+        the executor, e.g. to run this task in a specific docker container through
+        the KubernetesExecutor:
+        ``MyOperator(..., executor_config={"KubernetesExecutor": {"image": "myCustomDockerImage"}})``
+    :type executor_config: dict
     """
 
     # For derived classes to define which fields will get jinjaified
@@ -2264,6 +2272,7 @@ class BaseOperator(LoggingMixin):
             resources=None,
             run_as_user=None,
             task_concurrency=None,
+            executor_config=None,
             *args,
             **kwargs):
 
@@ -2338,6 +2347,7 @@ class BaseOperator(LoggingMixin):
         self.resources = Resources(**(resources or {}))
         self.run_as_user = run_as_user
         self.task_concurrency = task_concurrency
+        self.executor_config = executor_config or {}
 
         # Private attributes
         self._upstream_task_ids = set()
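
Taken together with the executor changes above, the dict flows end to end:
BaseOperator stores it, TaskInstance copies it on construction and when
refreshing from the database, and BaseExecutor.heartbeat hands it to
execute_async. A self-contained sketch (dag_id and values illustrative):

    import airflow
    from airflow.models import DAG
    from airflow.operators.python_operator import PythonOperator

    dag = DAG(dag_id="flow_demo",
              start_date=airflow.utils.dates.days_ago(1))
    task = PythonOperator(
        task_id="t", python_callable=lambda: None, dag=dag,
        executor_config={"KubernetesExecutor": {"limit_cpu": "1"}})
    assert task.executor_config == {"KubernetesExecutor": {"limit_cpu": "1"}}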

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/c0920efc/scripts/ci/kubernetes/docker/Dockerfile_zip
----------------------------------------------------------------------
diff --git a/scripts/ci/kubernetes/docker/Dockerfile_zip b/scripts/ci/kubernetes/docker/Dockerfile_zip
new file mode 100644
index 0000000..494b16a
--- /dev/null
+++ b/scripts/ci/kubernetes/docker/Dockerfile_zip
@@ -0,0 +1,20 @@
+#  Licensed to the Apache Software Foundation (ASF) under one   *
+#  or more contributor license agreements.  See the NOTICE file *
+#  distributed with this work for additional information        *
+#  regarding copyright ownership.  The ASF licenses this file   *
+#  to you under the Apache License, Version 2.0 (the            *
+#  "License"); you may not use this file except in compliance   *
+#  with the License.  You may obtain a copy of the License at   *
+#                                                               *
+#    http://www.apache.org/licenses/LICENSE-2.0                 *
+#                                                               *
+#  Unless required by applicable law or agreed to in writing,   *
+#  software distributed under the License is distributed on an  *
+#  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+#  KIND, either express or implied.  See the License for the    *
+#  specific language governing permissions and limitations      *
+#  under the License.                                           *
+
+FROM airflow/ci:latest
+
+RUN apt-get -y update && apt-get -y install zip unzip

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/c0920efc/scripts/ci/kubernetes/docker/build.sh
----------------------------------------------------------------------
diff --git a/scripts/ci/kubernetes/docker/build.sh b/scripts/ci/kubernetes/docker/build.sh
index d36ea86..7f0fe61 100755
--- a/scripts/ci/kubernetes/docker/build.sh
+++ b/scripts/ci/kubernetes/docker/build.sh
@@ -26,4 +26,6 @@ if [ $? -eq 0 ]; then
 fi
 
cd $AIRFLOW_ROOT && python setup.py sdist && cp $AIRFLOW_ROOT/dist/*.tar.gz $DIRNAME/airflow.tar.gz && \
-cd $DIRNAME && docker build $DIRNAME --tag=${IMAGE}:${TAG}
+cd $DIRNAME && \
+docker build -f Dockerfile $DIRNAME --tag=${IMAGE}:${TAG} && \
+docker build -f Dockerfile_zip $DIRNAME --tag=${IMAGE}_zip:${TAG}

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/c0920efc/tests/contrib/executors/integration/test_kubernetes_executor_integration.py
----------------------------------------------------------------------
diff --git a/tests/contrib/executors/integration/test_kubernetes_executor_integration.py b/tests/contrib/executors/integration/test_kubernetes_executor_integration.py
index 709ae6a..97949ae 100644
--- a/tests/contrib/executors/integration/test_kubernetes_executor_integration.py
+++ b/tests/contrib/executors/integration/test_kubernetes_executor_integration.py
@@ -52,6 +52,14 @@ class KubernetesExecutorTest(unittest.TestCase):
 
         self.assertEquals(dag_final_state(dag_id, run_id, timeout=180), DagRunState.SUCCESS)
 
+    @unittest.skipIf(SKIP_KUBE, 'Kubernetes integration tests are unsupported by this configuration')
+    def test_kubernetes_executor_config_works(self):
+        dag_id, run_id = "example_kubernetes_executor", uuid4().hex
+        run_dag(dag_id, run_id)
+
+        self.assertEquals(get_dag_run_state(dag_id, run_id), DagRunState.RUNNING)
+        self.assertEquals(dag_final_state(dag_id, run_id, timeout=180), DagRunState.SUCCESS)
+
 
 if __name__ == "__main__":
     unittest.main()

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/c0920efc/tests/jobs.py
----------------------------------------------------------------------
diff --git a/tests/jobs.py b/tests/jobs.py
index d6336f0..615ca9a 100644
--- a/tests/jobs.py
+++ b/tests/jobs.py
@@ -161,7 +161,7 @@ class BackfillJobTest(unittest.TestCase):
             'example_trigger_target_dag',
             'example_trigger_controller_dag',  # tested above
             'test_utils',  # sleeps forever
-            'example_kubernetes_operator',  # only works with k8s cluster
+            'example_kubernetes_executor'  # requires kubernetes cluster
         ]
 
         logger = logging.getLogger('BackfillJobTest.test_backfill_examples')

