airflow-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (AIRFLOW-2789) Add ability to create single node cluster to DataprocClusterCreateOperator
Date Fri, 12 Oct 2018 14:48:00 GMT

    [ https://issues.apache.org/jira/browse/AIRFLOW-2789?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16647988#comment-16647988
] 

ASF GitHub Bot commented on AIRFLOW-2789:
-----------------------------------------

Fokko closed pull request #4015: [AIRFLOW-2789] Create single node DataProc cluster
URL: https://github.com/apache/incubator-airflow/pull/4015
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/airflow/contrib/operators/dataproc_operator.py b/airflow/contrib/operators/dataproc_operator.py
index 49e24a3df2..af2a211539 100644
--- a/airflow/contrib/operators/dataproc_operator.py
+++ b/airflow/contrib/operators/dataproc_operator.py
@@ -52,7 +52,8 @@ class DataprocClusterCreateOperator(BaseOperator):
     :param project_id: The ID of the google cloud project in which
         to create the cluster. (templated)
     :type project_id: str
-    :param num_workers: The # of workers to spin up
+    :param num_workers: The # of workers to spin up. If set to zero will
+        spin up cluster in a single node mode
     :type num_workers: int
     :param storage_bucket: The storage bucket to use, setting to None lets dataproc
         generate a custom one for you
@@ -186,7 +187,7 @@ def __init__(self,
         self.metadata = metadata
         self.custom_image = custom_image
         self.image_version = image_version
-        self.properties = properties
+        self.properties = properties or dict()
         self.master_machine_type = master_machine_type
         self.master_disk_type = master_disk_type
         self.master_disk_size = master_disk_size
@@ -205,10 +206,18 @@ def __init__(self,
         self.idle_delete_ttl = idle_delete_ttl
         self.auto_delete_time = auto_delete_time
         self.auto_delete_ttl = auto_delete_ttl
+        self.single_node = num_workers == 0
 
         assert not (self.custom_image and self.image_version), \
             "custom_image and image_version can't be both set"
 
+        assert (
+            not self.single_node or (
+                self.single_node and self.num_preemptible_workers == 0
+            )
+        ), "num_workers == 0 means single node mode - no preemptibles allowed"
+
+
     def _get_cluster_list_for_project(self, service):
         result = service.projects().regions().clusters().list(
             projectId=self.project_id,
@@ -351,7 +360,12 @@ def _build_cluster_data(self):
                                '{}/global/images/{}'.format(self.project_id,
                                                             self.custom_image)
             cluster_data['config']['masterConfig']['imageUri'] = custom_image_url
-            cluster_data['config']['workerConfig']['imageUri'] = custom_image_url
+            if not self.single_node:
+                cluster_data['config']['workerConfig']['imageUri'] = custom_image_url
+
+        if self.single_node:
+            self.properties["dataproc:dataproc.allow.zero.workers"] = "true"
+
         if self.properties:
             cluster_data['config']['softwareConfig']['properties'] = self.properties
         if self.idle_delete_ttl:
diff --git a/tests/contrib/operators/test_dataproc_operator.py b/tests/contrib/operators/test_dataproc_operator.py
index 60c1268ee7..fb90606ea5 100644
--- a/tests/contrib/operators/test_dataproc_operator.py
+++ b/tests/contrib/operators/test_dataproc_operator.py
@@ -298,6 +298,34 @@ def test_init_with_custom_image(self):
         self.assertEqual(cluster_data['config']['workerConfig']['imageUri'],
                          expected_custom_image_url)
 
+    def test_build_single_node_cluster(self):
+        dataproc_operator = DataprocClusterCreateOperator(
+            task_id=TASK_ID,
+            cluster_name=CLUSTER_NAME,
+            project_id=PROJECT_ID,
+            num_workers=0,
+            num_preemptible_workers=0,
+            zone=ZONE,
+            dag=self.dag
+        )
+        cluster_data = dataproc_operator._build_cluster_data()
+        self.assertEqual(
+            cluster_data['config']['softwareConfig']['properties']
+            ['dataproc:dataproc.allow.zero.workers'], "true")
+
+    def test_init_cluster_with_zero_workers_and_not_non_zero_preemtibles(self):
+        with self.assertRaises(AssertionError):
+            DataprocClusterCreateOperator(
+                task_id=TASK_ID,
+                cluster_name=CLUSTER_NAME,
+                project_id=PROJECT_ID,
+                num_workers=0,
+                num_preemptible_workers=2,
+                zone=ZONE,
+                dag=self.dag,
+                image_version=IMAGE_VERSION,
+            )
+
     def test_cluster_name_log_no_sub(self):
         with patch('airflow.contrib.operators.dataproc_operator.DataProcHook') \
                 as mock_hook:


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


> Add ability to create single node cluster to DataprocClusterCreateOperator
> --------------------------------------------------------------------------
>
>                 Key: AIRFLOW-2789
>                 URL: https://issues.apache.org/jira/browse/AIRFLOW-2789
>             Project: Apache Airflow
>          Issue Type: Improvement
>          Components: contrib, gcp, operators
>    Affects Versions: 1.10.0, 2.0.0
>            Reporter: Jarosław Śmietanka
>            Assignee: Jarosław Śmietanka
>            Priority: Minor
>             Fix For: 1.10.1
>
>
>  In GCP, it is possible to set up [Single node clusters|https://cloud.google.com/dataproc/docs/concepts/configuring-clusters/single-node-clusters].
> Without this modification, the minimal size of a cluster is 3 nodes (one master +
> two workers), so a single-node option may be very helpful when doing, for example, small-scale non-critical data
> processing or building a proof-of-concept.
> Since I already have code that does that, I volunteer to bring it to the community
> :)
> This improvement won't change many components and should not require groundbreaking
> changes to DataprocClusterCreateOperator.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Mime
View raw message