airflow-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From fo...@apache.org
Subject incubator-airflow git commit: [AIRFLOW-2424] Add dagrun status endpoint and increased k8s test coverage
Date Thu, 10 May 2018 17:32:25 GMT
Repository: incubator-airflow
Updated Branches:
  refs/heads/master 74027c9a6 -> 5de22d7fa


[AIRFLOW-2424] Add dagrun status endpoint and increased k8s test coverage

[AIRFLOW-2424] Add dagrun status endpoint and
increase k8s test coverage

[AIRFLOW-2424] Added minikube fixes by @kimoonkim

[AIRFLOW-2424] modify endpoint to remove 'status'

Closes #3320 from dimberman/add-kubernetes-test


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/5de22d7f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/5de22d7f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/5de22d7f

Branch: refs/heads/master
Commit: 5de22d7fa0d8bc6b9267ea13579b5ac5f62c8bb5
Parents: 74027c9
Author: Daniel Imberman <danielryan2430@gmail.com>
Authored: Thu May 10 19:32:17 2018 +0200
Committer: Fokko Driesprong <fokkodriesprong@godatadriven.com>
Committed: Thu May 10 19:32:17 2018 +0200

----------------------------------------------------------------------
 .travis.yml                                     |   3 +
 .../common/experimental/get_dag_run_state.py    |  44 +++++
 .../contrib/kubernetes/worker_configuration.py  |   1 -
 airflow/www/api/experimental/endpoints.py       |   4 +-
 airflow/www_rbac/api/experimental/endpoints.py  |  37 ++++
 scripts/ci/kubernetes/docker/airflow-init.sh    |   2 +-
 scripts/ci/kubernetes/kube/deploy.sh            |   4 +-
 scripts/ci/kubernetes/minikube/_k8s.sh          |  69 ++++++++
 .../ci/kubernetes/minikube/start_minikube.sh    |  25 ++-
 .../minikube/test_kubernetes_executor.py        | 167 +++++++++++++++----
 .../www_rbac/api/experimental/test_endpoints.py |  42 +++++
 11 files changed, 359 insertions(+), 39 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/5de22d7f/.travis.yml
----------------------------------------------------------------------
diff --git a/.travis.yml b/.travis.yml
index 87c7066..e316256 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -101,6 +101,9 @@ before_script:
   - sudo service mysql restart
   - psql -c 'create database airflow;' -U postgres
   - export PATH=${PATH}:/tmp/hive/bin
+  # Required for K8s v1.10.x. See
+  # https://github.com/kubernetes/kubernetes/issues/61058#issuecomment-372764783
+  - sudo mount --make-shared / && sudo service docker restart
 script:
   - ./scripts/ci/travis_script.sh
 after_success:

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/5de22d7f/airflow/api/common/experimental/get_dag_run_state.py
----------------------------------------------------------------------
diff --git a/airflow/api/common/experimental/get_dag_run_state.py b/airflow/api/common/experimental/get_dag_run_state.py
new file mode 100644
index 0000000..a7bd131
--- /dev/null
+++ b/airflow/api/common/experimental/get_dag_run_state.py
@@ -0,0 +1,44 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from airflow.exceptions import AirflowException
+from airflow.models import DagBag
+
+
+def get_dag_run_state(dag_id, execution_date):
+    """Return the state of the DagRun identified by dag_id and execution_date."""
+
+    dagbag = DagBag()
+
+    # Check DAG exists.
+    if dag_id not in dagbag.dags:
+        error_message = "Dag id {} not found".format(dag_id)
+        raise AirflowException(error_message)
+
+    # Get DAG object
+    dag = dagbag.get_dag(dag_id)
+
+    # Get DagRun object and check that it exists
+    dagrun = dag.get_dagrun(execution_date=execution_date)
+    if not dagrun:
+        error_message = ('Dag Run for date {} not found in dag {}'
+                         .format(execution_date, dag_id))
+        raise AirflowException(error_message)
+
+    return {'state': dagrun.get_state()}

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/5de22d7f/airflow/contrib/kubernetes/worker_configuration.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/kubernetes/worker_configuration.py b/airflow/contrib/kubernetes/worker_configuration.py
index ac4dacf..89f0902 100644
--- a/airflow/contrib/kubernetes/worker_configuration.py
+++ b/airflow/contrib/kubernetes/worker_configuration.py
@@ -177,7 +177,6 @@ class WorkerConfiguration(LoggingMixin):
         annotations = {
             'iam.cloud.google.com/service-account': gcp_sa_key
         } if gcp_sa_key else {}
