airflow-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From criccom...@apache.org
Subject incubator-airflow git commit: [AIRFLOW-1299] Support imageVersion in Google Dataproc cluster
Date Mon, 12 Jun 2017 23:59:27 GMT
Repository: incubator-airflow
Updated Branches:
  refs/heads/master 6c55a2219 -> c2b80e928


[AIRFLOW-1299] Support imageVersion in Google Dataproc cluster

Closes #2358 from yu-iskw/dataproc-image-version


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/c2b80e92
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/c2b80e92
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/c2b80e92

Branch: refs/heads/master
Commit: c2b80e928e707df4707ea52c7329b0828ae37dee
Parents: 6c55a22
Author: Yu ISHIKAWA <yuu.ishikawa@gmail.com>
Authored: Mon Jun 12 16:58:50 2017 -0700
Committer: Chris Riccomini <criccomini@apache.org>
Committed: Mon Jun 12 16:59:01 2017 -0700

----------------------------------------------------------------------
 airflow/contrib/operators/dataproc_operator.py  | 37 ++++++----
 .../contrib/operators/test_dataproc_operator.py | 74 ++++++++++++++++++++
 2 files changed, 97 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/c2b80e92/airflow/contrib/operators/dataproc_operator.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/dataproc_operator.py b/airflow/contrib/operators/dataproc_operator.py
index 24fa2e4..14dddb0 100644
--- a/airflow/contrib/operators/dataproc_operator.py
+++ b/airflow/contrib/operators/dataproc_operator.py
@@ -46,6 +46,7 @@ class DataprocClusterCreateOperator(BaseOperator):
                  storage_bucket=None,
                  init_actions_uris=None,
                  metadata=None,
+                 image_version=None,
                  properties=None,
                  master_machine_type='n1-standard-4',
                  master_disk_size=500,
@@ -81,6 +82,8 @@ class DataprocClusterCreateOperator(BaseOperator):
         :param metadata: dict of key-value google compute engine metadata entries
             to add to all instances
         :type metadata: dict
+        :param image_version: the version of software inside the Dataproc cluster
+        :type image_version: string
         :param properties: dict of properties to set on
             config files (e.g. spark-defaults.conf), see
             https://cloud.google.com/dataproc/docs/reference/rest/v1/ \
@@ -118,6 +121,7 @@ class DataprocClusterCreateOperator(BaseOperator):
         self.storage_bucket = storage_bucket
         self.init_actions_uris = init_actions_uris
         self.metadata = metadata
+        self.image_version = image_version
         self.properties = properties
         self.master_machine_type = master_machine_type
         self.master_disk_size = master_disk_size
@@ -175,20 +179,7 @@ class DataprocClusterCreateOperator(BaseOperator):
                     return
                 time.sleep(15)
 
-    def execute(self, context):
-        hook = DataProcHook(
-            gcp_conn_id=self.google_cloud_conn_id,
-            delegate_to=self.delegate_to
-        )
-        service = hook.get_conn()
-
-        if self._get_cluster(service):
-            logging.info('Cluster {} already exists... Checking status...'.format(
-                            self.cluster_name
-                        ))
-            self._wait_for_done(service)
-            return True
-
+    def _build_cluster_data(self):
         zone_uri = \
             'https://www.googleapis.com/compute/v1/projects/{}/zones/{}'.format(
                 self.project_id, self.zone
@@ -241,6 +232,8 @@ class DataprocClusterCreateOperator(BaseOperator):
             cluster_data['config']['configBucket'] = self.storage_bucket
         if self.metadata:
             cluster_data['config']['gceClusterConfig']['metadata'] = self.metadata
+        if self.image_version:
+            cluster_data['config']['softwareConfig']['imageVersion'] = self.image_version
         if self.properties:
             cluster_data['config']['softwareConfig']['properties'] = self.properties
         if self.init_actions_uris:
@@ -248,7 +241,23 @@ class DataprocClusterCreateOperator(BaseOperator):
                 {'executableFile': uri} for uri in self.init_actions_uris
             ]
             cluster_data['config']['initializationActions'] = init_actions_dict
