airflow-commits mailing list archives

From fo...@apache.org
Subject [09/20] incubator-airflow git commit: [AIRFLOW-1314] Use VolumeClaim for transporting DAGs
Date Sun, 22 Apr 2018 08:32:42 GMT
[AIRFLOW-1314] Use VolumeClaim for transporting DAGs

- fix issue where the watcher process randomly dies
- fix the alembic head, which was pointing to two tips
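
For context, a minimal standalone sketch (not part of this commit; plain configparser stands in for Airflow's configuration module) of how the two DAG-transport modes in the new `kubernetes` config section are meant to be combined: either a PersistentVolumeClaim that already holds the DAGs, or a git repo/branch to pull from.

    from configparser import ConfigParser

    cfg = ConfigParser()
    cfg.read_dict({
        "kubernetes": {
            "container_image": "airflow/ci:latest",
            "dags_volume_claim": "airflow-dags",
            # alternatively, drop the claim and set:
            # "git_repo": "https://github.com/example/dags.git",
            # "git_branch": "master",
        }
    })

    dags_volume_claim = cfg.get("kubernetes", "dags_volume_claim", fallback=None)
    git_repo = cfg.get("kubernetes", "git_repo", fallback=None)
    git_branch = cfg.get("kubernetes", "git_branch", fallback=None)

    if dags_volume_claim:
        pass   # DAGs are mounted into each worker pod from the claim
    elif git_repo and git_branch:
        pass   # DAGs are cloned at pod start via git pull --depth=1
    else:
        raise ValueError("set dags_volume_claim, or git_repo and git_branch")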


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/a9d90dc9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/a9d90dc9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/a9d90dc9

Branch: refs/heads/master
Commit: a9d90dc9a5bb251e1490390b6dfba309ecef48a8
Parents: 29daa58
Author: grantnicholas <grant.nicholas@nielsen.com>
Authored: Thu Sep 7 12:04:36 2017 -0500
Committer: Fokko Driesprong <fokkodriesprong@godatadriven.com>
Committed: Sun Apr 22 10:22:44 2018 +0200

----------------------------------------------------------------------
 .gitignore                                      |   4 +-
 .travis.yml                                     |  38 +-
 .../contrib/executors/kubernetes_executor.py    | 369 ++++++++++++++-----
 airflow/contrib/kubernetes/kube_client.py       |  38 +-
 .../kubernetes_request_factory/__init__.py      |   3 -
 .../pod_request_factory.py                      |   3 +-
 airflow/contrib/kubernetes/pod_launcher.py      |  15 +-
 airflow/executors/base_executor.py              |   5 +-
 ...ff4_add_kubernetes_resource_checkpointing.py |  50 +++
 airflow/models.py                               |  45 ++-
 airflow/plugins_manager.py                      |   1 -
 docker/Dockerfile                               |  40 --
 docker/bootstrap.sh                             |  13 -
 docker/build.sh                                 |  12 -
 kube/airflow.yaml.template                      | 103 ------
 kube/deploy.sh                                  |   6 -
 kube/postgres.yaml                              |  94 -----
 scripts/ci/kubernetes/docker/Dockerfile         |  53 +++
 scripts/ci/kubernetes/docker/bootstrap.sh       |  29 ++
 scripts/ci/kubernetes/docker/build.sh           |  29 ++
 .../ci/kubernetes/kube/airflow.yaml.template    | 165 +++++++++
 scripts/ci/kubernetes/kube/deploy.sh            |  42 +++
 scripts/ci/kubernetes/kube/postgres.yaml        | 111 ++++++
 .../ci/kubernetes/minikube/start_minikube.sh    |  88 ++---
 scripts/ci/kubernetes/setup_kubernetes.sh       |   2 +
 scripts/ci/run_tests.sh                         |  10 +-
 scripts/ci/travis_script.sh                     |   6 +-
 tests/contrib/__init__.py                       |   1 -
 tests/contrib/executors/__init__.py             |  13 +
 tests/contrib/executors/integration/__init__.py |  13 +
 .../executors/integration/airflow_controller.py | 114 ++++++
 .../test_kubernetes_executor_integration.py     |  57 +++
 .../executors/test_kubernetes_executor.py       |  71 ++++
 tst.txt                                         |   0
 34 files changed, 1134 insertions(+), 509 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a9d90dc9/.gitignore
----------------------------------------------------------------------
diff --git a/.gitignore b/.gitignore
index 46df849..638ab19 100644
--- a/.gitignore
+++ b/.gitignore
@@ -134,5 +134,5 @@ rat-results.txt
 # Git stuff
 .gitattributes
 # Kubernetes generated templated files
-kube/.generated/
-airflow.tar.gz
+*.generated
+*.tar.gz

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a9d90dc9/.travis.yml
----------------------------------------------------------------------
diff --git a/.travis.yml b/.travis.yml
index 883473d..ec2d44d 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -51,15 +51,20 @@ env:
     # does not work with python 3
     - BOTO_CONFIG=/tmp/bogusvalue
   matrix:
-    - TOX_ENV=py27-backend_mysql
-    - TOX_ENV=py27-backend_sqlite
-    - TOX_ENV=py27-backend_postgres
-    - TOX_ENV=py35-backend_mysql
-    - TOX_ENV=py35-backend_sqlite
-    - TOX_ENV=py35-backend_postgres
-    - TOX_ENV=flake8
-    - TOX_ENV=py27-backend_postgres KUBERNETES_VERSION=v1.7.0
-    - TOX_ENV=py27-backend_postgres KUBERNETES_VERSION=v1.8.0
+    - TOX_ENV=py27-cdh-airflow_backend_mysql
+    - TOX_ENV=py27-cdh-airflow_backend_sqlite
+    - TOX_ENV=py27-cdh-airflow_backend_postgres
+#    - TOX_ENV=py27-hdp-airflow_backend_mysql
+#    - TOX_ENV=py27-hdp-airflow_backend_sqlite
+#    - TOX_ENV=py27-hdp-airflow_backend_postgres
+    - TOX_ENV=py34-cdh-airflow_backend_mysql
+    - TOX_ENV=py34-cdh-airflow_backend_sqlite
+    - TOX_ENV=py34-cdh-airflow_backend_postgres
+#    - TOX_ENV=py34-hdp-airflow_backend_mysql
+#    - TOX_ENV=py34-hdp-airflow_backend_sqlite
+#    - TOX_ENV=py34-hdp-airflow_backend_postgres
+    # Run integration tests on minikube for the KubernetesExecutor
+    - TOX_ENV=py27-cdh-airflow_backend_postgres RUN_KUBE_INTEGRATION=true
 matrix:
   exclude:
     - python: "3.5"
@@ -75,14 +80,15 @@ matrix:
     - python: "2.7"
       env: TOX_ENV=py35-backend_postgres
     - python: "2.7"
-      env: TOX_ENV=flake8
-    - python: "3.5"
-      env: TOX_ENV=py27-backend_postgres KUBERNETES_VERSION=v1.7.0
-    - python: "3.5"
-      env: TOX_ENV=py27-backend_postgres KUBERNETES_VERSION=v1.8.0
+      env: TOX_ENV=py34-hdp-airflow_backend_mysql
+    - python: "2.7"
+      env: TOX_ENV=py34-hdp-airflow_backend_sqlite
+    - python: "2.7"
+      env: TOX_ENV=py34-hdp-airflow_backend_postgres
+    - python: "3.4"
+      env: TOX_ENV=py27-cdh-airflow_backend_postgres RUN_KUBE_INTEGRATION=true
   allow_failures:
-    - env: TOX_ENV=py27-backend_postgres KUBERNETES_VERSION=v1.7.0
-    - env: TOX_ENV=py27-backend_postgres KUBERNETES_VERSION=v1.8.0
+    - env: TOX_ENV=py27-cdh-airflow_backend_postgres RUN_KUBE_INTEGRATION=true
 cache:
   directories:
     - $HOME/.wheelhouse/

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a9d90dc9/airflow/contrib/executors/kubernetes_executor.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/executors/kubernetes_executor.py b/airflow/contrib/executors/kubernetes_executor.py
index 63aa696..8989add 100644
--- a/airflow/contrib/executors/kubernetes_executor.py
+++ b/airflow/contrib/executors/kubernetes_executor.py
@@ -14,64 +14,188 @@
 
 import calendar
 import logging
-import time
 import os
 import multiprocessing
 from queue import Queue
-from datetime import datetime
-from kubernetes import watch
-from airflow import settings
+from dateutil import parser
+from uuid import uuid4
+from kubernetes import watch, client
+from kubernetes.client.rest import ApiException
 from airflow.contrib.kubernetes.pod_launcher import PodLauncher
+from airflow.contrib.kubernetes.kube_client import get_kube_client
 from airflow.executors.base_executor import BaseExecutor
-from airflow.models import TaskInstance
-from airflow.contrib.kubernetes.pod import Pod
+from airflow.models import TaskInstance, KubeResourceVersion
 from airflow.utils.state import State
-from airflow import configuration
-from kubernetes import client
+from airflow import configuration, settings
+from airflow.exceptions import AirflowConfigException
+from airflow.contrib.kubernetes.pod import Pod
+
 
 class KubeConfig:
+    core_section = "core"
+    kubernetes_section = "kubernetes"
+
+    @staticmethod
+    def safe_get(section, option, default):
+        try:
+            return configuration.get(section, option)
+        except AirflowConfigException:
+            return default
+
+    @staticmethod
+    def safe_getboolean(section, option, default):
+        try:
+            return configuration.getboolean(section, option)
+        except AirflowConfigException:
+            return default
+
     def __init__(self):
-        self.kube_image = configuration.get('core', 'k8s_image')
-        self.git_repo = configuration.get('core', 'k8s_git_repo')
-        self.git_branch = configuration.get('core', 'k8s_git_branch')
+        self.dags_folder = configuration.get(self.core_section, 'dags_folder')
+        self.parallelism = configuration.getint(self.core_section, 'PARALLELISM')
+        self.kube_image = configuration.get(self.kubernetes_section, 'container_image')
+        self.delete_worker_pods = self.safe_getboolean(self.kubernetes_section, 'delete_worker_pods', True)
+        self.kube_namespace = os.environ.get('AIRFLOW_KUBE_NAMESPACE', 'default')
+
+        # These two props must be set together
+        self.git_repo = self.safe_get(self.kubernetes_section, 'git_repo', None)
+        self.git_branch = self.safe_get(self.kubernetes_section, 'git_branch', None)
+
+        # Or this one prop
+        self.dags_volume_claim = self.safe_get(self.kubernetes_section, 'dags_volume_claim', None)
+        # And optionally this prop
+        self.dags_volume_subpath = self.safe_get(self.kubernetes_section, 'dags_volume_subpath', None)
+
+        self._validate()
+
+    def _validate(self):
+        if self.dags_volume_claim:
+            # do volume things
+            pass
+        elif self.git_repo and self.git_branch:
+            # do git things
+            pass
+        else:
+            raise AirflowConfigException(
+                "In kubernetes mode you must set the following configs in the `kubernetes` section: "
+                "`dags_volume_claim` or "
+                "`git_repo and git_branch`"
+            )
+
+
+class PodMaker:
+    def __init__(self, kube_config):
+        self.logger = logging.getLogger(__name__)
+        self.kube_config = kube_config
+
+    def _get_volumes_and_mounts(self):
+        volume_name = "airflow-dags"
+
+        if self.kube_config.dags_volume_claim:
+            volumes = [{
+                "name": volume_name, "persistentVolumeClaim": {"claimName": self.kube_config.dags_volume_claim}
+            }]
+            volume_mounts = [{
+                "name": volume_name, "mountPath": self.kube_config.dags_folder,
+                "readOnly": True
+            }]
+            if self.kube_config.dags_volume_subpath:
+                volume_mounts[0]["subPath"] = self.kube_config.dags_volume_subpath
+
+            return volumes, volume_mounts
+        else:
+            return [], []
+
+    def _get_args(self, airflow_command):
+        if self.kube_config.dags_volume_claim:
+            self.logger.info("Using k8s_dags_volume_claim for airflow dags")
+            return [airflow_command]
+        else:
+            self.logger.info("Using git-syncher for airflow dags")
+            cmd_args = "mkdir -p {dags_folder} && cd {dags_folder} &&" \
+                       "git init && git remote add origin {git_repo} && git pull origin {git_branch} --depth=1 &&" \
+                       "{command}".format(dags_folder=self.kube_config.dags_folder, git_repo=self.kube_config.git_repo,
+                                          git_branch=self.kube_config.git_branch, command=airflow_command)
+            return [cmd_args]
+
+    def make_pod(self, namespace, pod_id, dag_id, task_id, execution_date, airflow_command):
+        volumes, volume_mounts = self._get_volumes_and_mounts()
+
+        pod = Pod(
+            namespace=namespace,
+            name=pod_id,
+            image=self.kube_config.kube_image,
+            cmds=["bash", "-cx", "--"],
+            args=self._get_args(airflow_command),
+            labels={
+                "airflow-slave": "",
+                "dag_id": dag_id,
+                "task_id": task_id,
+                "execution_date": execution_date
+            },
+            envs={"AIRFLOW__CORE__EXECUTOR": "LocalExecutor"},
+            volumes=volumes,
+            volume_mounts=volume_mounts
+        )
+        return pod
 
 
 class KubernetesJobWatcher(multiprocessing.Process, object):
-    def __init__(self, watch_function, namespace, watcher_queue):
+    def __init__(self, namespace, watcher_queue, resource_version):
         self.logger = logging.getLogger(__name__)
         multiprocessing.Process.__init__(self)
-        self._watch_function = watch_function
-        self._watch = watch.Watch()
         self.namespace = namespace
         self.watcher_queue = watcher_queue
+        self.resource_version = resource_version
 
     def run(self):
-        self.logger.info("Event: and now my watch begins")
-        for event in self._watch.stream(self._watch_function, self.namespace,
-                                        label_selector='airflow-slave'):
+        kube_client = get_kube_client()
+        while True:
+            try:
+                self.resource_version = self._run(kube_client, self.resource_version)
+            except Exception:
+                self.logger.exception("Unknown error in KubernetesJobWatcher. Failing")
+                raise
+            else:
+                self.logger.warn("Watch died gracefully, starting back up with: "
+                                 "last resource_version: {}".format(self.resource_version))
+
+    def _run(self, kube_client, resource_version):
+        self.logger.info("Event: and now my watch begins starting at resource_version: {}".format(resource_version))
+        watcher = watch.Watch()
+
+        kwargs = {"label_selector": "airflow-slave"}
+        if resource_version:
+            kwargs["resource_version"] = resource_version
+
+        last_resource_version = None
+        for event in watcher.stream(kube_client.list_namespaced_pod, self.namespace, **kwargs):
             task = event['object']
-            self.logger.info("Event: {} had an event of type {}".format(task.metadata.name,
-                                                                        event['type']))
-            self.process_status(task.metadata.name, task.status.phase, task.metadata.labels)
+            self.logger.info("Event: {} had an event of type {}".format(task.metadata.name, event['type']))
+            self.process_status(
+                task.metadata.name, task.status.phase, task.metadata.labels, task.metadata.resource_version
+            )
+            last_resource_version = task.metadata.resource_version
 
-    def process_status(self, job_id, status, labels):
+        return last_resource_version
+
+    def process_status(self, pod_id, status, labels, resource_version):
         if status == 'Pending':
-            self.logger.info("Event: {} Pending".format(job_id))
+            self.logger.info("Event: {} Pending".format(pod_id))
         elif status == 'Failed':
-            self.logger.info("Event: {} Failed".format(job_id))
-            self.watcher_queue.put((job_id, State.FAILED, labels))
+            self.logger.info("Event: {} Failed".format(pod_id))
+            self.watcher_queue.put((pod_id, State.FAILED, labels, resource_version))
         elif status == 'Succeeded':
-            self.logger.info("Event: {} Succeeded".format(job_id))
-            self.watcher_queue.put((job_id, None, labels))
+            self.logger.info("Event: {} Succeeded".format(pod_id))
+            self.watcher_queue.put((pod_id, None, labels, resource_version))
         elif status == 'Running':
-            # self.logger.info("Event: {} is Running".format(job_id))
-            self.watcher_queue.put((job_id, State.RUNNING))
+            self.logger.info("Event: {} is Running".format(pod_id))
         else:
-            self.logger.info("Event: Invalid state: {} on job: {} with labels: {}".format(status, job_id, labels))
+            self.logger.warn("Event: Invalid state: {} on pod: {} with labels: {} "
+                             "with resource_version: {}".format(status, pod_id, labels, resource_version))
 
 
 class AirflowKubernetesScheduler(object):
-    def __init__(self, task_queue, result_queue):
+    def __init__(self, kube_config, task_queue, result_queue, session, kube_client):
         self.logger = logging.getLogger(__name__)
         self.logger.info("creating kubernetes executor")
         self.kube_config = KubeConfig()
@@ -79,11 +203,25 @@ class AirflowKubernetesScheduler(object):
         self.pending_jobs = set()
         self.namespace = os.environ['k8s_POD_NAMESPACE']
         self.logger.info("k8s: using namespace {}".format(self.namespace))
-        self.result_queue = result_queue
+        self.kube_client = kube_client
+        self.launcher = PodLauncher(kube_client=self.kube_client)
+        self.pod_maker = PodMaker(kube_config=self.kube_config)
         self.watcher_queue = multiprocessing.Queue()
-        self.helper = KubernetesHelper()
-        w = KubernetesJobWatcher(self.helper.pod_api.list_namespaced_pod, self.namespace, self.watcher_queue)
-        w.start()
+        self._session = session
+        self.kube_watcher = self._make_kube_watcher()
+
+    def _make_kube_watcher(self):
+        resource_version = KubeResourceVersion.get_current_resource_version(self._session)
+        watcher = KubernetesJobWatcher(self.namespace, self.watcher_queue, resource_version)
+        watcher.start()
+        return watcher
+
+    def _health_check_kube_watcher(self):
+        if self.kube_watcher.is_alive():
+            pass
+        else:
+            self.logger.error("Error while health checking kube watcher process. Process died for unknown reasons")
+            self.kube_watcher = self._make_kube_watcher()
 
     def run_next(self, next_job):
         """
@@ -99,36 +237,24 @@ class AirflowKubernetesScheduler(object):
         self.logger.info('k8s: job is {}'.format(str(next_job)))
         key, command = next_job
         dag_id, task_id, execution_date = key
-        self.logger.info("running for command {}".format(command))
-        cmd_args = "mkdir -p $AIRFLOW_HOME/dags/synched/git && cd $AIRFLOW_HOME/dags/synched/git &&" \
-                   "git init && git remote add origin {git_repo} && git pull origin {git_branch} --depth=1 &&" \
-                   "{command}".format(git_repo=self.kube_config.git_repo, git_branch=self.kube_config.git_branch,
-                                          command=command)
-        pod_id = self._create_job_id_from_key(key=key)
-        pod = KubernetesPodBuilder(
-            image=self.kube_config.kube_image,
-            cmds=["bash", "-cx", "--"],
-            args=[cmd_args],
-            kub_req_factory=SimplePodRequestFactory(),
-            namespace=self.namespace
+        self.logger.info("k8s: running for command {}".format(command))
+        self.logger.info("k8s: launching image {}".format(self.kube_config.kube_image))
+        pod = self.pod_maker.make_pod(
+            namespace=self.namespace, pod_id=self._create_pod_id(dag_id, task_id),
+            dag_id=dag_id, task_id=task_id, execution_date=self._datetime_to_label_safe_datestring(execution_date),
+            airflow_command=command
         )
-        pod.set_image_pull_policy("IfNotPresent")
-        pod.add_env_variables({"AIRFLOW__CORE__EXECUTOR": "LocalExecutor"})
-        pod.add_name(pod_id)
-        pod.add_labels({
-            "dag_id": dag_id,
-            "task_id": task_id,
-            "execution_date": self._datetime_to_label_safe_datestring(execution_date)
-        })
-        pod.launch()
-
         # the watcher will monitor pods, so we do not block.
         self.launcher.run_pod_async(pod)
         self.logger.info("k8s: Job created!")
 
-    def delete_job(self, key):
-        job_id = self._create_job_id_from_key(key)
-        self.helper.delete_pod(job_id, namespace=self.namespace)
+    def delete_pod(self, pod_id):
+        if self.kube_config.delete_worker_pods:
+            try:
+                self.kube_client.delete_namespaced_pod(pod_id, self.namespace, body=client.V1DeleteOptions())
+            except ApiException as e:
+                if e.status != 404:
+                    raise
 
     def sync(self):
         """
@@ -140,24 +266,53 @@ class AirflowKubernetesScheduler(object):
         :return:
 
         """
+        self._health_check_kube_watcher()
         while not self.watcher_queue.empty():
             self.process_watcher_task()
 
-    def end_task(self):
-        job_id, state, labels = self.watcher_queue.get()
-        logging.info("Attempting to finish job; job_id: {}; state: {}; labels: {}".format(job_id, state, labels))
+    def process_watcher_task(self):
+        pod_id, state, labels, resource_version = self.watcher_queue.get()
+        logging.info("Attempting to finish pod; pod_id: {}; state: {}; labels: {}".format(pod_id, state, labels))
         key = self._labels_to_key(labels)
         if key:
             self.logger.info("finishing job {}".format(key))
-            self.result_queue.put((key, state))
+            self.result_queue.put((key, state, pod_id, resource_version))
+
+    @staticmethod
+    def _strip_unsafe_kubernetes_special_chars(string):
+        """
+        Kubernetes only supports lowercase alphanumeric characters and "-" and "." in the pod name
+        However, there are special rules about how "-" and "." can be used so let's only keep alphanumeric chars
+        see here for detail: https://kubernetes.io/docs/concepts/overview/working-with-objects/names/
+        :param string:
+        :return:
+        """
+        return ''.join(ch.lower() for ind, ch in enumerate(string) if ch.isalnum())
+
+    @staticmethod
+    def _make_safe_pod_id(safe_dag_id, safe_task_id, safe_uuid):
+        """
+        Kubernetes pod names must be <= 253 chars and must pass the following regex for validation
+        "^[a-z0-9]([-a-z0-9]*[a-z0-9])?(\.[a-z0-9]([-a-z0-9]*[a-z0-9])?)*$"
+        :param safe_dag_id: a dag_id with only alphanumeric characters
+        :param safe_task_id: a task_id with only alphanumeric characters
+        :param random_uuid: a uuid
+        :return:
+        """
+        MAX_POD_ID_LEN = 253
+
+        safe_key = safe_dag_id + safe_task_id
+
+        safe_pod_id = safe_key[:MAX_POD_ID_LEN-len(safe_uuid)-1] + "-" + safe_uuid
+
+        return safe_pod_id
 
     @staticmethod
-    def _create_job_id_from_key(key):
-        keystr = '-'.join([str(x).replace(' ', '-') for x in key[:2]])
-        job_fields = [keystr]
-        unformatted_job_id = '-'.join(job_fields)
-        job_id = unformatted_job_id.replace('_', '-')
-        return job_id
+    def _create_pod_id(dag_id, task_id):
+        safe_dag_id = AirflowKubernetesScheduler._strip_unsafe_kubernetes_special_chars(dag_id)
+        safe_task_id = AirflowKubernetesScheduler._strip_unsafe_kubernetes_special_chars(task_id)
+        safe_uuid = AirflowKubernetesScheduler._strip_unsafe_kubernetes_special_chars(uuid4().hex)
+        return AirflowKubernetesScheduler._make_safe_pod_id(safe_dag_id, safe_task_id, safe_uuid)
 
     @staticmethod
     def _label_safe_datestring_to_datetime(string):
@@ -166,7 +321,7 @@ class AirflowKubernetesScheduler(object):
         :param string: string
         :return: datetime.datetime object
         """
-        return datetime.strptime(string.replace("_", ":"), "%Y-%m-%dT%H:%M:%S")
+        return parser.parse(string.replace("_", ":"))
 
     @staticmethod
     def _datetime_to_label_safe_datestring(datetime_obj):
@@ -193,40 +348,74 @@ class KubernetesExecutor(BaseExecutor):
         self.task_queue = None
         self._session = None
         self.result_queue = None
-        self.pending_tasks = None
-        self.kub_client = None
+        self.kube_scheduler = None
+        self.kube_client = None
+        super(KubernetesExecutor, self).__init__(parallelism=self.kube_config.parallelism)
+
+    def clear_not_launched_queued_tasks(self):
+        """
+        If the airflow scheduler restarts with pending "Queued" tasks, the tasks may or may not have been launched
+        Thus, on starting up the scheduler let's check every "Queued" task to see if it has been launched
+            (ie: if there is a corresponding pod on kubernetes)
+        If it has been launched then do nothing, otherwise reset the state to "None" so the task will be rescheduled
+        This will not be necessary in a future version of airflow in which there is proper support for State.LAUNCHED
+        :return: None
+        """
+        queued_tasks = self._session.query(TaskInstance).filter(TaskInstance.state == State.QUEUED).all()
+        self.logger.info("When executor started up, found {} queued task instances".format(len(queued_tasks)))
+
+        for t in queued_tasks:
+            kwargs = dict(label_selector="dag_id={},task_id={},execution_date={}".format(
+                t.dag_id, t.task_id, AirflowKubernetesScheduler._datetime_to_label_safe_datestring(t.execution_date)
+            ))
+            pod_list = self.kube_client.list_namespaced_pod(self.kube_config.kube_namespace, **kwargs)
+            if len(pod_list.items) == 0:
+                self.logger.info("TaskInstance: {} found in queued state but was not launched, rescheduling".format(t))
+                self._session.query(TaskInstance).filter(
+                    TaskInstance.dag_id == t.dag_id,
+                    TaskInstance.task_id == t.task_id,
+                    TaskInstance.execution_date == t.execution_date
+                ).update({TaskInstance.state: State.NONE})
+
+        self._session.commit()
 
     def start(self):
         self.logger.info('k8s: starting kubernetes executor')
         self._session = settings.Session()
         self.task_queue = Queue()
         self.result_queue = Queue()
-        self.kub_client = AirflowKubernetesScheduler(self.task_queue, self.result_queue)
+        self.kube_client = get_kube_client()
+        self.kube_scheduler = AirflowKubernetesScheduler(
+            self.kube_config, self.task_queue, self.result_queue, self._session, self.kube_client
+        )
+        self.clear_not_launched_queued_tasks()
+
+    def execute_async(self, key, command, queue=None):
+        self.logger.info("k8s: adding task {} with command {}".format(key, command))
+        self.task_queue.put((key, command))
 
     def sync(self):
-        self.kub_client.sync()
+        self.logger.info("self.running: {}".format(self.running))
+        self.logger.info("self.queued: {}".format(self.queued_tasks))
+        self.kube_scheduler.sync()
+
+        last_resource_version = None
         while not self.result_queue.empty():
             results = self.result_queue.get()
-            self.logger.info("reporting {}".format(results))
-            self.change_state(*results)
+            key, state, pod_id, resource_version = results
+            last_resource_version = resource_version
+            self.logger.info("Changing state of {}".format(results))
+            self._change_state(key, state, pod_id)
 
-        if not self.task_queue.empty():
-            (key, command) = self.task_queue.get()
-            self.kub_client.run_next((key, command))
+        KubeResourceVersion.checkpoint_resource_version(last_resource_version, session=self._session)
 
-    def job_queue_full(self):
-        return len(self.kub_client.current_jobs) > PARALLELISM
-
-    def cluster_at_capacity(self):
-        return len(self.pending_tasks) > 5
-
-    def terminate(self):
-        pass
+        if not self.task_queue.empty():
+            key, command = self.task_queue.get()
+            self.kube_scheduler.run_next((key, command))
 
-    def change_state(self, key, state):
-        self.logger.info("k8s: setting state of {} to {}".format(key, state))
+    def _change_state(self, key, state, pod_id):
         if state != State.RUNNING:
-            self.kub_client.delete_job(key)
+            self.kube_scheduler.delete_pod(pod_id)
             try:
                 self.running.pop(key)
             except KeyError:
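
A standalone restatement (illustrative only, not the executor module itself) of the pod-naming scheme introduced above: dag_id and task_id are reduced to lowercase alphanumerics, and a uuid4 suffix keeps the name unique while the whole id stays under the 253-character Kubernetes limit.

    from uuid import uuid4

    MAX_POD_ID_LEN = 253

    def strip_unsafe_chars(string):
        # Kubernetes names allow lowercase alphanumerics plus "-" and ".", but the
        # commit keeps only alphanumerics to avoid the placement rules for "-" and "."
        return ''.join(ch.lower() for ch in string if ch.isalnum())

    def make_pod_id(dag_id, task_id):
        safe_key = strip_unsafe_chars(dag_id) + strip_unsafe_chars(task_id)
        safe_uuid = uuid4().hex  # hex digest is already lowercase alphanumeric
        return safe_key[:MAX_POD_ID_LEN - len(safe_uuid) - 1] + "-" + safe_uuid

    print(make_pod_id("example_bash_operator", "run_this_last"))
    # e.g. examplebashoperatorrunthislast-9f0c4d2e...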

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a9d90dc9/airflow/contrib/kubernetes/kube_client.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/kubernetes/kube_client.py b/airflow/contrib/kubernetes/kube_client.py
index d1a63a2..b01e14d 100644
--- a/airflow/contrib/kubernetes/kube_client.py
+++ b/airflow/contrib/kubernetes/kube_client.py
@@ -1,31 +1,25 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
+# -*- coding: utf-8 -*-
 #
-#   http://www.apache.org/licenses/LICENSE-2.0
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
 #
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
 
 
-def _load_kube_config(in_cluster):
+def get_kube_client(in_cluster=True):
+    # TODO: This should also allow people to point to a cluster.
+
     from kubernetes import config, client
+
     if in_cluster:
         config.load_incluster_config()
         return client.CoreV1Api()
     else:
-        config.load_kube_config()
-        return client.CoreV1Api()
-
-
-def get_kube_client(in_cluster=True):
-    # TODO: This should also allow people to point to a cluster.
-    return _load_kube_config(in_cluster)
+        NotImplementedError("Running kubernetes jobs from not within the cluster is not supported at this time")
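
Illustrative usage of the reworked helper, assuming it runs inside a cluster with this branch installed; the calls are standard kubernetes Python client APIs and the label selector is the one the executor attaches to worker pods.

    from airflow.contrib.kubernetes.kube_client import get_kube_client

    kube = get_kube_client(in_cluster=True)  # loads the in-cluster config
    pods = kube.list_namespaced_pod("default", label_selector="airflow-slave")
    for pod in pods.items:
        print(pod.metadata.name, pod.status.phase)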

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a9d90dc9/airflow/contrib/kubernetes/kubernetes_request_factory/__init__.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/kubernetes/kubernetes_request_factory/__init__.py b/airflow/contrib/kubernetes/kubernetes_request_factory/__init__.py
index d2344a2..9921696 100644
--- a/airflow/contrib/kubernetes/kubernetes_request_factory/__init__.py
+++ b/airflow/contrib/kubernetes/kubernetes_request_factory/__init__.py
@@ -10,6 +10,3 @@
 # distributed under the License is distributed on an "AS IS" BASIS,
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
-
-from .kubernetes_request_factory import *
-from .pod_request_factory import *

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a9d90dc9/airflow/contrib/kubernetes/kubernetes_request_factory/pod_request_factory.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/kubernetes/kubernetes_request_factory/pod_request_factory.py b/airflow/contrib/kubernetes/kubernetes_request_factory/pod_request_factory.py
index d013016..89631e0 100644
--- a/airflow/contrib/kubernetes/kubernetes_request_factory/pod_request_factory.py
+++ b/airflow/contrib/kubernetes/kubernetes_request_factory/pod_request_factory.py
@@ -11,10 +11,9 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 
-import kubernetes_request_factory as kreq
 import yaml
+import airflow.contrib.kubernetes.kubernetes_request_factory.kubernetes_request_factory as kreq
 from airflow.contrib.kubernetes.pod import Pod
-from airflow import AirflowException
 
 
 class SimplePodRequestFactory(kreq.KubernetesRequestFactory):

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a9d90dc9/airflow/contrib/kubernetes/pod_launcher.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/kubernetes/pod_launcher.py b/airflow/contrib/kubernetes/pod_launcher.py
index e92ae5c..e435a12 100644
--- a/airflow/contrib/kubernetes/pod_launcher.py
+++ b/airflow/contrib/kubernetes/pod_launcher.py
@@ -12,18 +12,20 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 from airflow.contrib.kubernetes.pod import Pod
-from airflow.contrib.kubernetes.kubernetes_request_factory import SimplePodRequestFactory
-from kubernetes import config, client, watch
+from airflow.contrib.kubernetes.kubernetes_request_factory.pod_request_factory import SimplePodRequestFactory
+from kubernetes import watch
 from kubernetes.client import V1Pod
 from airflow.utils.state import State
 import json
 import logging
 
+from .kube_client import get_kube_client
+
 
 class PodLauncher:
-    def __init__(self):
+    def __init__(self, kube_client=None):
         self.kube_req_factory = SimplePodRequestFactory()
-        self._client = self._kube_client()
+        self._client = kube_client or get_kube_client()
         self._watch = watch.Watch()
         self.logger = logging.getLogger(__name__)
 
@@ -42,11 +44,6 @@ class PodLauncher:
         final_status = self._monitor_pod(pod)
         return final_status
 
-    def _kube_client(self):
-        #TODO: This should also allow people to point to a cluster.
-        config.load_incluster_config()
-        return client.CoreV1Api()
-
     def _monitor_pod(self, pod):
         # type: (Pod) -> State
         for event in self._watch.stream(self.read_pod(pod), pod.namespace):
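
Since the client is now injected through the constructor, a launcher can be built against any CoreV1Api-compatible object, which also makes it easy to stub in tests. A rough usage sketch (the pod would come from PodMaker.make_pod above):

    from airflow.contrib.kubernetes.pod_launcher import PodLauncher
    from airflow.contrib.kubernetes.kube_client import get_kube_client

    launcher = PodLauncher(kube_client=get_kube_client())
    # launcher.run_pod_async(pod)  # non-blocking; the KubernetesJobWatcher tracks the outcome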

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a9d90dc9/airflow/executors/base_executor.py
----------------------------------------------------------------------
diff --git a/airflow/executors/base_executor.py b/airflow/executors/base_executor.py
index d5e958f..4515dac 100644
--- a/airflow/executors/base_executor.py
+++ b/airflow/executors/base_executor.py
@@ -39,7 +39,6 @@ class BaseExecutor(LoggingMixin):
         self.queued_tasks = {}
         self.running = {}
         self.event_buffer = {}
-        self.logger.setLevel(10)
 
     def start(self):  # pragma: no cover
         """
@@ -106,7 +105,7 @@ class BaseExecutor(LoggingMixin):
         """
         pass
 
-    def heartbeat(self, km=False):
+    def heartbeat(self):
         # Triggering new jobs
         if not self.parallelism:
             open_slots = len(self.queued_tasks)
@@ -132,7 +131,7 @@ class BaseExecutor(LoggingMixin):
             # does NOT eliminate it.
             self.queued_tasks.pop(key)
             ti.refresh_from_db()
-            if ti.state != State.RUNNING or km:
+            if ti.state != State.RUNNING:
                 self.running[key] = command
                 self.execute_async(key, command=command, queue=queue)
             else:

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a9d90dc9/airflow/migrations/versions/33ae817a1ff4_add_kubernetes_resource_checkpointing.py
----------------------------------------------------------------------
diff --git a/airflow/migrations/versions/33ae817a1ff4_add_kubernetes_resource_checkpointing.py b/airflow/migrations/versions/33ae817a1ff4_add_kubernetes_resource_checkpointing.py
new file mode 100644
index 0000000..d642476
--- /dev/null
+++ b/airflow/migrations/versions/33ae817a1ff4_add_kubernetes_resource_checkpointing.py
@@ -0,0 +1,50 @@
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""kubernetes_resource_checkpointing
+
+Revision ID: 33ae817a1ff4
+Revises: 947454bf1dff
+Create Date: 2017-09-11 15:26:47.598494
+
+"""
+
+# revision identifiers, used by Alembic.
+revision = '33ae817a1ff4'
+down_revision = 'd2ae31099d61'
+branch_labels = None
+depends_on = None
+
+
+from alembic import op
+import sqlalchemy as sa
+
+
+RESOURCE_TABLE = "kube_resource_version"
+
+
+def upgrade():
+    table = op.create_table(
+        RESOURCE_TABLE,
+        sa.Column("one_row_id", sa.Boolean, server_default=sa.true(), primary_key=True),
+        sa.Column("resource_version", sa.String(255)),
+        sa.CheckConstraint("one_row_id", name="kube_resource_version_one_row_id")
+    )
+    op.bulk_insert(table, [
+        {"resource_version": ""}
+    ])
+
+
+def downgrade():
+    op.drop_table(RESOURCE_TABLE)
+

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a9d90dc9/airflow/models.py
----------------------------------------------------------------------
diff --git a/airflow/models.py b/airflow/models.py
index 02409dd..d03c363 100755
--- a/airflow/models.py
+++ b/airflow/models.py
@@ -56,7 +56,7 @@ from urllib.parse import urlparse, quote
 from sqlalchemy import (
     Column, Integer, String, DateTime, Text, Boolean, ForeignKey, PickleType,
     Index, Float, LargeBinary)
-from sqlalchemy import func, or_, and_
+from sqlalchemy import func, or_, and_, true as sqltrue
 from sqlalchemy.ext.declarative import declarative_base, declared_attr
 from sqlalchemy.dialects.mysql import LONGTEXT
 from sqlalchemy.orm import reconstructor, relationship, synonym
@@ -1118,28 +1118,6 @@ class TaskInstance(Base, LoggingMixin):
         session.commit()
 
     @provide_session
-    def update_hostname(self, hostname, session=None):
-        """
-        For use in kubernetes mode. Update the session to allow heartbeating to SQL
-        :param session:
-   
-        :return:
-        
-        """
-        t_i = TaskInstance
-
-        qry = session.query(t_i).filter(
-            t_i.dag_id == self.dag_id,
-            t_i.task_id == self.task_id,
-            t_i.execution_date == self.execution_date)
-
-        ti = qry.first()
-        if ti:
-            ti.hostname = hostname
-            session.add(ti)
-            session.commit()
-
-    @provide_session
     def refresh_from_db(self, session=None, lock_for_update=False):
         """
         Refreshes the task instance from the database based on the primary key
@@ -5121,3 +5099,24 @@ class ImportError(Base):
     timestamp = Column(UtcDateTime)
     filename = Column(String(1024))
     stacktrace = Column(Text)
+
+
+class KubeResourceVersion(Base):
+    __tablename__ = "kube_resource_version"
+    one_row_id = Column(Boolean, server_default=sqltrue(), primary_key=True)
+    resource_version = Column(String(255))
+
+    @staticmethod
+    @provide_session
+    def get_current_resource_version(session=None):
+        (resource_version,) = session.query(KubeResourceVersion.resource_version).one()
+        return resource_version
+
+    @staticmethod
+    @provide_session
+    def checkpoint_resource_version(resource_version, session=None):
+        if resource_version:
+            session.query(KubeResourceVersion).update({
+                KubeResourceVersion.resource_version: resource_version
+            })
+            session.commit()
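
A sketch of the checkpoint/restore cycle this one-row table enables (simplified; the executor passes the scheduler's own session in explicitly):

    from airflow import settings
    from airflow.models import KubeResourceVersion

    session = settings.Session()

    # On startup: resume the pod watch from the last resourceVersion that was seen.
    last_seen = KubeResourceVersion.get_current_resource_version(session=session)

    # After draining the result queue: persist the newest resourceVersion so a
    # restarted scheduler neither replays nor misses pod events.
    KubeResourceVersion.checkpoint_resource_version("12345", session=session)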

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a9d90dc9/airflow/plugins_manager.py
----------------------------------------------------------------------
diff --git a/airflow/plugins_manager.py b/airflow/plugins_manager.py
index a7adda6..735f2de 100644
--- a/airflow/plugins_manager.py
+++ b/airflow/plugins_manager.py
@@ -48,7 +48,6 @@ class AirflowPlugin(object):
     admin_views = []
     flask_blueprints = []
     menu_links = []
-    dag_importer = None
 
     @classmethod
     def validate(cls):

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a9d90dc9/docker/Dockerfile
----------------------------------------------------------------------
diff --git a/docker/Dockerfile b/docker/Dockerfile
deleted file mode 100644
index 38e7c7c..0000000
--- a/docker/Dockerfile
+++ /dev/null
@@ -1,40 +0,0 @@
-FROM ubuntu:16.04
-
-# install deps
-RUN apt-get update -y && apt-get install -y \
-        wget \
-        python-dev \
-        python-pip \
-        libczmq-dev \
-        libcurlpp-dev \
-        curl \
-        libssl-dev \
-        git \
-        inetutils-telnet \
-        bind9utils
-
-RUN pip install -U setuptools && \
-    pip install -U pip
-
-RUN pip install kubernetes && \
-    pip install cryptography && \
-    pip install psycopg2==2.7.1
-
-# install airflow
-COPY airflow.tar.gz /tmp/airflow.tar.gz
-RUN pip install /tmp/airflow.tar.gz
-
-# prep airflow
-ENV AIRFLOW_HOME=/root/airflow
-ENV AIRFLOW__CORE__EXECUTOR=KubernetesExecutor
-ENV AIRFLOW__CORE__SQL_ALCHEMY_CONN=postgresql+psycopg2://root:root@postgres-airflow:5432/airflow
-
-# Don't read the raw revisions git-sync puts in the volume, we always want to read from the symlink
-# A hack, how else can we do this
-RUN mkdir -p $AIRFLOW_HOME/dags && echo "rev-[a-zA-Z0-9]+" > $AIRFLOW_HOME/dags/.airflowignore
-
-
-COPY bootstrap.sh /bootstrap.sh
-RUN chmod +x /bootstrap.sh
-
-ENTRYPOINT ["/bootstrap.sh"]

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a9d90dc9/docker/bootstrap.sh
----------------------------------------------------------------------
diff --git a/docker/bootstrap.sh b/docker/bootstrap.sh
deleted file mode 100644
index 82124ac..0000000
--- a/docker/bootstrap.sh
+++ /dev/null
@@ -1,13 +0,0 @@
-#!/bin/bash
-
-# launch the appropriate process
-
-if [ "$1" = "webserver" ]
-then
-	exec airflow webserver
-fi
-
-if [ "$1" = "scheduler" ]
-then
-	exec airflow scheduler
-fi

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a9d90dc9/docker/build.sh
----------------------------------------------------------------------
diff --git a/docker/build.sh b/docker/build.sh
deleted file mode 100755
index f2a942e..0000000
--- a/docker/build.sh
+++ /dev/null
@@ -1,12 +0,0 @@
-IMAGE=grantnicholas/kubeairflow
-TAG=${1:-latest}
-
-if [ -f airflow.tar.gz ]; then
-    echo "Not rebuilding airflow source"
-else
-    cd ../ && python setup.py sdist && cd docker && \
-    cp ../dist/apache-airflow-1.9.0.dev0+incubating.tar.gz airflow.tar.gz
-fi
-
-docker build . --tag=${IMAGE}:${TAG}
-docker push ${IMAGE}:${TAG}

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a9d90dc9/kube/airflow.yaml.template
----------------------------------------------------------------------
diff --git a/kube/airflow.yaml.template b/kube/airflow.yaml.template
deleted file mode 100644
index eca6d3c..0000000
--- a/kube/airflow.yaml.template
+++ /dev/null
@@ -1,103 +0,0 @@
-apiVersion: extensions/v1beta1
-kind: Deployment
-metadata:
-  name: airflow
-spec:
-  replicas: 1
-  template:
-    metadata:
-      labels:
-        name: airflow
-      annotations:
-        pod.beta.kubernetes.io/init-containers: '[
-          {
-              "name": "init",
-              "image": "{{docker_image}}",
-              "command": ["bash", "-c", "cd /usr/local/lib/python2.7/dist-packages/airflow && airflow initdb && alembic upgrade head"]
-          }
-      ]'
-    spec:
-      initContainers:
-      - name: init
-        image: {{docker_image}}
-        command: [
-          "bash", "-c", "cd /usr/local/lib/python2.7/dist-packages/airflow && airflow initdb && alembic upgrade head"
-        ]
-      containers:
-      - name: web
-        image: {{docker_image}}
-        ports:
-        - name: web
-          containerPort: 8080
-        args: ["webserver"]
-        env:
-        - name: AIRFLOW__CORE__EXECUTOR
-          value: KubernetesExecutor
-        - name: AIRFLOW__CORE__K8S_IMAGE
-          value: {{docker_image}}
-        - name: AIRFLOW__CORE__K8S_GIT_REPO
-          value: https://github.com/grantnicholas/testdags.git
-        - name: AIRFLOW__CORE__K8S_GIT_BRANCH
-          value: master
-        volumeMounts:
-        - name: dags
-          mountPath: /root/airflow/dags/synched
-        readinessProbe:
-          initialDelaySeconds: 5
-          timeoutSeconds: 5
-          periodSeconds: 5
-          httpGet:
-            path: /admin
-            port: 8080
-        livenessProbe:
-          initialDelaySeconds: 5
-          timeoutSeconds: 5
-          failureThreshold: 5
-          httpGet:
-            path: /admin
-            port: 8080
-      - name: scheduler
-        image: {{docker_image}}
-        args: ["scheduler"]
-        env:
-        - name: AIRFLOW__CORE__EXECUTOR
-          value: KubernetesExecutor
-        - name: AIRFLOW__CORE__K8S_IMAGE
-          value: {{docker_image}}
-        - name: AIRFLOW__CORE__K8S_GIT_REPO
-          value: https://github.com/grantnicholas/testdags.git
-        - name: AIRFLOW__CORE__K8S_GIT_BRANCH
-          value: master
-        - name: AIRFLOW__SCHEDULER__DAG_DIR_LIST_INTERVAL
-          value: "60"
-        volumeMounts:
-        - name: dags
-          mountPath: /root/airflow/dags/synched
-      - name: sync
-        image: gcr.io/google_containers/git-sync:v2.0.4
-        env:
-        - name: GIT_SYNC_REPO
-          value: https://github.com/grantnicholas/testdags.git
-        - name: GIT_SYNC_BRANCH
-          value: master
-        - name: GIT_SYNC_DEST
-          value: git
-        volumeMounts:
-        - name: dags
-          mountPath: /git
-      volumes:
-      - name: dags
-        emptyDir: {}
----
-apiVersion: v1
-kind: Service
-metadata:
-  name: airflow
-spec:
-  type: NodePort
-  ports:
-    - port: 8080
-      nodePort: 30809
-  selector:
-    name: airflow
-

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a9d90dc9/kube/deploy.sh
----------------------------------------------------------------------
diff --git a/kube/deploy.sh b/kube/deploy.sh
deleted file mode 100755
index 28b58ca..0000000
--- a/kube/deploy.sh
+++ /dev/null
@@ -1,6 +0,0 @@
-IMAGE=${1:-grantnicholas/kubeairflow}
-TAG=${2:-latest}
-
-mkdir -p .generated
-kubectl apply -f postgres.yaml
-sed "s#{{docker_image}}#$IMAGE:$TAG#g" airflow.yaml.template > .generated/airflow.yaml && kubectl apply -f .generated/airflow.yaml

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a9d90dc9/kube/postgres.yaml
----------------------------------------------------------------------
diff --git a/kube/postgres.yaml b/kube/postgres.yaml
deleted file mode 100644
index a7298d1..0000000
--- a/kube/postgres.yaml
+++ /dev/null
@@ -1,94 +0,0 @@
-apiVersion: v1
-kind: PersistentVolume
-metadata:
-  name: postgres-airflow
-spec:
-  accessModes:
-    - ReadWriteOnce
-  capacity:
-    storage: 5Gi
-  hostPath:
-    path: /data/postgres-airflow
----
-kind: PersistentVolumeClaim
-apiVersion: v1
-metadata:
-  name: postgres-airflow
-spec:
-  accessModes:
-    - ReadWriteOnce
-  resources:
-    requests:
-      storage: 5Gi
----
-kind: Deployment
-apiVersion: extensions/v1beta1
-metadata:
-  name: postgres-airflow
-spec:
-  replicas: 1
-  template:
-    metadata:
-      labels:
-        name: postgres-airflow
-    spec:
-      restartPolicy: Always
-      containers:
-        - name: postgres
-          image: postgres
-          ports:
-            - containerPort: 5432
-              protocol: TCP
-          volumeMounts:
-            - name: dbvol
-              mountPath: /var/lib/postgresql/data/pgdata
-              subPath: pgdata
-          env:
-            - name: POSTGRES_USER
-              value: root
-            - name: POSTGRES_PASSWORD
-              value: root
-            - name: POSTGRES_DB
-              value: airflow
-            - name: PGDATA
-              value: /var/lib/postgresql/data/pgdata
-            - name: POD_IP
-              valueFrom: { fieldRef: { fieldPath: status.podIP } }
-          livenessProbe:
-            initialDelaySeconds: 60
-            timeoutSeconds: 5
-            failureThreshold: 5
-            exec:
-              command:
-              - /bin/sh
-              - -c
-              - exec pg_isready --host $POD_IP ||  if [[ $(psql -qtAc --host $POD_IP 'SELECT pg_is_in_recovery') != "f" ]]; then  exit 0 else; exit 1; fi
-          readinessProbe:
-            initialDelaySeconds: 5
-            timeoutSeconds: 5
-            periodSeconds: 5
-            exec:
-              command:
-              - /bin/sh
-              - -c
-              - exec pg_isready --host $POD_IP
-          resources:
-            requests:
-              memory: .5Gi
-              cpu: .5
-      volumes:
-        - name: dbvol
-          persistentVolumeClaim:
-            claimName: postgres-airflow
----
-apiVersion: v1
-kind: Service
-metadata:
-  name: postgres-airflow
-spec:
-  clusterIP: None
-  ports:
-    - port: 5432
-      targetPort: 5432
-  selector:
-    name: postgres-airflow

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a9d90dc9/scripts/ci/kubernetes/docker/Dockerfile
----------------------------------------------------------------------
diff --git a/scripts/ci/kubernetes/docker/Dockerfile b/scripts/ci/kubernetes/docker/Dockerfile
new file mode 100644
index 0000000..a3b05b0
--- /dev/null
+++ b/scripts/ci/kubernetes/docker/Dockerfile
@@ -0,0 +1,53 @@
+#  Licensed to the Apache Software Foundation (ASF) under one   *
+#  or more contributor license agreements.  See the NOTICE file *
+#  distributed with this work for additional information        *
+#  regarding copyright ownership.  The ASF licenses this file   *
+#  to you under the Apache License, Version 2.0 (the            *
+#  "License"); you may not use this file except in compliance   *
+#  with the License.  You may obtain a copy of the License at   *
+#                                                               *
+#    http://www.apache.org/licenses/LICENSE-2.0                 *
+#                                                               *
+#  Unless required by applicable law or agreed to in writing,   *
+#  software distributed under the License is distributed on an  *
+#  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+#  KIND, either express or implied.  See the License for the    *
+#  specific language governing permissions and limitations      *
+#  under the License.                                           *
+
+FROM ubuntu:16.04
+
+# install deps
+RUN apt-get update -y && apt-get install -y \
+        wget \
+        python-dev \
+        python-pip \
+        libczmq-dev \
+        libcurlpp-dev \
+        curl \
+        libssl-dev \
+        git \
+        inetutils-telnet \
+        bind9utils
+
+RUN pip install -U setuptools && \
+    pip install -U pip
+
+RUN pip install kubernetes && \
+    pip install cryptography && \
+    pip install psycopg2==2.7.3.1  # I had issues with older versions of psycopg2, just a warning
+
+# install airflow
+COPY airflow.tar.gz /tmp/airflow.tar.gz
+RUN pip install /tmp/airflow.tar.gz
+
+# prep airflow
+ENV AIRFLOW_HOME=/root/airflow
+ENV AIRFLOW__CORE__EXECUTOR=KubernetesExecutor
+ENV AIRFLOW__CORE__SQL_ALCHEMY_CONN=postgresql+psycopg2://root:root@postgres-airflow:5432/airflow
+
+
+COPY bootstrap.sh /bootstrap.sh
+RUN chmod +x /bootstrap.sh
+
+ENTRYPOINT ["/bootstrap.sh"]

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a9d90dc9/scripts/ci/kubernetes/docker/bootstrap.sh
----------------------------------------------------------------------
diff --git a/scripts/ci/kubernetes/docker/bootstrap.sh b/scripts/ci/kubernetes/docker/bootstrap.sh
new file mode 100644
index 0000000..b8e54e6
--- /dev/null
+++ b/scripts/ci/kubernetes/docker/bootstrap.sh
@@ -0,0 +1,29 @@
+#!/bin/bash
+#  Licensed to the Apache Software Foundation (ASF) under one   *
+#  or more contributor license agreements.  See the NOTICE file *
+#  distributed with this work for additional information        *
+#  regarding copyright ownership.  The ASF licenses this file   *
+#  to you under the Apache License, Version 2.0 (the            *
+#  "License"); you may not use this file except in compliance   *
+#  with the License.  You may obtain a copy of the License at   *
+#                                                               *
+#    http://www.apache.org/licenses/LICENSE-2.0                 *
+#                                                               *
+#  Unless required by applicable law or agreed to in writing,   *
+#  software distributed under the License is distributed on an  *
+#  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+#  KIND, either express or implied.  See the License for the    *
+#  specific language governing permissions and limitations      *
+#  under the License.                                           *
+
+# launch the appropriate process
+
+if [ "$1" = "webserver" ]
+then
+	exec airflow webserver
+fi
+
+if [ "$1" = "scheduler" ]
+then
+	exec airflow scheduler
+fi

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a9d90dc9/scripts/ci/kubernetes/docker/build.sh
----------------------------------------------------------------------
diff --git a/scripts/ci/kubernetes/docker/build.sh b/scripts/ci/kubernetes/docker/build.sh
new file mode 100755
index 0000000..d36ea86
--- /dev/null
+++ b/scripts/ci/kubernetes/docker/build.sh
@@ -0,0 +1,29 @@
+#  Licensed to the Apache Software Foundation (ASF) under one   *
+#  or more contributor license agreements.  See the NOTICE file *
+#  distributed with this work for additional information        *
+#  regarding copyright ownership.  The ASF licenses this file   *
+#  to you under the Apache License, Version 2.0 (the            *
+#  "License"); you may not use this file except in compliance   *
+#  with the License.  You may obtain a copy of the License at   *
+#                                                               *
+#    http://www.apache.org/licenses/LICENSE-2.0                 *
+#                                                               *
+#  Unless required by applicable law or agreed to in writing,   *
+#  software distributed under the License is distributed on an  *
+#  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+#  KIND, either express or implied.  See the License for the    *
+#  specific language governing permissions and limitations      *
+#  under the License.                                           *
+
+IMAGE=${1:-airflow/ci}
+TAG=${2:-latest}
+DIRNAME=$(cd "$(dirname "$0")"; pwd)
+AIRFLOW_ROOT="$DIRNAME/../../../.."
+
+ENVCONFIG=$(minikube docker-env)
+if [ $? -eq 0 ]; then
+  eval $ENVCONFIG
+fi
+
+cd $AIRFLOW_ROOT && python setup.py sdist && cp $AIRFLOW_ROOT/dist/*.tar.gz $DIRNAME/airflow.tar.gz && \
+cd $DIRNAME && docker build $DIRNAME --tag=${IMAGE}:${TAG}

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a9d90dc9/scripts/ci/kubernetes/kube/airflow.yaml.template
----------------------------------------------------------------------
diff --git a/scripts/ci/kubernetes/kube/airflow.yaml.template b/scripts/ci/kubernetes/kube/airflow.yaml.template
new file mode 100644
index 0000000..a297b95
--- /dev/null
+++ b/scripts/ci/kubernetes/kube/airflow.yaml.template
@@ -0,0 +1,165 @@
+#  Licensed to the Apache Software Foundation (ASF) under one   *
+#  or more contributor license agreements.  See the NOTICE file *
+#  distributed with this work for additional information        *
+#  regarding copyright ownership.  The ASF licenses this file   *
+#  to you under the Apache License, Version 2.0 (the            *
+#  "License"); you may not use this file except in compliance   *
+#  with the License.  You may obtain a copy of the License at   *
+#                                                               *
+#    http://www.apache.org/licenses/LICENSE-2.0                 *
+#                                                               *
+#  Unless required by applicable law or agreed to in writing,   *
+#  software distributed under the License is distributed on an  *
+#  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+#  KIND, either express or implied.  See the License for the    *
+#  specific language governing permissions and limitations      *
+#  under the License.                                           *
+
+# The backing volume can be anything you want, it just needs to be `ReadWriteOnce`
+# I'm using hostPath since minikube is nice for testing, but any (non-local) volume will work on a real cluster
+kind: PersistentVolume
+apiVersion: v1
+metadata:
+  name: airflow-dags
+  labels:
+    type: local
+spec:
+  capacity:
+    storage: 10Gi
+  accessModes:
+    - ReadWriteOnce
+  hostPath:
+    path: "/data/airflow-dags"
+---
+kind: PersistentVolumeClaim
+apiVersion: v1
+metadata:
+  name: airflow-dags
+spec:
+  accessModes:
+    - ReadWriteOnce
+  resources:
+    requests:
+      storage: 10Gi
+---
+apiVersion: extensions/v1beta1
+kind: Deployment
+metadata:
+  name: airflow
+spec:
+  replicas: 1
+  template:
+    metadata:
+      labels:
+        name: airflow
+      annotations:
+        pod.beta.kubernetes.io/init-containers: '[
+          {
+              "name": "init",
+              "image": "{{docker_image}}",
+              "imagePullPolicy": "IfNotPresent",
+              "command": [
+                "bash", "-cx", "cd /usr/local/lib/python2.7/dist-packages/airflow && cp -R example_dags/* $AIRFLOW_HOME/dags/ && airflow initdb && alembic upgrade head"
+              ],
+              "env": [
+                {"name": "AIRFLOW__KUBERNETES__CONTAINER_IMAGE", "value": ""},
+                {"name": "AIRFLOW__KUBERNETES__DAGS_VOLUME_CLAIM", "value": "airflow-dags"},
+                {"name": "AIRFLOW__KUBERNETES__DAGS_VOLUME_SUBPATH", "value": "git"}
+              ],
+              "volumeMounts": [
+                {"name": "airflow-dags", "mountPath": "/root/airflow/dags"}
+              ]
+          }
+      ]'
+    spec:
+      containers:
+      - name: web
+        image: {{docker_image}}
+        imagePullPolicy: IfNotPresent
+        ports:
+        - name: web
+          containerPort: 8080
+        args: ["webserver"]
+        env:
+        - name: AIRFLOW_KUBE_NAMESPACE
+          valueFrom:
+            fieldRef:
+              fieldPath: metadata.namespace
+        - name: AIRFLOW__CORE__EXECUTOR
+          value: KubernetesExecutor
+        - name: AIRFLOW__KUBERNETES__CONTAINER_IMAGE
+          value: {{docker_image}}
+        - name: AIRFLOW__KUBERNETES__DELETE_WORKER_PODS
+          value: "True"
+        # either set these two confs to pull DAGs from a git repo
+        - name: AIRFLOW__KUBERNETES__GIT_REPO
+          value: https://github.com/grantnicholas/testdags.git
+        - name: AIRFLOW__KUBERNETES__GIT_BRANCH
+          value: master
+        # or set this one to load DAGs from the persistent volume claim
+        - name: AIRFLOW__KUBERNETES__DAGS_VOLUME_CLAIM
+          value: airflow-dags
+        #
+        volumeMounts:
+        - name: airflow-dags
+          mountPath: /root/airflow/dags
+        readinessProbe:
+          initialDelaySeconds: 5
+          timeoutSeconds: 5
+          periodSeconds: 5
+          httpGet:
+            path: /admin
+            port: 8080
+        livenessProbe:
+          initialDelaySeconds: 5
+          timeoutSeconds: 5
+          failureThreshold: 5
+          httpGet:
+            path: /admin
+            port: 8080
+      - name: scheduler
+        image: {{docker_image}}
+        imagePullPolicy: IfNotPresent
+        args: ["scheduler"]
+        env:
+        - name: AIRFLOW_KUBE_NAMESPACE
+          valueFrom:
+            fieldRef:
+              fieldPath: metadata.namespace
+        - name: AIRFLOW__CORE__EXECUTOR
+          value: KubernetesExecutor
+        - name: AIRFLOW__KUBERNETES__CONTAINER_IMAGE
+          value: {{docker_image}}
+        - name: AIRFLOW__KUBERNETES__DELETE_WORKER_PODS
+          value: "True"
+        # either set these two confs to pull DAGs from a git repo
+        - name: AIRFLOW__KUBERNETES__GIT_REPO
+          value: https://github.com/grantnicholas/testdags.git
+        - name: AIRFLOW__KUBERNETES__GIT_BRANCH
+          value: master
+        # or set this one to load DAGs from the persistent volume claim
+        - name: AIRFLOW__KUBERNETES__DAGS_VOLUME_CLAIM
+          value: airflow-dags
+        #
+        - name: AIRFLOW__SCHEDULER__DAG_DIR_LIST_INTERVAL
+          value: "60"
+        volumeMounts:
+        - name: airflow-dags
+          mountPath: /root/airflow/dags
+      volumes:
+      - name: airflow-dags
+        persistentVolumeClaim:
+          claimName: airflow-dags
+---
+apiVersion: v1
+kind: Service
+metadata:
+  name: airflow
+spec:
+  type: NodePort
+  ports:
+    - port: 8080
+      nodePort: 30809
+  selector:
+    name: airflow
+
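
The template relies on deploy.sh to substitute {{docker_image}}, but it can also be rendered by hand; a minimal sketch, assuming the CI image built above. Note that the DAG source is meant to be configured either via the git settings or via the volume-claim setting (see the comments in the env sections); the template shows both for illustration.

  # Render the template with a concrete image reference and apply it.
  sed "s#{{docker_image}}#airflow/ci:latest#g" \
      scripts/ci/kubernetes/kube/airflow.yaml.template > /tmp/airflow.yaml
  kubectl apply -f /tmp/airflow.yaml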

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a9d90dc9/scripts/ci/kubernetes/kube/deploy.sh
----------------------------------------------------------------------
diff --git a/scripts/ci/kubernetes/kube/deploy.sh b/scripts/ci/kubernetes/kube/deploy.sh
new file mode 100755
index 0000000..2532d83
--- /dev/null
+++ b/scripts/ci/kubernetes/kube/deploy.sh
@@ -0,0 +1,42 @@
+#  Licensed to the Apache Software Foundation (ASF) under one   *
+#  or more contributor license agreements.  See the NOTICE file *
+#  distributed with this work for additional information        *
+#  regarding copyright ownership.  The ASF licenses this file   *
+#  to you under the Apache License, Version 2.0 (the            *
+#  "License"); you may not use this file except in compliance   *
+#  with the License.  You may obtain a copy of the License at   *
+#                                                               *
+#    http://www.apache.org/licenses/LICENSE-2.0                 *
+#                                                               *
+#  Unless required by applicable law or agreed to in writing,   *
+#  software distributed under the License is distributed on an  *
+#  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+#  KIND, either express or implied.  See the License for the    *
+#  specific language governing permissions and limitations      *
+#  under the License.                                           *
+
+IMAGE=${1:-airflow/ci}
+TAG=${2:-latest}
+DIRNAME=$(cd "$(dirname "$0")"; pwd)
+
+# create a hostPath directory for postgres to store its volume data in
+sudo mkdir -p /data/postgres-airflow
+
+mkdir -p $DIRNAME/.generated
+kubectl apply -f $DIRNAME/postgres.yaml
+sed "s#{{docker_image}}#$IMAGE:$TAG#g" $DIRNAME/airflow.yaml.template > $DIRNAME/.generated/airflow.yaml && kubectl apply -f $DIRNAME/.generated/airflow.yaml
+
+
+# wait for up to 10 minutes for everything to be deployed
+for i in {1..150}
+do
+  echo "------- Running kubectl get pods -------"
+  PODS=$(kubectl get pods | awk 'NR>1 {print $0}')
+  echo "$PODS"
+  NUM_AIRFLOW_READY=$(echo "$PODS" | grep airflow | awk '{print $2}' | grep -E '([0-9])\/(\1)' | wc -l)
+  NUM_POSTGRES_READY=$(echo "$PODS" | grep postgres | awk '{print $2}' | grep -E '([0-9])\/(\1)' | wc -l)
+  if [ "$NUM_AIRFLOW_READY" == "1" ] && [ "$NUM_POSTGRES_READY" == "1" ]; then
+    break
+  fi
+  sleep 4
+done
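
Once the loop above reports both pods ready, the webserver can be smoke-tested through the NodePort declared in the airflow Service; a sketch, assuming a minikube cluster.

  # The airflow Service exposes the webserver on NodePort 30809; /admin is the same
  # path the readiness and liveness probes hit.
  curl -s "http://$(minikube ip):30809/admin" | head -n 5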

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a9d90dc9/scripts/ci/kubernetes/kube/postgres.yaml
----------------------------------------------------------------------
diff --git a/scripts/ci/kubernetes/kube/postgres.yaml b/scripts/ci/kubernetes/kube/postgres.yaml
new file mode 100644
index 0000000..79366d0
--- /dev/null
+++ b/scripts/ci/kubernetes/kube/postgres.yaml
@@ -0,0 +1,111 @@
+#  Licensed to the Apache Software Foundation (ASF) under one   *
+#  or more contributor license agreements.  See the NOTICE file *
+#  distributed with this work for additional information        *
+#  regarding copyright ownership.  The ASF licenses this file   *
+#  to you under the Apache License, Version 2.0 (the            *
+#  "License"); you may not use this file except in compliance   *
+#  with the License.  You may obtain a copy of the License at   *
+#                                                               *
+#    http://www.apache.org/licenses/LICENSE-2.0                 *
+#                                                               *
+#  Unless required by applicable law or agreed to in writing,   *
+#  software distributed under the License is distributed on an  *
+#  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+#  KIND, either express or implied.  See the License for the    *
+#  specific language governing permissions and limitations      *
+#  under the License.                                           *
+
+apiVersion: v1
+kind: PersistentVolume
+metadata:
+  name: postgres-airflow
+spec:
+  accessModes:
+    - ReadWriteOnce
+  capacity:
+    storage: 5Gi
+  hostPath:
+    path: /data/postgres-airflow
+---
+kind: PersistentVolumeClaim
+apiVersion: v1
+metadata:
+  name: postgres-airflow
+spec:
+  accessModes:
+    - ReadWriteOnce
+  resources:
+    requests:
+      storage: 5Gi
+---
+kind: Deployment
+apiVersion: extensions/v1beta1
+metadata:
+  name: postgres-airflow
+spec:
+  replicas: 1
+  template:
+    metadata:
+      labels:
+        name: postgres-airflow
+    spec:
+      restartPolicy: Always
+      containers:
+        - name: postgres
+          image: postgres
+          ports:
+            - containerPort: 5432
+              protocol: TCP
+          volumeMounts:
+            - name: dbvol
+              mountPath: /var/lib/postgresql/data/pgdata
+              subPath: pgdata
+          env:
+            - name: POSTGRES_USER
+              value: root
+            - name: POSTGRES_PASSWORD
+              value: root
+            - name: POSTGRES_DB
+              value: airflow
+            - name: PGDATA
+              value: /var/lib/postgresql/data/pgdata
+            - name: POD_IP
+              valueFrom: { fieldRef: { fieldPath: status.podIP } }
+          livenessProbe:
+            initialDelaySeconds: 60
+            timeoutSeconds: 5
+            failureThreshold: 5
+            exec:
+              command:
+              - /bin/sh
+              - -c
+              - pg_isready --host $POD_IP || if [ "$(psql -qtAc --host $POD_IP 'SELECT pg_is_in_recovery')" != "f" ]; then exit 0; else exit 1; fi
+          readinessProbe:
+            initialDelaySeconds: 5
+            timeoutSeconds: 5
+            periodSeconds: 5
+            exec:
+              command:
+              - /bin/sh
+              - -c
+              - exec pg_isready --host $POD_IP
+          resources:
+            requests:
+              memory: .5Gi
+              cpu: .5
+      volumes:
+        - name: dbvol
+          persistentVolumeClaim:
+            claimName: postgres-airflow
+---
+apiVersion: v1
+kind: Service
+metadata:
+  name: postgres-airflow
+spec:
+  clusterIP: None
+  ports:
+    - port: 5432
+      targetPort: 5432
+  selector:
+    name: postgres-airflow
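
A quick way to confirm the metadata database came up, mirroring the readiness probe above; the grep-based pod lookup is the same pattern the integration helpers below use.

  # Find the postgres pod and check that the airflow database accepts connections.
  POSTGRES_POD=$(kubectl get pods | grep '^postgres' | awk '{print $1}')
  kubectl exec "$POSTGRES_POD" -- pg_isready
  kubectl exec "$POSTGRES_POD" -- psql airflow -c 'select 1'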

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a9d90dc9/scripts/ci/kubernetes/minikube/start_minikube.sh
----------------------------------------------------------------------
diff --git a/scripts/ci/kubernetes/minikube/start_minikube.sh b/scripts/ci/kubernetes/minikube/start_minikube.sh
index be370cf..8a27d75 100755
--- a/scripts/ci/kubernetes/minikube/start_minikube.sh
+++ b/scripts/ci/kubernetes/minikube/start_minikube.sh
@@ -1,19 +1,19 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
+#  Licensed to the Apache Software Foundation (ASF) under one   *
+#  or more contributor license agreements.  See the NOTICE file *
+#  distributed with this work for additional information        *
+#  regarding copyright ownership.  The ASF licenses this file   *
+#  to you under the Apache License, Version 2.0 (the            *
+#  "License"); you may not use this file except in compliance   *
+#  with the License.  You may obtain a copy of the License at   *
+#                                                               *
+#    http://www.apache.org/licenses/LICENSE-2.0                 *
+#                                                               *
+#  Unless required by applicable law or agreed to in writing,   *
+#  software distributed under the License is distributed on an  *
+#  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+#  KIND, either express or implied.  See the License for the    *
+#  specific language governing permissions and limitations      *
+#  under the License.                                           *
 
 # Guard against a kubernetes cluster already being up
 kubectl get pods &> /dev/null
@@ -23,8 +23,8 @@ if [ $? -eq 0 ]; then
 fi
 #
 
-curl -Lo minikube https://storage.googleapis.com/minikube/releases/v0.24.1/minikube-linux-amd64 && chmod +x minikube
-curl -Lo kubectl  https://storage.googleapis.com/kubernetes-release/release/${KUBERNETES_VERSION}/bin/linux/amd64/kubectl && chmod +x kubectl
+curl -Lo minikube https://storage.googleapis.com/minikube/releases/latest/minikube-linux-amd64 && chmod +x minikube
+curl -Lo kubectl  https://storage.googleapis.com/kubernetes-release/release/v1.7.0/bin/linux/amd64/kubectl && chmod +x kubectl
 
 sudo mkdir -p /usr/local/bin
 sudo mv minikube /usr/local/bin/minikube
@@ -38,43 +38,15 @@ mkdir $HOME/.kube || true
 touch $HOME/.kube/config
 
 export KUBECONFIG=$HOME/.kube/config
-
-start_minikube(){
-  sudo -E minikube start --vm-driver=none --kubernetes-version="${KUBERNETES_VERSION}"
-
-  # this for loop waits until kubectl can access the api server that minikube has created
-  for i in {1..90} # timeout 3 minutes
-  do
-    echo "------- Running kubectl get pods -------"
-    STDERR=$(kubectl get pods  2>&1 >/dev/null)
-    if [ $? -eq 0 ]; then
-      echo $STDERR
-
-      # We do not need dynamic hostpath provisioning, so disable the default storageclass
-      sudo -E minikube addons disable default-storageclass && kubectl delete storageclasses --all
-
-      # We need to give permission to watch pods to the airflow scheduler.
-      # The easiest way to do that is by giving admin access to the default serviceaccount (NOT SAFE!)
-      kubectl create clusterrolebinding add-on-cluster-admin   --clusterrole=cluster-admin   --serviceaccount=default:default
-      exit 0
-    fi
-    echo $STDERR
-    sleep 2
-  done
-}
-
-cleanup_minikube(){
-  sudo -E minikube stop
-  sudo -E minikube delete
-  docker stop $(docker ps -a -q) || true
-  docker rm $(docker ps -a -q) || true
-  sleep 1
-}
-
-start_minikube
-echo "Minikube cluster creation timedout. Attempting to restart the minikube cluster."
-cleanup_minikube
-start_minikube
-echo "Minikube cluster creation timedout a second time. Failing."
-
-exit 1
+sudo -E minikube start --vm-driver=none
+
+# this for loop waits until kubectl can access the api server that minikube has created
+for i in {1..150} # timeout for 5 minutes
+do
+  echo "------- Running kubectl get pods -------"
+  kubectl get po &> /dev/null
+  if [ $? -ne 1 ]; then
+    break
+  fi
+  sleep 2
+done

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a9d90dc9/scripts/ci/kubernetes/setup_kubernetes.sh
----------------------------------------------------------------------
diff --git a/scripts/ci/kubernetes/setup_kubernetes.sh b/scripts/ci/kubernetes/setup_kubernetes.sh
index fa4e523..ea559a0 100755
--- a/scripts/ci/kubernetes/setup_kubernetes.sh
+++ b/scripts/ci/kubernetes/setup_kubernetes.sh
@@ -24,5 +24,7 @@ echo "For development, start minikube yourself (ie: minikube start) then run thi
 DIRNAME=$(cd "$(dirname "$0")"; pwd)
 
 $DIRNAME/minikube/start_minikube.sh
+$DIRNAME/docker/build.sh
+$DIRNAME/kube/deploy.sh
 
 echo "Airflow environment on kubernetes is good to go!"

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a9d90dc9/scripts/ci/run_tests.sh
----------------------------------------------------------------------
diff --git a/scripts/ci/run_tests.sh b/scripts/ci/run_tests.sh
index 9542305..d5e7655 100755
--- a/scripts/ci/run_tests.sh
+++ b/scripts/ci/run_tests.sh
@@ -43,11 +43,5 @@ if [ "${TRAVIS}" ]; then
     kinit -kt ${KRB5_KTNAME} airflow
 fi
 
-if [[ "$RUN_FLAKE8" == "true" ]]; then
-    ./flake8_diff.sh
-fi
-
-if [[ "$SKIP_TESTS" != "true" ]]; then
-    echo Backend: $AIRFLOW__CORE__SQL_ALCHEMY_CONN
-    ./run_unit_tests.sh $@
-fi
+echo Backend: $AIRFLOW__CORE__SQL_ALCHEMY_CONN
+./run_unit_tests.sh "$@"

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a9d90dc9/scripts/ci/travis_script.sh
----------------------------------------------------------------------
diff --git a/scripts/ci/travis_script.sh b/scripts/ci/travis_script.sh
index 8766e94..5b2a198 100755
--- a/scripts/ci/travis_script.sh
+++ b/scripts/ci/travis_script.sh
@@ -21,12 +21,12 @@ DIRNAME=$(cd "$(dirname "$0")"; pwd)
 AIRFLOW_ROOT="$DIRNAME/../.."
 cd $AIRFLOW_ROOT && pip --version && ls -l $HOME/.wheelhouse && tox --version
 
-if [ -z "$KUBERNETES_VERSION" ];
+if [ -z "$RUN_KUBE_INTEGRATION" ];
 then
   tox -e $TOX_ENV
 else
-  KUBERNETES_VERSION=${KUBERNETES_VERSION} $DIRNAME/kubernetes/setup_kubernetes.sh && \
-  tox -e $TOX_ENV -- tests.contrib.minikube_tests \
+  $DIRNAME/kubernetes/setup_kubernetes.sh && \
+  tox -e $TOX_ENV -- tests.contrib.executors.integration \
                      --with-coverage \
                      --cover-erase \
                      --cover-html \

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a9d90dc9/tests/contrib/__init__.py
----------------------------------------------------------------------
diff --git a/tests/contrib/__init__.py b/tests/contrib/__init__.py
index 008677e..50b2e1d 100644
--- a/tests/contrib/__init__.py
+++ b/tests/contrib/__init__.py
@@ -20,4 +20,3 @@
 from __future__ import absolute_import
 from .operators import *
 from .sensors import *
-from .kubernetes import *

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a9d90dc9/tests/contrib/executors/__init__.py
----------------------------------------------------------------------
diff --git a/tests/contrib/executors/__init__.py b/tests/contrib/executors/__init__.py
new file mode 100644
index 0000000..9d7677a
--- /dev/null
+++ b/tests/contrib/executors/__init__.py
@@ -0,0 +1,13 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a9d90dc9/tests/contrib/executors/integration/__init__.py
----------------------------------------------------------------------
diff --git a/tests/contrib/executors/integration/__init__.py b/tests/contrib/executors/integration/__init__.py
new file mode 100644
index 0000000..9d7677a
--- /dev/null
+++ b/tests/contrib/executors/integration/__init__.py
@@ -0,0 +1,13 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a9d90dc9/tests/contrib/executors/integration/airflow_controller.py
----------------------------------------------------------------------
diff --git a/tests/contrib/executors/integration/airflow_controller.py b/tests/contrib/executors/integration/airflow_controller.py
new file mode 100644
index 0000000..499adb4
--- /dev/null
+++ b/tests/contrib/executors/integration/airflow_controller.py
@@ -0,0 +1,114 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import subprocess
+import time
+
+
+class RunCommandError(Exception):
+    pass
+
+
+class TimeoutError(Exception):
+    pass
+
+
+class DagRunState:
+    SUCCESS = "success"
+    FAILED = "failed"
+    RUNNING = "running"
+
+
+def run_command(command):
+    process = subprocess.Popen(command, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
+    stdout, stderr = process.communicate()
+    if process.returncode != 0:
+        raise RunCommandError("Error while running command: {}; Stdout: {}; Stderr: {}".format(
+            command, stdout, stderr
+        ))
+    return stdout, stderr
+
+
+def run_command_in_pod(pod_name, container_name, command):
+    return run_command("kubectl exec {pod_name} -c {container_name} -- {command}".format(
+        pod_name=pod_name, container_name=container_name, command=command
+    ))
+
+def _unpause_dag(dag_id, airflow_pod=None):
+    airflow_pod = airflow_pod or _get_airflow_pod()
+    return run_command_in_pod(airflow_pod, "scheduler", "airflow unpause {dag_id}".format(dag_id=dag_id))
+
+def run_dag(dag_id, run_id, airflow_pod=None):
+    airflow_pod = airflow_pod or _get_airflow_pod()
+    _unpause_dag(dag_id, airflow_pod)
+    return run_command_in_pod(airflow_pod, "scheduler", "airflow trigger_dag {dag_id} -r {run_id}".format(
+        dag_id=dag_id, run_id=run_id
+    ))
+
+
+def _get_pod_by_grep(grep_phrase):
+    stdout, stderr = run_command("kubectl get pods | grep {grep_phrase} | awk '{{print $1}}'".format(
+        grep_phrase=grep_phrase
+    ))
+    pod_name = stdout.strip()
+    return pod_name
+
+
+def _get_airflow_pod():
+    return _get_pod_by_grep("^airflow")
+
+
+def _get_postgres_pod():
+    return _get_pod_by_grep("^postgres")
+
+
+def _parse_state(stdout):
+    end_line = "(1 row)"
+    prev_line = None
+    for line in stdout.split("\n"):
+        if end_line in line:
+            return prev_line.strip()
+        prev_line = line
+
+    raise Exception("Unknown psql output: {}".format(stdout))
+
+def get_dag_run_state(dag_id, run_id, postgres_pod=None):
+    postgres_pod = postgres_pod or _get_postgres_pod()
+    stdout, stderr = run_command_in_pod(
+        postgres_pod, "postgres",
+        """psql airflow -c "select state from dag_run where dag_id='{dag_id}' and run_id='{run_id}'" """.format(
+            dag_id=dag_id, run_id=run_id
+        )
+    )
+    return _parse_state(stdout)
+
+
+def dag_final_state(dag_id, run_id, postgres_pod=None, poll_interval=1, timeout=120):
+    postgres_pod = postgres_pod or _get_postgres_pod()
+    for _ in range(0, int(timeout / poll_interval)):
+        dag_state = get_dag_run_state(dag_id, run_id, postgres_pod)
+        if dag_state != DagRunState.RUNNING:
+            return dag_state
+        time.sleep(poll_interval)
+
+    raise TimeoutError("Timed out while waiting for DagRun with dag_id: {} run_id: {}".format(dag_id, run_id))
+
+
+def _kill_pod(pod_name):
+    return run_command("kubectl delete pod {pod_name}".format(pod_name=pod_name))
+
+
+def kill_scheduler():
+    airflow_pod = _get_pod_by_grep("^airflow")
+    return _kill_pod(airflow_pod)
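
These helpers simply shell out to kubectl; the equivalent manual commands look roughly like the following, with the dag id taken from the bundled example DAGs and the run id as an illustrative placeholder.

  # What run_dag() executes inside the scheduler container of the airflow pod.
  AIRFLOW_POD=$(kubectl get pods | grep '^airflow' | awk '{print $1}')
  kubectl exec "$AIRFLOW_POD" -c scheduler -- airflow unpause example_python_operator
  kubectl exec "$AIRFLOW_POD" -c scheduler -- airflow trigger_dag example_python_operator -r manual_run_1
  # What get_dag_run_state() asks the postgres pod.
  POSTGRES_POD=$(kubectl get pods | grep '^postgres' | awk '{print $1}')
  kubectl exec "$POSTGRES_POD" -c postgres -- psql airflow -c \
      "select state from dag_run where dag_id='example_python_operator' and run_id='manual_run_1'"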

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a9d90dc9/tests/contrib/executors/integration/test_kubernetes_executor_integration.py
----------------------------------------------------------------------
diff --git a/tests/contrib/executors/integration/test_kubernetes_executor_integration.py b/tests/contrib/executors/integration/test_kubernetes_executor_integration.py
new file mode 100644
index 0000000..709ae6a
--- /dev/null
+++ b/tests/contrib/executors/integration/test_kubernetes_executor_integration.py
@@ -0,0 +1,57 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import unittest
+import time
+from uuid import uuid4
+from tests.contrib.executors.integration.airflow_controller import (
+    run_command, RunCommandError,
+    run_dag, get_dag_run_state, dag_final_state, DagRunState,
+    kill_scheduler
+)
+
+
+try:
+    run_command("kubectl get pods")
+except RunCommandError:
+    SKIP_KUBE = True
+else:
+    SKIP_KUBE = False
+
+
+class KubernetesExecutorTest(unittest.TestCase):
+
+    @unittest.skipIf(SKIP_KUBE, 'Kubernetes integration tests are unsupported by this configuration')
+    def test_kubernetes_executor_dag_runs_successfully(self):
+        dag_id, run_id = "example_python_operator", uuid4().hex
+        run_dag(dag_id, run_id)
+        state = dag_final_state(dag_id, run_id, timeout=120)
+        self.assertEquals(state, DagRunState.SUCCESS)
+
+    @unittest.skipIf(SKIP_KUBE, 'Kubernetes integration tests are unsupported by this configuration')
+    def test_start_dag_then_kill_scheduler_then_ensure_dag_succeeds(self):
+        dag_id, run_id = "example_python_operator", uuid4().hex
+        run_dag(dag_id, run_id)
+
+        self.assertEquals(get_dag_run_state(dag_id, run_id), DagRunState.RUNNING)
+
+        time.sleep(10)
+
+        kill_scheduler()
+
+        self.assertEquals(dag_final_state(dag_id, run_id, timeout=180), DagRunState.SUCCESS)
+
+
+if __name__ == "__main__":
+    unittest.main()
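
With a cluster deployed as above, these integration tests can also be run directly; a sketch using nose, which is what the --with-coverage and --cover-html flags in travis_script.sh suggest is the underlying runner.

  # Run only the kubernetes executor integration tests against the running cluster.
  nosetests tests.contrib.executors.integration.test_kubernetes_executor_integration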

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a9d90dc9/tests/contrib/executors/test_kubernetes_executor.py
----------------------------------------------------------------------
diff --git a/tests/contrib/executors/test_kubernetes_executor.py b/tests/contrib/executors/test_kubernetes_executor.py
new file mode 100644
index 0000000..a60489e
--- /dev/null
+++ b/tests/contrib/executors/test_kubernetes_executor.py
@@ -0,0 +1,71 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import unittest
+import re
+import string
+import random
+from datetime import datetime
+
+try:
+    from airflow.contrib.executors.kubernetes_executor import AirflowKubernetesScheduler
+except ImportError:
+    AirflowKubernetesScheduler = None
+
+
+class TestAirflowKubernetesScheduler(unittest.TestCase):
+
+    def _gen_random_string(self, str_len):
+        return ''.join([random.choice(string.printable) for _ in range(str_len)])
+
+    def _cases(self):
+        cases = [
+            ("my_dag_id", "my-task-id"),
+            ("my.dag.id", "my.task.id"),
+            ("MYDAGID", "MYTASKID"),
+            ("my_dag_id", "my_task_id"),
+            ("mydagid"*200, "my_task_id"*200)
+        ]
+
+        cases.extend([
+            (self._gen_random_string(200), self._gen_random_string(200))
+            for _ in range(100)
+        ])
+
+        return cases
+
+    def _is_valid_name(self, name):
+        regex = "^[a-z0-9]([-a-z0-9]*[a-z0-9])?(\.[a-z0-9]([-a-z0-9]*[a-z0-9])?)*$"
+        return len(name) <= 253 and \
+               all(ch.lower() == ch for ch in name) and \
+               re.match(regex, name)
+
+    @unittest.skipIf(AirflowKubernetesScheduler is None, 'kubernetes python package is not installed')
+    def test_create_pod_id(self):
+        for dag_id, task_id in self._cases():
+            pod_name = AirflowKubernetesScheduler._create_pod_id(dag_id, task_id)
+            assert self._is_valid_name(pod_name)
+
+    @unittest.skipIf(AirflowKubernetesScheduler is None, "kubernetes python package is not installed")
+    def test_execution_date_serialize_deserialize(self):
+        datetime_obj = datetime.now()
+        serialized_datetime = AirflowKubernetesScheduler._datetime_to_label_safe_datestring(datetime_obj)
+        new_datetime_obj = AirflowKubernetesScheduler._label_safe_datestring_to_datetime(serialized_datetime)
+
+        assert datetime_obj == new_datetime_obj
+
+
+if __name__ == '__main__':
+    unittest.main()
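
The unit tests have no cluster dependency beyond the kubernetes python package being importable; a minimal way to run just this module.

  # Runs the pod-id and execution-date round-trip tests; they are skipped
  # automatically if the kubernetes python package is not installed.
  python -m unittest tests.contrib.executors.test_kubernetes_executor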

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a9d90dc9/tst.txt
----------------------------------------------------------------------
diff --git a/tst.txt b/tst.txt
deleted file mode 100644
index e69de29..0000000