-        airflow_command = airflow_command.replace("-sd", "-i -sd")
         airflow_path = airflow_command.split('-sd')[-1]
         airflow_path = self.worker_airflow_home + airflow_path.split('/')[-1]
         airflow_command = airflow_command.split('-sd')[0] + '-sd ' + airflow_path

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/5de22d7f/airflow/www/api/experimental/endpoints.py
----------------------------------------------------------------------
diff --git a/airflow/www/api/experimental/endpoints.py b/airflow/www/api/experimental/endpoints.py
index 9cd0af8..1778000 100644
--- a/airflow/www/api/experimental/endpoints.py
+++ b/airflow/www/api/experimental/endpoints.py
@@ -7,9 +7,9 @@
 # to you under the Apache License, Version 2.0 (the
 # "License"); you may not use this file except in compliance
 # with the License.  You may obtain a copy of the License at
-# 
+#
 #   http://www.apache.org/licenses/LICENSE-2.0
-# 
+#
 # Unless required by applicable law or agreed to in writing,
 # software distributed under the License is distributed on an
 # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/5de22d7f/airflow/www_rbac/api/experimental/endpoints.py
----------------------------------------------------------------------
diff --git a/airflow/www_rbac/api/experimental/endpoints.py b/airflow/www_rbac/api/experimental/endpoints.py
index bddf0c1..693feec 100644
--- a/airflow/www_rbac/api/experimental/endpoints.py
+++ b/airflow/www_rbac/api/experimental/endpoints.py
@@ -22,6 +22,7 @@ from airflow.api.common.experimental import pool as pool_api
 from airflow.api.common.experimental import trigger_dag as trigger
 from airflow.api.common.experimental.get_task import get_task
 from airflow.api.common.experimental.get_task_instance import get_task_instance
+from airflow.api.common.experimental.get_dag_run_state import get_dag_run_state
 from airflow.exceptions import AirflowException
 from airflow.utils.log.logging_mixin import LoggingMixin
 from airflow.utils import timezone
@@ -176,6 +177,42 @@ def task_instance_info(dag_id, execution_date, task_id):
     return jsonify(fields)
 
 
