airflow-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From criccom...@apache.org
Subject incubator-airflow git commit: [AIRFLOW-1343] Add Airflow default label to the dataproc operator
Date Tue, 27 Jun 2017 16:52:13 GMT
Repository: incubator-airflow
Updated Branches:
  refs/heads/master 534a0e078 -> e4b240fb7


[AIRFLOW-1343] Add Airflow default label to the dataproc operator

Closes #2396 from XiangbingJi/master


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/e4b240fb
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/e4b240fb
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/e4b240fb

Branch: refs/heads/master
Commit: e4b240fb72ed0e96ad742947efaec3216f597630
Parents: 534a0e0
Author: XiangbingJi <xiangbing@google.com>
Authored: Tue Jun 27 09:51:53 2017 -0700
Committer: Chris Riccomini <criccomini@apache.org>
Committed: Tue Jun 27 09:51:59 2017 -0700

----------------------------------------------------------------------
 airflow/contrib/operators/dataproc_operator.py  |  6 +-
 .../contrib/operators/test_dataproc_operator.py | 89 ++++++++++++--------
 2 files changed, 58 insertions(+), 37 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/e4b240fb/airflow/contrib/operators/dataproc_operator.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/dataproc_operator.py b/airflow/contrib/operators/dataproc_operator.py
index 14dddb0..14245c8 100644
--- a/airflow/contrib/operators/dataproc_operator.py
+++ b/airflow/contrib/operators/dataproc_operator.py
@@ -19,6 +19,7 @@ import time
 from airflow.contrib.hooks.gcp_dataproc_hook import DataProcHook
 from airflow.models import BaseOperator
 from airflow.utils.decorators import apply_defaults
+from airflow.version import version
 from googleapiclient.errors import HttpError
 
 
@@ -226,8 +227,9 @@ class DataprocClusterCreateOperator(BaseOperator):
                 },
                 'isPreemptible': True
             }
-        if self.labels:
-            cluster_data['labels'] = self.labels
+
+        cluster_data['labels'] = self.labels if self.labels else {}
+        cluster_data['labels'].update({'airflow_version': version})
         if self.storage_bucket:
             cluster_data['config']['configBucket'] = self.storage_bucket
         if self.metadata:

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/e4b240fb/tests/contrib/operators/test_dataproc_operator.py
----------------------------------------------------------------------
diff --git a/tests/contrib/operators/test_dataproc_operator.py b/tests/contrib/operators/test_dataproc_operator.py
index 4d5e84b..a441e47 100644
--- a/tests/contrib/operators/test_dataproc_operator.py
+++ b/tests/contrib/operators/test_dataproc_operator.py
@@ -16,7 +16,9 @@
 import unittest
 
 from airflow.contrib.operators.dataproc_operator import DataprocClusterCreateOperator
+from airflow.version import version
 
+from copy import deepcopy
 
 TASK_ID = 'test-dataproc-operator'
 CLUSTER_NAME = 'test-cluster-name'
@@ -30,45 +32,62 @@ MASTER_DISK_SIZE = 100
 WORKER_MACHINE_TYPE = 'n1-standard-2'
 WORKER_DISK_SIZE = 100
 NUM_PREEMPTIBLE_WORKERS = 2
-
+LABEL1 = {}
+LABEL2 = {'application':'test', 'year': 2017} 
 
 class DataprocClusterCreateOperatorTest(unittest.TestCase):
-
+    # Unitest for the DataprocClusterCreateOperator
     def setUp(self):
-        self.dataproc = DataprocClusterCreateOperator(
-            task_id=TASK_ID,
-            cluster_name=CLUSTER_NAME,
-            project_id=PROJECT_ID,
-            num_workers=NUM_WORKERS,
-            zone=ZONE,
-            storage_bucket=STORAGE_BUCKET,
-            image_version=IMAGE_VERSION,
-            master_machine_type=MASTER_MACHINE_TYPE,
-            master_disk_size=MASTER_DISK_SIZE,
-            worker_machine_type=WORKER_MACHINE_TYPE,
-            worker_disk_size=WORKER_DISK_SIZE,
-            num_preemptible_workers=NUM_PREEMPTIBLE_WORKERS)
+        # instantiate two different test cases with different labels 
+        self.labels = [LABEL1, LABEL2]
+        self.dataproc_operators = []
+        for labels in self.labels:
+             self.dataproc_operators.append(
+                DataprocClusterCreateOperator(
+                    task_id=TASK_ID,
+                    cluster_name=CLUSTER_NAME,
+                    project_id=PROJECT_ID,
+                    num_workers=NUM_WORKERS,
+                    zone=ZONE,
+                    storage_bucket=STORAGE_BUCKET,
+                    image_version=IMAGE_VERSION,
+                    master_machine_type=MASTER_MACHINE_TYPE,
+                    master_disk_size=MASTER_DISK_SIZE,
+                    worker_machine_type=WORKER_MACHINE_TYPE,
+                    worker_disk_size=WORKER_DISK_SIZE,
+                    num_preemptible_workers=NUM_PREEMPTIBLE_WORKERS,
+                    labels = deepcopy(labels)
+                )
+             )
 
     def test_init(self):
