airflow-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From criccom...@apache.org
Subject incubator-airflow git commit: [AIRFLOW-1535] Add service account/scopes in dataproc
Date Wed, 30 Aug 2017 16:13:58 GMT
Repository: incubator-airflow
Updated Branches:
  refs/heads/master 5c8075526 -> b1f902e63


[AIRFLOW-1535] Add service account/scopes in dataproc

Closes #2546 from fenglu-g/master


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/b1f902e6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/b1f902e6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/b1f902e6

Branch: refs/heads/master
Commit: b1f902e63e93e8e3c275c501a13987d658dd268f
Parents: 5c80755
Author: fenglu-g <fenglu@google.com>
Authored: Wed Aug 30 09:13:43 2017 -0700
Committer: Chris Riccomini <criccomini@apache.org>
Committed: Wed Aug 30 09:13:43 2017 -0700

----------------------------------------------------------------------
 airflow/contrib/operators/dataproc_operator.py    | 14 ++++++++++++++
 tests/contrib/operators/test_dataproc_operator.py | 10 +++++++++-
 2 files changed, 23 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/b1f902e6/airflow/contrib/operators/dataproc_operator.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/dataproc_operator.py b/airflow/contrib/operators/dataproc_operator.py
index aa9b335..c0ff6a7 100644
--- a/airflow/contrib/operators/dataproc_operator.py
+++ b/airflow/contrib/operators/dataproc_operator.py
@@ -58,6 +58,8 @@ class DataprocClusterCreateOperator(BaseOperator):
                  region='global',
                  google_cloud_conn_id='google_cloud_default',
                  delegate_to=None,
+                 service_account=None,
+                 service_account_scopes=None,
                  *args,
                  **kwargs):
         """
@@ -111,6 +113,10 @@ class DataprocClusterCreateOperator(BaseOperator):
             For this to work, the service account making the request must have domain-wide
             delegation enabled.
         :type delegate_to: string
+        :param service_account: The service account of the dataproc instances.
+        :type service_account: string
+        :param service_account_scopes: The URIs of service account scopes to be included.
+        :type service_account_scopes: list[string]
         """
         super(DataprocClusterCreateOperator, self).__init__(*args, **kwargs)
         self.google_cloud_conn_id = google_cloud_conn_id
@@ -131,6 +137,8 @@ class DataprocClusterCreateOperator(BaseOperator):
         self.labels = labels
         self.zone = zone
         self.region = region
+        self.service_account = service_account
+        self.service_account_scopes = service_account_scopes
 
     def _get_cluster_list_for_project(self, service):
         result = service.projects().regions().clusters().list(
@@ -247,6 +255,12 @@ class DataprocClusterCreateOperator(BaseOperator):
                 {'executableFile': uri} for uri in self.init_actions_uris
             ]
             cluster_data['config']['initializationActions'] = init_actions_dict
+        if self.service_account:
+            cluster_data['config']['gceClusterConfig']['serviceAccount'] =\
+                    self.service_account
+        if self.service_account_scopes:
+            cluster_data['config']['gceClusterConfig']['serviceAccountScopes'] =\
+                    self.service_account_scopes
         return cluster_data
 
     def execute(self, context):

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/b1f902e6/tests/contrib/operators/test_dataproc_operator.py
----------------------------------------------------------------------
diff --git a/tests/contrib/operators/test_dataproc_operator.py b/tests/contrib/operators/test_dataproc_operator.py
index a0c6ba0..71edf58 100644
--- a/tests/contrib/operators/test_dataproc_operator.py
+++ b/tests/contrib/operators/test_dataproc_operator.py
@@ -41,6 +41,10 @@ WORKER_DISK_SIZE = 100
 NUM_PREEMPTIBLE_WORKERS = 2
 LABEL1 = {}
 LABEL2 = {'application':'test', 'year': 2017}
+SERVICE_ACCOUNT_SCOPES = [
+    'https://www.googleapis.com/auth/bigquery',
+    'https://www.googleapis.com/auth/bigtable.data'
+]
 DEFAULT_DATE = datetime.datetime(2017, 6, 6)
 
 class DataprocClusterCreateOperatorTest(unittest.TestCase):
@@ -64,7 +68,8 @@ class DataprocClusterCreateOperatorTest(unittest.TestCase):
                     worker_machine_type=WORKER_MACHINE_TYPE,
                     worker_disk_size=WORKER_DISK_SIZE,
                     num_preemptible_workers=NUM_PREEMPTIBLE_WORKERS,
-                    labels = deepcopy(labels)
+                    labels = deepcopy(labels),
+                    service_account_scopes = SERVICE_ACCOUNT_SCOPES
                 )
              )
         self.dag = DAG(
@@ -91,6 +96,7 @@ class DataprocClusterCreateOperatorTest(unittest.TestCase):
             self.assertEqual(dataproc_operator.worker_disk_size, WORKER_DISK_SIZE)
             self.assertEqual(dataproc_operator.num_preemptible_workers, NUM_PREEMPTIBLE_WORKERS)
             self.assertEqual(dataproc_operator.labels, self.labels[suffix])
+            self.assertEqual(dataproc_operator.service_account_scopes, SERVICE_ACCOUNT_SCOPES)
 
     def test_build_cluster_data(self):
         for suffix, dataproc_operator in enumerate(self.dataproc_operators):
@@ -102,6 +108,8 @@ class DataprocClusterCreateOperatorTest(unittest.TestCase):
             self.assertEqual(cluster_data['config']['workerConfig']['numInstances'], NUM_WORKERS)
             self.assertEqual(cluster_data['config']['secondaryWorkerConfig']['numInstances'],
                              NUM_PREEMPTIBLE_WORKERS)
+            self.assertEqual(cluster_data['config']['gceClusterConfig']['serviceAccountScopes'],
+                SERVICE_ACCOUNT_SCOPES)
             # test whether the default airflow-version label has been properly
             # set to the dataproc operator.
             merged_labels = {}


Mime
View raw message