+@api_experimental.route(
+    '/dags/<string:dag_id>/dag_runs/<string:execution_date>',
+    methods=['GET'])
+@requires_authentication
+def dag_run_status(dag_id, execution_date):
+    """
+    Returns a JSON with a dag_run's state.
+    The format for the execution_date is expected to be
+    "YYYY-mm-DDTHH:MM:SS", for example: "2016-11-16T11:34:15". This will
+    of course need to have been encoded for URL in the request.
+    """
+
+    # Convert string datetime into actual datetime
+    try:
+        execution_date = timezone.parse(execution_date)
+    except ValueError:
+        error_message = (
+            'Given execution date, {}, could not be identified '
+            'as a date. Example date format: 2015-11-16T14:34:15+00:00'.format(
+                execution_date))
+        _log.info(error_message)
+        response = jsonify({'error': error_message})
+        response.status_code = 400
+
+        return response
+
+    try:
+        info = get_dag_run_state(dag_id, execution_date)
+    except AirflowException as err:
+        _log.info(err)
+        response = jsonify(error="{}".format(err))
+        response.status_code = 404
+        return response
+
+    return jsonify(info)
+
 @api_experimental.route('/latest_runs', methods=['GET'])
 @requires_authentication
 def latest_dag_runs():

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/5de22d7f/scripts/ci/kubernetes/docker/airflow-init.sh
----------------------------------------------------------------------
diff --git a/scripts/ci/kubernetes/docker/airflow-init.sh b/scripts/ci/kubernetes/docker/airflow-init.sh
index dc33625..cbd1c98 100755
--- a/scripts/ci/kubernetes/docker/airflow-init.sh
+++ b/scripts/ci/kubernetes/docker/airflow-init.sh
@@ -21,4 +21,4 @@ cd /usr/local/lib/python2.7/dist-packages/airflow && \
 cp -R example_dags/* /root/airflow/dags/ && \
 airflow initdb && \
 alembic upgrade heads && \
-airflow create_user -u airflow -l airflow -f jon -e airflow@apache.org -r Admin -p airflow
+(airflow create_user -u airflow -l airflow -f jon -e airflow@apache.org -r Admin -p airflow || true)

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/5de22d7f/scripts/ci/kubernetes/kube/deploy.sh
----------------------------------------------------------------------
diff --git a/scripts/ci/kubernetes/kube/deploy.sh b/scripts/ci/kubernetes/kube/deploy.sh
index e585d87..a9a42a7 100755
--- a/scripts/ci/kubernetes/kube/deploy.sh
+++ b/scripts/ci/kubernetes/kube/deploy.sh
@@ -25,10 +25,10 @@ kubectl delete -f $DIRNAME/postgres.yaml
 kubectl delete -f $DIRNAME/airflow.yaml
 kubectl delete -f $DIRNAME/secrets.yaml
 
-kubectl apply -f $DIRNAME/postgres.yaml
-kubectl apply -f $DIRNAME/volumes.yaml
 kubectl apply -f $DIRNAME/secrets.yaml
 kubectl apply -f $DIRNAME/configmaps.yaml
+kubectl apply -f $DIRNAME/postgres.yaml
+kubectl apply -f $DIRNAME/volumes.yaml
 kubectl apply -f $DIRNAME/airflow.yaml
 
 # wait for up to 10 minutes for everything to be deployed

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/5de22d7f/scripts/ci/kubernetes/minikube/_k8s.sh
----------------------------------------------------------------------
diff --git a/scripts/ci/kubernetes/minikube/_k8s.sh b/scripts/ci/kubernetes/minikube/_k8s.sh
new file mode 100644
index 0000000..6ce9d31
--- /dev/null
+++ b/scripts/ci/kubernetes/minikube/_k8s.sh
@@ -0,0 +1,69 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# This script was based on one made by @kimoonkim for kubernetes-hdfs
+
+# Helper bash functions.
+
+# Wait for Kubernetes resources to be up and ready.
+function _wait_for_ready () {
+  local count="$1"
+  shift
+  local evidence="$1"
+  shift
+  local attempts=40
+  echo "Waiting till ready (count: $count): $@"
+  while [[ "$count" != $("$@" 2>&1 | tail -n +2 | grep -c $evidence) ]];
+  do
+    if [[ "$attempts" = "1" ]]; then
+      echo "Last run: $@"
+      "$@" || true
+      local command="$@"
+      command="${command/get/describe}"
+      $command || true
+    fi
+    ((attempts--)) || return 1
+    sleep 5
+  done
+  "$@" || true
+}
+
+# Wait for all expected number of nodes to be ready
+function k8s_all_nodes_ready () {
+  local count="$1"
+  shift
+  _wait_for_ready "$count" "-v NotReady" kubectl get nodes
+  _wait_for_ready "$count" Ready kubectl get nodes
+}
+
+function k8s_single_node_ready () {
+  k8s_all_nodes_ready 1
+}
+
+# Wait for all expected number of pods to be ready. This works only for
+# pods with up to 4 containers, as we check "1/1" to "4/4" in
+# `kubectl get pods` output.
+function k8s_all_pods_ready () {
+  local count="$1"
+  shift
+  local evidence="-e 1/1 -e 2/2 -e 3/3 -e 4/4"
+  _wait_for_ready "$count" "$evidence" kubectl get pods "$@"
+}
+
+function k8s_single_pod_ready () {
+  k8s_all_pods_ready 1 "$@"
+}

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/5de22d7f/scripts/ci/kubernetes/minikube/start_minikube.sh
----------------------------------------------------------------------
diff --git a/scripts/ci/kubernetes/minikube/start_minikube.sh b/scripts/ci/kubernetes/minikube/start_minikube.sh
index 7e50c23..5171a26 100755
--- a/scripts/ci/kubernetes/minikube/start_minikube.sh
+++ b/scripts/ci/kubernetes/minikube/start_minikube.sh
@@ -19,6 +19,7 @@
 
 #!/usr/bin/env bash
 
+
 _MY_SCRIPT="${BASH_SOURCE[0]}"
 _MY_DIR=$(cd "$(dirname "$_MY_SCRIPT")" && pwd)
 # Avoids 1.7.x because of https://github.com/kubernetes/minikube/issues/2240
@@ -26,7 +27,7 @@ _KUBERNETES_VERSION="${KUBERNETES_VERSION}"
 
 echo "setting up kubernetes ${_KUBERNETES_VERSION}"
 
-_MINIKUBE_VERSION="v0.25.2"
+_MINIKUBE_VERSION="v0.26.0"
 _HELM_VERSION=v2.8.1
 _VM_DRIVER=none
 USE_MINIKUBE_DRIVER_NONE=true
@@ -45,6 +46,8 @@ export CHANGE_MINIKUBE_NONE_USER=true
 
 cd $_MY_DIR
 
+source _k8s.sh
+
 rm -rf tmp
 mkdir -p bin tmp
 
@@ -104,3 +107,23 @@ _MINIKUBE="sudo PATH=$PATH minikube"
 $_MINIKUBE config set bootstrapper localkube
 $_MINIKUBE start --kubernetes-version=${_KUBERNETES_VERSION}  --vm-driver=none
 $_MINIKUBE update-context
+
+# Wait for Kubernetes to be up and ready.
+k8s_single_node_ready
+
+echo Minikube addons:
+$_MINIKUBE addons list
+kubectl get storageclass
+echo Showing kube-system pods
+kubectl get -n kube-system pods
+
+(k8s_single_pod_ready -n kube-system -l component=kube-addon-manager) ||
+  (_ADDON=$(kubectl get pod -n kube-system -l component=kube-addon-manager
+      --no-headers -o name| cut -d/ -f2);
+   echo Addon-manager describe:;
+   kubectl describe pod -n kube-system $_ADDON;
+   echo Addon-manager log:;
+   kubectl logs -n kube-system $_ADDON;
+   exit 1)
+k8s_single_pod_ready -n kube-system -l k8s-app=kube-dns
+k8s_single_pod_ready -n kube-system storage-provisioner

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/5de22d7f/tests/contrib/minikube/test_kubernetes_executor.py
----------------------------------------------------------------------
diff --git a/tests/contrib/minikube/test_kubernetes_executor.py b/tests/contrib/minikube/test_kubernetes_executor.py
index 9827bc8..5c4617f 100644
--- a/tests/contrib/minikube/test_kubernetes_executor.py
+++ b/tests/contrib/minikube/test_kubernetes_executor.py
@@ -18,10 +18,11 @@
 
 import unittest
 from subprocess import check_call, check_output
-
+import requests.exceptions
 import requests
 import time
 import six
+import re
 
 try:
     check_call(["kubectl", "get", "pods"])
@@ -32,65 +33,167 @@ except Exception as e:
     )
 
 
+def get_minikube_host():
+    host_ip = check_output(['minikube', 'ip'])
+    if six.PY3:
+        host_ip = host_ip.decode('UTF-8')
+    host = '{}:30809'.format(host_ip.strip())
+    return host
+
+
 class KubernetesExecutorTest(unittest.TestCase):
+    def _delete_airflow_pod(self):
+        air_pod = check_output(['kubectl', 'get', 'pods']).decode()
+        air_pod = air_pod.split('\n')
+        names = [re.compile('\s+').split(x)[0] for x in air_pod if 'airflow' in x]
+        if names:
+            check_call(['kubectl', 'delete', 'pod', names[0]])
+
+    def monitor_task(self, host, execution_date, dag_id, task_id, expected_final_state,
+                     timeout):
+        tries = 0
+        state = ''
+        max_tries = max(int(timeout / 5), 1)
+        # Poll every 5 seconds; give up after max_tries completed status checks
+        while tries < max_tries:
+            time.sleep(5)
 
-    def test_integration_run_dag(self):
-        host_ip = check_output(['minikube', 'ip'])
-        if six.PY3:
-            host_ip = host_ip.decode('UTF-8')
-        host = '{}:30809'.format(host_ip.strip())
+            # Query the current state of the task instance
+            try:
+                result = requests.get(
+                    'http://{host}/api/experimental/dags/{dag_id}/'
+                    'dag_runs/{execution_date}/tasks/{task_id}'
+                    .format(host=host,
+                            dag_id=dag_id,
+                            execution_date=execution_date,
+                            task_id=task_id)
+                )
+                self.assertEqual(result.status_code, 200, "Could not get the status")
+                result_json = result.json()
+                state = result_json['state']
+                print("Attempt {}: Current state of operator is {}".format(tries, state))
+
+                if state == expected_final_state:
+                    break
+                tries += 1
+            except requests.exceptions.ConnectionError as e:
+                check_call(["echo", "api call failed. trying again. error {}".format(e)])
+                pass
+
+        self.assertEqual(state, expected_final_state)
+
+        # Maybe check if we can retrieve the logs, but then we need to extend the API
+
+    def ensure_dag_expected_state(self, host, execution_date, dag_id,
+                                  expected_final_state,
+                                  timeout):
+        tries = 0
+        state = ''
+        max_tries = max(int(timeout / 5), 1)
+        # Poll every 5 seconds; give up after max_tries completed status checks
+        while tries < max_tries:
+            time.sleep(5)
+
+            # Query the current state of the dagrun
+            result = requests.get(
+                'http://{host}/api/experimental/dags/{dag_id}/'
+                'dag_runs/{execution_date}'
+                .format(host=host,
+                        dag_id=dag_id,
+                        execution_date=execution_date)
+            )
+            print(result)
+            self.assertEqual(result.status_code, 200, "Could not get the status")
+            result_json = result.json()
+            print(result_json)
+            state = result_json['state']
+            check_call(
+                ["echo", "Attempt {}: Current state of dag is {}".format(tries, state)])
+            print("Attempt {}: Current state of dag is {}".format(tries, state))
+
+            if state == expected_final_state:
+                break
+            tries += 1
 
-        #  Enable the dag
+        self.assertEqual(state, expected_final_state)
+
+        # Maybe check if we can retrieve the logs, but then we need to extend the API
+
+    def start_dag(self, dag_id, host):
         result = requests.get(
-            'http://{}/api/experimental/'
-            'dags/example_python_operator/paused/false'.format(host)
+            'http://{host}/api/experimental/'
+            'dags/{dag_id}/paused/false'.format(host=host, dag_id=dag_id)
         )
-        self.assertEqual(result.status_code, 200, "Could not enable DAG")
+        self.assertEqual(result.status_code, 200, "Could not enable DAG: {result}"
+                         .format(result=result.json()))
 
         # Trigger a new dagrun
         result = requests.post(
-            'http://{}/api/experimental/'
-            'dags/example_python_operator/dag_runs'.format(host),
+            'http://{host}/api/experimental/'
+            'dags/{dag_id}/dag_runs'.format(host=host, dag_id=dag_id),
             json={}
         )
-        self.assertEqual(result.status_code, 200, "Could not trigger a DAG-run")
+        self.assertEqual(result.status_code, 200, "Could not trigger a DAG-run: {result}"
+                         .format(result=result.json()))
 
         time.sleep(1)
 
         result = requests.get(
             'http://{}/api/experimental/latest_runs'.format(host)
         )
-        self.assertEqual(result.status_code, 200, "Could not get the latest DAG-run")
+        self.assertEqual(result.status_code, 200, "Could not get the latest DAG-run:"
+                                                  " {result}"
+                         .format(result=result.json()))
         result_json = result.json()
+        return result_json
+
+    def test_integration_run_dag(self):
+        host = get_minikube_host()
+
+        result_json = self.start_dag(dag_id='example_python_operator', host=host)
 
         self.assertGreater(len(result_json['items']), 0)
 
         execution_date = result_json['items'][0]['execution_date']
         print("Found the job with execution date {}".format(execution_date))
 
-        tries = 0
-        state = ''
         # Wait 100 seconds for the operator to complete
-        while tries < 20:
-            time.sleep(5)
+        self.monitor_task(host=host,
+                          execution_date=execution_date,
+                          dag_id='example_python_operator',
+                          task_id='print_the_context',
+                          expected_final_state='success', timeout=100)
 
-            # Trigger a new dagrun
-            result = requests.get(
-                'http://{}/api/experimental/dags/example_python_operator/'
-                'dag_runs/{}/tasks/print_the_context'.format(host, execution_date)
-            )
-            self.assertEqual(result.status_code, 200, "Could not get the status")
-            result_json = result.json()
-            state = result_json['state']
-            print("Attempt {}: Current state of operator is {}".format(tries, state))
+        self.ensure_dag_expected_state(host=host,
+                                       execution_date=execution_date,
+                                       dag_id='example_python_operator',
+                                       expected_final_state='success', timeout=100)
 
-            if state == 'success':
-                break
-            tries += 1
+    def test_integration_run_dag_with_scheduler_failure(self):
+        host = get_minikube_host()
 
-        self.assertEqual(state, 'success')
+        result_json = self.start_dag(dag_id='example_python_operator', host=host)
 
-        # Maybe check if we can retrieve the logs, but then we need to extend the API
+        self.assertGreater(len(result_json['items']), 0)
+
+        execution_date = result_json['items'][0]['execution_date']
+        print("Found the job with execution date {}".format(execution_date))
+
+        self._delete_airflow_pod()
+
+        time.sleep(10)  # give time for pod to restart
+
+        # Wait 100 seconds for the operator to complete
+        self.monitor_task(host=host,
+                          execution_date=execution_date,
+                          dag_id='example_python_operator',
+                          task_id='print_the_context',
+                          expected_final_state='success', timeout=100)
+
+        self.ensure_dag_expected_state(host=host,
+                                       execution_date=execution_date,
+                                       dag_id='example_python_operator',
+                                       expected_final_state='success', timeout=100)
 
 
 if __name__ == '__main__':

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/5de22d7f/tests/www_rbac/api/experimental/test_endpoints.py
----------------------------------------------------------------------
diff --git a/tests/www_rbac/api/experimental/test_endpoints.py b/tests/www_rbac/api/experimental/test_endpoints.py
index a19492e..a84d9cf 100644
--- a/tests/www_rbac/api/experimental/test_endpoints.py
+++ b/tests/www_rbac/api/experimental/test_endpoints.py
@@ -204,6 +204,48 @@ class TestApiExperimental(unittest.TestCase):
         self.assertEqual(400, response.status_code)
         self.assertIn('error', response.data.decode('utf-8'))
 
+    def test_dagrun_status(self):
+        url_template = '/api/experimental/dags/{}/dag_runs/{}'
+        dag_id = 'example_bash_operator'
+        execution_date = utcnow().replace(microsecond=0)
+        datetime_string = quote_plus(execution_date.isoformat())
+        wrong_datetime_string = quote_plus(
+            datetime(1990, 1, 1, 1, 1, 1).isoformat()
+        )
+
+        # Create DagRun
+        trigger_dag(dag_id=dag_id,
+                    run_id='test_task_instance_info_run',
+                    execution_date=execution_date)
+
+        # Test Correct execution
+        response = self.app.get(
+            url_template.format(dag_id, datetime_string)
+        )
+        self.assertEqual(200, response.status_code)
+        self.assertIn('state', response.data.decode('utf-8'))
+        self.assertNotIn('error', response.data.decode('utf-8'))
+
+        # Test error for nonexistent dag
+        response = self.app.get(
+            url_template.format('does_not_exist_dag', datetime_string),
+        )
+        self.assertEqual(404, response.status_code)
+        self.assertIn('error', response.data.decode('utf-8'))
+
+        # Test error for nonexistent dag run (wrong execution_date)
+        response = self.app.get(
+            url_template.format(dag_id, wrong_datetime_string)
+        )
+        self.assertEqual(404, response.status_code)
+        self.assertIn('error', response.data.decode('utf-8'))
+
+        # Test error for bad datetime format
+        response = self.app.get(
+            url_template.format(dag_id, 'not_a_datetime')
+        )
+        self.assertEqual(400, response.status_code)
+        self.assertIn('error', response.data.decode('utf-8'))
 
 class TestPoolApiExperimental(unittest.TestCase):
 


Mime
View raw message