-        """Test DataFlowPythonOperator instance is properly initialized."""
-        self.assertEqual(self.dataproc.cluster_name, CLUSTER_NAME)
-        self.assertEqual(self.dataproc.project_id, PROJECT_ID)
-        self.assertEqual(self.dataproc.num_workers, NUM_WORKERS)
-        self.assertEqual(self.dataproc.zone, ZONE)
-        self.assertEqual(self.dataproc.storage_bucket, STORAGE_BUCKET)
-        self.assertEqual(self.dataproc.image_version, IMAGE_VERSION)
-        self.assertEqual(self.dataproc.master_machine_type, MASTER_MACHINE_TYPE)
-        self.assertEqual(self.dataproc.master_disk_size, MASTER_DISK_SIZE)
-        self.assertEqual(self.dataproc.worker_machine_type, WORKER_MACHINE_TYPE)
-        self.assertEqual(self.dataproc.worker_disk_size, WORKER_DISK_SIZE)
-        self.assertEqual(self.dataproc.num_preemptible_workers, NUM_PREEMPTIBLE_WORKERS)
+        """Test DataProcClusterOperator instance is properly initialized."""
+        for suffix, dataproc_operator in enumerate(self.dataproc_operators):
+            self.assertEqual(dataproc_operator.cluster_name, CLUSTER_NAME)
+            self.assertEqual(dataproc_operator.project_id, PROJECT_ID)
+            self.assertEqual(dataproc_operator.num_workers, NUM_WORKERS)
+            self.assertEqual(dataproc_operator.zone, ZONE)
+            self.assertEqual(dataproc_operator.storage_bucket, STORAGE_BUCKET)
+            self.assertEqual(dataproc_operator.image_version, IMAGE_VERSION)
+            self.assertEqual(dataproc_operator.master_machine_type, MASTER_MACHINE_TYPE)
+            self.assertEqual(dataproc_operator.master_disk_size, MASTER_DISK_SIZE)
+            self.assertEqual(dataproc_operator.worker_machine_type, WORKER_MACHINE_TYPE)
+            self.assertEqual(dataproc_operator.worker_disk_size, WORKER_DISK_SIZE)
+            self.assertEqual(dataproc_operator.num_preemptible_workers, NUM_PREEMPTIBLE_WORKERS)
+            self.assertEqual(dataproc_operator.labels, self.labels[suffix])
 
     def test_build_cluster_data(self):
-        cluster_data = self.dataproc._build_cluster_data()
-        self.assertEqual(cluster_data['clusterName'], CLUSTER_NAME)
-        self.assertEqual(cluster_data['projectId'], PROJECT_ID)
-        self.assertEqual(cluster_data['config']['softwareConfig'], {'imageVersion': IMAGE_VERSION})
-        self.assertEqual(cluster_data['config']['configBucket'], STORAGE_BUCKET)
-        self.assertEqual(cluster_data['config']['workerConfig']['numInstances'], NUM_WORKERS)
-        self.assertEqual(cluster_data['config']['secondaryWorkerConfig']['numInstances'],
-                         NUM_PREEMPTIBLE_WORKERS)
+        for suffix, dataproc_operator in enumerate(self.dataproc_operators):
+            cluster_data = dataproc_operator._build_cluster_data()
+            self.assertEqual(cluster_data['clusterName'], CLUSTER_NAME)
+            self.assertEqual(cluster_data['projectId'], PROJECT_ID)
+            self.assertEqual(cluster_data['config']['softwareConfig'], {'imageVersion': IMAGE_VERSION})
+            self.assertEqual(cluster_data['config']['configBucket'], STORAGE_BUCKET)
+            self.assertEqual(cluster_data['config']['workerConfig']['numInstances'], NUM_WORKERS)
+            self.assertEqual(cluster_data['config']['secondaryWorkerConfig']['numInstances'],
+                             NUM_PREEMPTIBLE_WORKERS)
+            # test whether the default airflow_version label has been properly set to the dataproc operator
+            merged_labels = {}
+            merged_labels.update(self.labels[suffix])
+            merged_labels.update({'airflow_version': version})
+            self.assertEqual(cluster_data['labels'], merged_labels)


Mime
View raw message