airflow-commits mailing list archives

From fo...@apache.org
Subject [13/20] incubator-airflow git commit: [AIRFLOW-1314] Small cleanup to address PR comments (#24)
Date Sun, 22 Apr 2018 08:32:46 GMT
[AIRFLOW-1314] Small cleanup to address PR comments (#24)

* Small cleanup to address PR comments

* Remove use of enum

* Change back to 3.4
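
Context for "Remove use of enum": the diff below swaps an enum-based status
type for a plain class of string constants (see PodStatus in pod_launcher.py),
presumably because the stdlib enum module only ships with Python 3.4+ and this
code still targets Python 2.7. The pattern, in brief:

    class PodStatus(object):
        # Plain string constants instead of enum.Enum members; comparisons
        # stay simple string equality and work on both Python 2 and 3.
        PENDING = 'pending'
        RUNNING = 'running'
        FAILED = 'failed'
        SUCCEEDED = 'succeeded'

"Change back to 3.4" is the matching .travis.yml fix below, pointing the
Python 3.4 builder back at the py34 tox environment.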


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/309f764a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/309f764a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/309f764a

Branch: refs/heads/master
Commit: 309f764aa393a78303bd97b9fc2b985e93aac332
Parents: c0920ef
Author: Benjamin Goldberg <benjigoldberg@users.noreply.github.com>
Authored: Fri Oct 27 15:43:34 2017 -0500
Committer: Fokko Driesprong <fokkodriesprong@godatadriven.com>
Committed: Sun Apr 22 10:23:06 2018 +0200

----------------------------------------------------------------------
 .travis.yml                                     |   2 +-
 .../contrib/executors/kubernetes_executor.py    | 103 ++++++++++++-------
 airflow/contrib/kubernetes/kube_client.py       |   3 +-
 .../kubernetes_request_factory.py               |  16 +--
 .../pod_request_factory.py                      |   5 +-
 airflow/contrib/kubernetes/pod.py               |  24 ++---
 airflow/contrib/kubernetes/pod_launcher.py      |  21 ++--
 .../operators/kubernetes/pod_operator.py        |  38 +++----
 airflow/executors/base_executor.py              |   2 +-
 .../executors/test_kubernetes_executor.py       |  11 +-
 10 files changed, 131 insertions(+), 94 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/309f764a/.travis.yml
----------------------------------------------------------------------
diff --git a/.travis.yml b/.travis.yml
index ec2d44d..6fd2d50 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -86,7 +86,7 @@ matrix:
     - python: "2.7"
       env: TOX_ENV=py34-hdp-airflow_backend_postgres
     - python: "3.4"
-      env: TOX_ENV=py27-cdh-airflow_backend_postgres RUN_KUBE_INTEGRATION=true
+      env: TOX_ENV=py34-cdh-airflow_backend_postgres RUN_KUBE_INTEGRATION=true
   allow_failures:
     - env: TOX_ENV=py27-cdh-airflow_backend_postgres RUN_KUBE_INTEGRATION=true
 cache:

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/309f764a/airflow/contrib/executors/kubernetes_executor.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/executors/kubernetes_executor.py b/airflow/contrib/executors/kubernetes_executor.py
index 1e3e319..a5aa1e1 100644
--- a/airflow/contrib/executors/kubernetes_executor.py
+++ b/airflow/contrib/executors/kubernetes_executor.py
@@ -109,7 +109,8 @@ class KubeConfig:
             self.kubernetes_section, 'worker_container_tag')
         self.kube_image = '{}:{}'.format(
             self.worker_container_repository, self.worker_container_tag)
-        self.delete_worker_pods = self.safe_getboolean(self.kubernetes_section, 'delete_worker_pods', True)
+        self.delete_worker_pods = self.safe_getboolean(
+            self.kubernetes_section, 'delete_worker_pods', True)
 
         self.worker_service_account_name = self.safe_get(
             self.kubernetes_section, 'worker_service_account_name', 'default')
@@ -132,7 +133,8 @@ class KubeConfig:
         self.dags_volume_claim = self.safe_get(self.kubernetes_section, 'dags_volume_claim', None)
 
         # This prop may optionally be set for PV Claims and is used to locate DAGs on a SubPath
-        self.dags_volume_subpath = self.safe_get(self.kubernetes_section, 'dags_volume_subpath', None)
+        self.dags_volume_subpath = self.safe_get(
+            self.kubernetes_section, 'dags_volume_subpath', None)
 
         # The Kubernetes Namespace in which the Scheduler and Webserver reside. Note that if your
         # cluster has RBAC enabled, your scheduler may need service account permissions to
@@ -143,7 +145,8 @@ class KubeConfig:
         # interact with cluster components.
         self.executor_namespace = self.safe_get(self.kubernetes_section, 'namespace', 'default')
         # Task secrets managed by KubernetesExecutor.
-        self.gcp_service_account_keys = self.safe_get(self.kubernetes_section, 'gcp_service_account_keys', None)
+        self.gcp_service_account_keys = self.safe_get(
+            self.kubernetes_section, 'gcp_service_account_keys', None)
 
         # If the user is using the git-sync container to clone their repository via git,
         # allow them to specify repository, tag, and pod name for the init container.
@@ -167,10 +170,8 @@ class KubeConfig:
     def _validate(self):
         if not self.dags_volume_claim and (not self.git_repo or not self.git_branch):
             raise AirflowConfigException(
-                "In kubernetes mode you must set the following configs in the `kubernetes`
section: "
-                "`dags_volume_claim` or "
-                "`git_repo and git_branch` "
-            )
+                "In kubernetes mode the following must be set in the `kubernetes` config
section: "
+                "`dags_volume_claim` or `git_repo and git_branch` ")
 
 
 class KubernetesJobWatcher(multiprocessing.Process, LoggingMixin, object):
@@ -193,7 +194,9 @@ class KubernetesJobWatcher(multiprocessing.Process, LoggingMixin, object):
                               "last resource_version: {}".format(self.resource_version))
 
     def _run(self, kube_client, resource_version):
-        self.log.info("Event: and now my watch begins starting at resource_version: {}".format(resource_version))
+        self.log.info(
+            "Event: and now my watch begins starting at resource_version: {}"
+            .format(resource_version))
         watcher = watch.Watch()
 
         kwargs = {"label_selector": "airflow-slave"}
@@ -203,9 +206,11 @@ class KubernetesJobWatcher(multiprocessing.Process, LoggingMixin, object):
         last_resource_version = None
         for event in watcher.stream(kube_client.list_namespaced_pod, self.namespace, **kwargs):
             task = event['object']
-            self.log.info("Event: {} had an event of type {}".format(task.metadata.name,
event['type']))
+            self.log.info(
+                "Event: {} had an event of type {}".format(task.metadata.name, event['type']))
             self.process_status(
-                task.metadata.name, task.status.phase, task.metadata.labels, task.metadata.resource_version
+                task.metadata.name, task.status.phase, task.metadata.labels,
+                task.metadata.resource_version
             )
             last_resource_version = task.metadata.resource_version
 
@@ -224,7 +229,8 @@ class KubernetesJobWatcher(multiprocessing.Process, LoggingMixin, object):
             self.log.info("Event: {} is Running".format(pod_id))
         else:
             self.log.warn("Event: Invalid state: {} on pod: {} with labels: {} "
-                             "with resource_version: {}".format(status, pod_id, labels, resource_version))
+                          "with resource_version: {}"
+                          .format(status, pod_id, labels, resource_version))
 
 
 class AirflowKubernetesScheduler(LoggingMixin, object):
@@ -252,7 +258,9 @@ class AirflowKubernetesScheduler(LoggingMixin, object):
         if self.kube_watcher.is_alive():
             pass
         else:
-            self.log.error("Error while health checking kube watcher process. Process died
for unknown reasons")
+            self.log.error(
+                "Error while health checking kube watcher process. "
+                "Process died for unknown reasons")
             self.kube_watcher = self._make_kube_watcher()
 
     def run_next(self, next_job):
@@ -262,9 +270,6 @@ class AirflowKubernetesScheduler(LoggingMixin, object):
         It will then create a unique job-id, launch that job in the cluster,
         and store relevent info in the current_jobs map so we can track the job's
         status
-
-        :return: 
-
         """
         self.log.info('k8s: job is {}'.format(str(next_job)))
         key, command, kube_executor_config = next_job
@@ -273,7 +278,8 @@ class AirflowKubernetesScheduler(LoggingMixin, object):
         self.log.debug("k8s: launching image {}".format(self.kube_config.kube_image))
         pod = self.worker_configuration.make_pod(
             namespace=self.namespace, pod_id=self._create_pod_id(dag_id, task_id),
-            dag_id=dag_id, task_id=task_id, execution_date=self._datetime_to_label_safe_datestring(execution_date),
+            dag_id=dag_id, task_id=task_id, 
+            execution_date=self._datetime_to_label_safe_datestring(execution_date),
             airflow_command=command, kube_executor_config=kube_executor_config
         )
         # the watcher will monitor pods, so we do not block.
@@ -283,7 +289,8 @@ class AirflowKubernetesScheduler(LoggingMixin, object):
     def delete_pod(self, pod_id):
         if self.kube_config.delete_worker_pods:
             try:
-                self.kube_client.delete_namespaced_pod(pod_id, self.namespace, body=client.V1DeleteOptions())
+                self.kube_client.delete_namespaced_pod(
+                    pod_id, self.namespace, body=client.V1DeleteOptions())
             except ApiException as e:
                 if e.status != 404:
                     raise
@@ -304,7 +311,9 @@ class AirflowKubernetesScheduler(LoggingMixin, object):
 
     def process_watcher_task(self):
         pod_id, state, labels, resource_version = self.watcher_queue.get()
-        self.log.info("Attempting to finish pod; pod_id: {}; state: {}; labels: {}".format(pod_id,
state, labels))
+        self.log.info(
+            "Attempting to finish pod; pod_id: {}; state: {}; labels: {}"
+            .format(pod_id, state, labels))
         key = self._labels_to_key(labels)
         if key:
             self.log.debug("finishing job {} - {} ({})".format(key, state, pod_id))
@@ -314,10 +323,12 @@ class AirflowKubernetesScheduler(LoggingMixin, object):
     def _strip_unsafe_kubernetes_special_chars(string):
         """
         Kubernetes only supports lowercase alphanumeric characters and "-" and "." in the pod name
-        However, there are special rules about how "-" and "." can be used so let's only keep alphanumeric chars
-        see here for detail: https://kubernetes.io/docs/concepts/overview/working-with-objects/names/
-        :param string:
-        :return:
+        However, there are special rules about how "-" and "." can be used so let's only keep
+        alphanumeric chars  see here for detail:
+        https://kubernetes.io/docs/concepts/overview/working-with-objects/names/
+
+        :param string: The requested Pod name
+        :return: ``str`` Pod name stripped of any unsafe characters
         """
         return ''.join(ch.lower() for ind, ch in enumerate(string) if ch.isalnum())
 
@@ -326,10 +337,11 @@ class AirflowKubernetesScheduler(LoggingMixin, object):
         """
         Kubernetes pod names must be <= 253 chars and must pass the following regex for validation
         "^[a-z0-9]([-a-z0-9]*[a-z0-9])?(\.[a-z0-9]([-a-z0-9]*[a-z0-9])?)*$"
+
         :param safe_dag_id: a dag_id with only alphanumeric characters
         :param safe_task_id: a task_id with only alphanumeric characters
         :param random_uuid: a uuid
-        :return:
+        :return: ``str`` valid Pod name of appropriate length
         """
         MAX_POD_ID_LEN = 253
 
@@ -349,7 +361,9 @@ class AirflowKubernetesScheduler(LoggingMixin, object):
     @staticmethod
     def _label_safe_datestring_to_datetime(string):
         """
-        Kubernetes doesn't like ":" in labels, since ISO datetime format uses ":" but not "_" let's replace ":" with "_"
+        Kubernetes doesn't permit ":" in labels. ISO datetime format uses ":" but not "_", let's
+        replace ":" with "_"
+
         :param string: string
         :return: datetime.datetime object
         """
@@ -358,7 +372,8 @@ class AirflowKubernetesScheduler(LoggingMixin, object):
     @staticmethod
     def _datetime_to_label_safe_datestring(datetime_obj):
         """
-        Kubernetes doesn't like ":" in labels, since ISO datetime format uses ":" but not "_" let's replace ":" with "_"
+        Kubernetes doesn't like ":" in labels, since ISO datetime format uses ":" but not "_" let's
+        replace ":" with "_"
         :param datetime_obj: datetime.datetime object
         :return: ISO-like string representing the datetime
         """
@@ -366,7 +381,9 @@ class AirflowKubernetesScheduler(LoggingMixin, object):
 
     def _labels_to_key(self, labels):
         try:
-            return labels["dag_id"], labels["task_id"], self._label_safe_datestring_to_datetime(labels["execution_date"])
+            return (
+                labels["dag_id"], labels["task_id"],
+                self._label_safe_datestring_to_datetime(labels["execution_date"]))
         except Exception as e:
             self.log.warn("Error while converting labels to key; labels: {}; exception: {}".format(
                 labels, e
@@ -386,23 +403,32 @@ class KubernetesExecutor(BaseExecutor, LoggingMixin):
 
     def clear_not_launched_queued_tasks(self):
         """
-        If the airflow scheduler restarts with pending "Queued" tasks, the tasks may or may not have been launched
-        Thus, on starting up the scheduler let's check every "Queued" task to see if it has been launched
-            (ie: if there is a corresponding pod on kubernetes)
-        If it has been launched then do nothing, otherwise reset the state to "None" so the task will be rescheduled
-        This will not be necessary in a future version of airflow in which there is proper support for State.LAUNCHED
-        :return: None
+        If the airflow scheduler restarts with pending "Queued" tasks, the tasks may or may not
+        have been launched Thus, on starting up the scheduler let's check every "Queued" task to
+        see if it has been launched (ie: if there is a corresponding pod on kubernetes)
+
+        If it has been launched then do nothing, otherwise reset the state to "None" so the task
+        will be rescheduled
+
+        This will not be necessary in a future version of airflow in which there is proper support
+        for State.LAUNCHED
         """
-        queued_tasks = self._session.query(TaskInstance).filter(TaskInstance.state == State.QUEUED).all()
-        self.log.info("When executor started up, found {} queued task instances".format(len(queued_tasks)))
+        queued_tasks = self._session.query(
+            TaskInstance).filter(TaskInstance.state == State.QUEUED).all()
+        self.log.info(
+            "When executor started up, found {} queued task instances".format(len(queued_tasks)))
 
         for t in queued_tasks:
             kwargs = dict(label_selector="dag_id={},task_id={},execution_date={}".format(
-                t.dag_id, t.task_id, AirflowKubernetesScheduler._datetime_to_label_safe_datestring(t.execution_date)
+                t.dag_id, t.task_id,
+                AirflowKubernetesScheduler._datetime_to_label_safe_datestring(t.execution_date)
             ))
-            pod_list = self.kube_client.list_namespaced_pod(self.kube_config.kube_namespace, **kwargs)
+            pod_list = self.kube_client.list_namespaced_pod(
+                self.kube_config.kube_namespace, **kwargs)
             if len(pod_list.items) == 0:
-                self.log.info("TaskInstance: {} found in queued state but was not launched,
rescheduling".format(t))
+                self.log.info(
+                    "TaskInstance: {} found in queued state but was not launched, rescheduling"
+                    .format(t))
                 self._session.query(TaskInstance).filter(
                     TaskInstance.dag_id == t.dag_id,
                     TaskInstance.task_id == t.task_id,
@@ -473,7 +499,8 @@ class KubernetesExecutor(BaseExecutor, LoggingMixin):
             self.log.info("Changing state of {}".format(results))
             self._change_state(key, state, pod_id)
 
-        KubeResourceVersion.checkpoint_resource_version(last_resource_version, session=self._session)
+        KubeResourceVersion.checkpoint_resource_version(
+            last_resource_version, session=self._session)
 
         if not self.task_queue.empty():
             key, command, kube_executor_config = self.task_queue.get()
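
The label-safe datestring helpers above appear only as docstrings in this
hunk. A minimal sketch of the round trip they describe, assuming the
implementation simply swaps ':' for '_' as the docstrings state:

    import datetime

    def datetime_to_label_safe_datestring(dt):
        # Kubernetes labels may not contain ':', so replace it with '_'
        return dt.isoformat().replace(':', '_')

    def label_safe_datestring_to_datetime(string):
        # Undo the swap and parse the ISO-like string back into a datetime
        return datetime.datetime.strptime(
            string.replace('_', ':'), '%Y-%m-%dT%H:%M:%S')

    dt = datetime.datetime(2017, 10, 27, 15, 43, 34)
    assert label_safe_datestring_to_datetime(
        datetime_to_label_safe_datestring(dt)) == dt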

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/309f764a/airflow/contrib/kubernetes/kube_client.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/kubernetes/kube_client.py b/airflow/contrib/kubernetes/kube_client.py
index b01e14d..9603963 100644
--- a/airflow/contrib/kubernetes/kube_client.py
+++ b/airflow/contrib/kubernetes/kube_client.py
@@ -22,4 +22,5 @@ def get_kube_client(in_cluster=True):
         config.load_incluster_config()
         return client.CoreV1Api()
     else:
-        NotImplementedError("Running kubernetes jobs from not within the cluster is not supported
at this time")
+        NotImplementedError(
+            "Running kubernetes jobs from not within the cluster is not supported at this
time")

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/309f764a/airflow/contrib/kubernetes/kubernetes_request_factory/kubernetes_request_factory.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/kubernetes/kubernetes_request_factory/kubernetes_request_factory.py b/airflow/contrib/kubernetes/kubernetes_request_factory/kubernetes_request_factory.py
index 67ff15c..b5ab074 100644
--- a/airflow/contrib/kubernetes/kubernetes_request_factory/kubernetes_request_factory.py
+++ b/airflow/contrib/kubernetes/kubernetes_request_factory/kubernetes_request_factory.py
@@ -17,19 +17,18 @@ import six
 
 class KubernetesRequestFactory():
     """
-        Create requests to be sent to kube API. Extend this class
-        to talk to kubernetes and generate your specific resources.
-        This is equivalent of generating yaml files that can be used
-        by `kubectl`
+    Create requests to be sent to kube API. Extend this class to talk to kubernetes and generate
+    your specific resources. This is equivalent of generating yaml files that can be used by
+    `kubectl`
     """
     __metaclass__ = ABCMeta
 
     @abstractmethod
     def create(self, pod):
         """
-            Creates the request for kubernetes API.
+        Creates the request for kubernetes API.
 
-            :param pod: The pod object
+        :param pod: The pod object
         """
         pass
 
@@ -89,7 +88,10 @@ class KubernetesRequestFactoryHelper(object):
 
     @staticmethod
     def attach_volume_mounts(pod, req):
-        req['spec']['volumes'] = pod.volumes
+        if len(pod.volume_mounts) > 0:
+            req['spec']['containers'][0]['volumeMounts'] = (
+                req['spec']['containers'][0].get('volumeMounts', []))
+            req['spec']['containers'][0]['volumeMounts'].extend(pod.volume_mounts)
 
     @staticmethod
     def extract_name(pod, req):
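
The attach_volume_mounts fix above stops overwriting spec.volumes and instead
extends the first container's volumeMounts list. A rough sketch of the request
dict being mutated (field names follow the Kubernetes Pod spec; the example
mount is hypothetical):

    req = {'spec': {'containers': [{'name': 'base'}]}}
    volume_mounts = [{'name': 'dags', 'mountPath': '/tmp/dags'}]  # hypothetical
    container = req['spec']['containers'][0]
    # ensure the key exists, then extend rather than overwrite, so mounts
    # added by earlier factory steps survive
    container['volumeMounts'] = container.get('volumeMounts', [])
    container['volumeMounts'].extend(volume_mounts)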

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/309f764a/airflow/contrib/kubernetes/kubernetes_request_factory/pod_request_factory.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/kubernetes/kubernetes_request_factory/pod_request_factory.py b/airflow/contrib/kubernetes/kubernetes_request_factory/pod_request_factory.py
index 2b1756a..ea6b94b 100644
--- a/airflow/contrib/kubernetes/kubernetes_request_factory/pod_request_factory.py
+++ b/airflow/contrib/kubernetes/kubernetes_request_factory/pod_request_factory.py
@@ -12,13 +12,14 @@
 # See the License for the specific language governing permissions and
 
 import yaml
-from airflow.contrib.kubernetes.kubernetes_request_factory.kubernetes_request_factory import KubernetesRequestFactory
+from airflow.contrib.kubernetes.kubernetes_request_factory.kubernetes_request_factory import (
+    KubernetesRequestFactory)
 from airflow.contrib.kubernetes.pod import Pod
 
 
 class SimplePodRequestFactory(KubernetesRequestFactory):
     """
-        Request generator for a simple pod.
+    Request generator for a simple pod.
     """
     _yaml = """apiVersion: v1
 kind: Pod

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/309f764a/airflow/contrib/kubernetes/pod.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/kubernetes/pod.py b/airflow/contrib/kubernetes/pod.py
index 56a3114..1877da7 100644
--- a/airflow/contrib/kubernetes/pod.py
+++ b/airflow/contrib/kubernetes/pod.py
@@ -32,19 +32,19 @@ class Resources:
 
 class Pod:
     """
-        Represents a kubernetes pod and manages execution of a single pod.
-        :param image: The docker image
-        :type image: str
-        :param env: A dict containing the environment variables
-        :type env: dict
-        :param cmds: The command to be run on the pod
-        :type cmd: list str
-        :param secrets: Secrets to be launched to the pod
-        :type secrets: list Secret
-        :param result: The result that will be returned to the operator after
-                       successful execution of the pod
-        :type result: any
+    Represents a kubernetes pod and manages execution of a single pod.
 
+    :param image: The docker image
+    :type image: str
+    :param env: A dict containing the environment variables
+    :type env: dict
+    :param cmds: The command to be run on the pod
+    :type cmd: list str
+    :param secrets: Secrets to be launched to the pod
+    :type secrets: list Secret
+    :param result: The result that will be returned to the operator after
+                   successful execution of the pod
+    :type result: any
     """
     pod_timeout = 3600
 

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/309f764a/airflow/contrib/kubernetes/pod_launcher.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/kubernetes/pod_launcher.py b/airflow/contrib/kubernetes/pod_launcher.py
index 1fcdb10..1903060 100644
--- a/airflow/contrib/kubernetes/pod_launcher.py
+++ b/airflow/contrib/kubernetes/pod_launcher.py
@@ -11,10 +11,12 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
+
 import json
 
 from airflow.contrib.kubernetes.pod import Pod
-from airflow.contrib.kubernetes.kubernetes_request_factory.pod_request_factory import SimplePodRequestFactory
+from airflow.contrib.kubernetes.kubernetes_request_factory.pod_request_factory import (
+    SimplePodRequestFactory)
 from airflow.utils.log.logging_mixin import LoggingMixin
 from airflow.utils.state import State
 from kubernetes import watch
@@ -24,6 +26,13 @@ from kubernetes.client.rest import ApiException
 from .kube_client import get_kube_client
 
 
+class PodStatus(object):
+    PENDING = 'pending'
+    RUNNING = 'running'
+    FAILED = 'failed'
+    SUCCEEDED = 'succeeded'
+
+
 class PodLauncher(LoggingMixin):
     def __init__(self, kube_client=None):
         self.kube_req_factory = SimplePodRequestFactory()
@@ -44,7 +53,7 @@ class PodLauncher(LoggingMixin):
     def run_pod(self, pod):
         # type: (Pod) -> State
         """
-            Launches the pod synchronously and waits for completion.
+        Launches the pod synchronously and waits for completion.
         """
         resp = self.run_pod_async(pod)
         final_status = self._monitor_pod(pod)
@@ -70,15 +79,15 @@ class PodLauncher(LoggingMixin):
         return self._client.read_namespaced_pod(pod.name, pod.namespace)
 
     def process_status(self, job_id, status):
-        if status == 'Pending':
+        if status == PodStatus.PENDING:
             return State.QUEUED
-        elif status == 'Failed':
+        elif status == PodStatus.FAILED:
             self.log.info("Event: {} Failed".format(job_id))
             return State.FAILED
-        elif status == 'Succeeded':
+        elif status == PodStatus.SUCCEEDED:
             self.log.info("Event: {} Succeeded".format(job_id))
             return State.SUCCESS
-        elif status == 'Running':
+        elif status == PodStatus.RUNNING:
             return State.RUNNING
         else:
             self.log.info("Event: Invalid state {} on job {}".format(status, job_id))

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/309f764a/airflow/contrib/operators/kubernetes/pod_operator.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/kubernetes/pod_operator.py b/airflow/contrib/operators/kubernetes/pod_operator.py
index 8b7a55f..0db8c6d 100644
--- a/airflow/contrib/operators/kubernetes/pod_operator.py
+++ b/airflow/contrib/operators/kubernetes/pod_operator.py
@@ -12,8 +12,6 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-import logging
-
 from airflow.exceptions import AirflowException
 from airflow.operators.python_operator import PythonOperator
 from airflow.utils.decorators import apply_defaults
@@ -24,16 +22,16 @@ from airflow.utils.state import State
 
 class PodOperator(PythonOperator):
     """
-        Executes a pod and waits for the job to finish.
-        :param dag_run_id: The unique run ID that would be attached to the pod as a label
-        :type dag_run_id: str
-        :param pod_factory: Reference to the function that creates the pod with format:
-                            function (OpContext) => Pod
-        :type pod_factory: callable
-        :param cache_output: If set to true, the output of the pod would be saved in a
-                            cache object using md5 hash of all the pod parameters
-                            and in case of success, the cached results will be returned
-                            on consecutive calls. Only use this
+    Executes a pod and waits for the job to finish.
+
+    :param dag_run_id: The unique run ID that would be attached to the pod as a label
+    :type dag_run_id: str
+    :param pod_factory: Reference to the function that creates the pod with format:
+        function (OpContext) => Pod
+    :type pod_factory: callable
+    :param cache_output: If set to true, the output of the pod would be saved in a
+        cache object using md5 hash of all the pod parameters and in case of success, the cached
+        results will be returned on consecutive calls. Only use this
     """
     # template_fields = tuple('dag_run_id')
     ui_color = '#8da7be'
@@ -56,7 +54,6 @@ class PodOperator(PythonOperator):
             provide_context=True,
             *args,
             **kwargs)
-        self.logger = logging.getLogger(self.__class__.__name__)
         self.pod = pod
         self.dag_run_id = dag_run_id
         self.pod_launcher = PodLauncher()
@@ -92,13 +89,12 @@ class PodOperator(PythonOperator):
 
     def on_pod_success(self, context):
         """
-            Called when pod is executed successfully.
-            
-            If you want to access return values for XCOM, place values
-            in accessible file system or DB and override this function.
-            
-            :return: Returns a custom return value for pod which will
-                     be stored in xcom
-                     
+        Called when pod is executed successfully.
+
+        If you want to access return values for XCOM, place values
+        in accessible file system or DB and override this function.
+
+        :return: Returns a custom return value for pod which will
+            be stored in xcom
         """
         return self._on_pod_success_func(context=context)

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/309f764a/airflow/executors/base_executor.py
----------------------------------------------------------------------
diff --git a/airflow/executors/base_executor.py b/airflow/executors/base_executor.py
index 1ff4c21..7f00e93 100644
--- a/airflow/executors/base_executor.py
+++ b/airflow/executors/base_executor.py
@@ -53,7 +53,7 @@ class BaseExecutor(LoggingMixin):
             self.log.info("Adding to queue: %s", command)
             self.queued_tasks[key] = (command, priority, queue, task_instance)
         else:
-            self.logger.info("could not queue task {}".format(key))
+            self.log.info("could not queue task {}".format(key))
 
     def queue_task_instance(
             self,
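
A small but real fix in this file: classes that mix in LoggingMixin expose
their logger as self.log, not self.logger, so the old line would raise an
AttributeError whenever a task could not be queued. A minimal sketch of the
pattern:

    from airflow.utils.log.logging_mixin import LoggingMixin

    class MyExecutor(LoggingMixin):  # hypothetical subclass
        def reject(self, key):
            # LoggingMixin provides `log`; there is no `logger` attribute
            self.log.info("could not queue task {}".format(key))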

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/309f764a/tests/contrib/executors/test_kubernetes_executor.py
----------------------------------------------------------------------
diff --git a/tests/contrib/executors/test_kubernetes_executor.py b/tests/contrib/executors/test_kubernetes_executor.py
index a60489e..4c9728e 100644
--- a/tests/contrib/executors/test_kubernetes_executor.py
+++ b/tests/contrib/executors/test_kubernetes_executor.py
@@ -48,15 +48,16 @@ class TestAirflowKubernetesScheduler(unittest.TestCase):
 
     def _is_valid_name(self, name):
         regex = "^[a-z0-9]([-a-z0-9]*[a-z0-9])?(\.[a-z0-9]([-a-z0-9]*[a-z0-9])?)*$"
-        return len(name) <= 253 and \
-               all(ch.lower() == ch for ch in name) and \
-               re.match(regex, name)
+        return (
+            len(name) <= 253 and
+            all(ch.lower() == ch for ch in name) and
+            re.match(regex, name))
 
     @unittest.skipIf(AirflowKubernetesScheduler is None, 'kubernetes python package is not installed')
     def test_create_pod_id(self):
         for dag_id, task_id in self._cases():
             pod_name = AirflowKubernetesScheduler._create_pod_id(dag_id, task_id)
-            assert self._is_valid_name(pod_name)
+            self.assertTrue(self._is_valid_name(pod_name))
 
     @unittest.skipIf(AirflowKubernetesScheduler is None, "kubernetes python package is not
installed")
     def test_execution_date_serialize_deserialize(self):
@@ -64,7 +65,7 @@ class TestAirflowKubernetesScheduler(unittest.TestCase):
         serialized_datetime = AirflowKubernetesScheduler._datetime_to_label_safe_datestring(datetime_obj)
         new_datetime_obj = AirflowKubernetesScheduler._label_safe_datestring_to_datetime(serialized_datetime)
 
-        assert datetime_obj == new_datetime_obj
+        self.assertEquals(datetime_obj, new_datetime_obj)
 
 
 if __name__ == '__main__':
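
A minor aside on the last hunk: assertEquals is a deprecated alias of
assertEqual in unittest, so the canonical spelling would be:

    self.assertEqual(datetime_obj, new_datetime_obj)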