+        return cluster_data
+
+    def execute(self, context):
+        hook = DataProcHook(
+            gcp_conn_id=self.google_cloud_conn_id,
+            delegate_to=self.delegate_to
+        )
+        service = hook.get_conn()
+
+        if self._get_cluster(service):
+            logging.info('Cluster {} already exists... Checking status...'.format(
+                            self.cluster_name
+                        ))
+            self._wait_for_done(service)
+            return True
 
+        cluster_data = self._build_cluster_data()
         try:
             service.projects().regions().clusters().create(
                 projectId=self.project_id,

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/c2b80e92/tests/contrib/operators/test_dataproc_operator.py
----------------------------------------------------------------------
diff --git a/tests/contrib/operators/test_dataproc_operator.py b/tests/contrib/operators/test_dataproc_operator.py
new file mode 100644
index 0000000..4d5e84b
--- /dev/null
+++ b/tests/contrib/operators/test_dataproc_operator.py
@@ -0,0 +1,74 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import unittest
+
+from airflow.contrib.operators.dataproc_operator import DataprocClusterCreateOperator
+
+
+TASK_ID = 'test-dataproc-operator'
+CLUSTER_NAME = 'test-cluster-name'
+PROJECT_ID = 'test-project-id'
+NUM_WORKERS = 123
+ZONE = 'us-central1-a'
+STORAGE_BUCKET = 'gs://airflow-test-bucket/'
+IMAGE_VERSION = '1.1'
+MASTER_MACHINE_TYPE = 'n1-standard-2'
+MASTER_DISK_SIZE = 100
+WORKER_MACHINE_TYPE = 'n1-standard-2'
+WORKER_DISK_SIZE = 100
+NUM_PREEMPTIBLE_WORKERS = 2
+
+
+class DataprocClusterCreateOperatorTest(unittest.TestCase):
+
+    def setUp(self):
+        self.dataproc = DataprocClusterCreateOperator(
+            task_id=TASK_ID,
+            cluster_name=CLUSTER_NAME,
+            project_id=PROJECT_ID,
+            num_workers=NUM_WORKERS,
+            zone=ZONE,
+            storage_bucket=STORAGE_BUCKET,
+            image_version=IMAGE_VERSION,
+            master_machine_type=MASTER_MACHINE_TYPE,
+            master_disk_size=MASTER_DISK_SIZE,
+            worker_machine_type=WORKER_MACHINE_TYPE,
+            worker_disk_size=WORKER_DISK_SIZE,
+            num_preemptible_workers=NUM_PREEMPTIBLE_WORKERS)
+
+    def test_init(self):
+        """Test DataprocClusterCreateOperator instance is properly initialized."""
+        self.assertEqual(self.dataproc.cluster_name, CLUSTER_NAME)
+        self.assertEqual(self.dataproc.project_id, PROJECT_ID)
+        self.assertEqual(self.dataproc.num_workers, NUM_WORKERS)
+        self.assertEqual(self.dataproc.zone, ZONE)
+        self.assertEqual(self.dataproc.storage_bucket, STORAGE_BUCKET)
+        self.assertEqual(self.dataproc.image_version, IMAGE_VERSION)
+        self.assertEqual(self.dataproc.master_machine_type, MASTER_MACHINE_TYPE)
+        self.assertEqual(self.dataproc.master_disk_size, MASTER_DISK_SIZE)
+        self.assertEqual(self.dataproc.worker_machine_type, WORKER_MACHINE_TYPE)
+        self.assertEqual(self.dataproc.worker_disk_size, WORKER_DISK_SIZE)
+        self.assertEqual(self.dataproc.num_preemptible_workers, NUM_PREEMPTIBLE_WORKERS)
+
+    def test_build_cluster_data(self):
+        cluster_data = self.dataproc._build_cluster_data()
+        self.assertEqual(cluster_data['clusterName'], CLUSTER_NAME)
+        self.assertEqual(cluster_data['projectId'], PROJECT_ID)
+        self.assertEqual(cluster_data['config']['softwareConfig'], {'imageVersion': IMAGE_VERSION})
+        self.assertEqual(cluster_data['config']['configBucket'], STORAGE_BUCKET)
+        self.assertEqual(cluster_data['config']['workerConfig']['numInstances'], NUM_WORKERS)
+        self.assertEqual(cluster_data['config']['secondaryWorkerConfig']['numInstances'],
+                         NUM_PREEMPTIBLE_WORKERS)


Mime
View raw message