airflow-commits mailing list archives

From fo...@apache.org
Subject incubator-airflow git commit: [AIRFLOW-2313] Add TTL parameters for Dataproc
Date Thu, 03 May 2018 21:10:58 GMT
Repository: incubator-airflow
Updated Branches:
  refs/heads/master c5fa8cd41 -> b02820a7a


[AIRFLOW-2313] Add TTL parameters for Dataproc

Adds three optional parameters to
DataprocClusterCreateOperator that configure the
Cloud Dataproc Cluster Scheduled Deletion feature.

Closes #3217 from ebartkus/dataproc-ttl
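
For context, a minimal usage sketch of the new parameters (the task id, project,
cluster name, and zone below are placeholders, not from this commit, and an
existing `dag` object is assumed):

    from airflow.contrib.operators.dataproc_operator import DataprocClusterCreateOperator

    # Hedged sketch: delete the cluster after 10 idle minutes, or
    # unconditionally after 24 hours, whichever comes first.
    create_cluster = DataprocClusterCreateOperator(
        task_id='create_dataproc_cluster',   # placeholder task id
        cluster_name='ephemeral-cluster',    # placeholder cluster name
        project_id='my-gcp-project',         # placeholder project
        num_workers=2,
        zone='europe-west1-b',
        idle_delete_ttl=600,     # seconds idle before auto-delete
        auto_delete_ttl=86400,   # hard lifetime cap, in seconds
        dag=dag,                 # assumes an existing DAG object
    )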


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/b02820a7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/b02820a7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/b02820a7

Branch: refs/heads/master
Commit: b02820a7afcb5205df39c4a639f1ceeb2c9c75ee
Parents: c5fa8cd
Author: ebartkus <eba@trustpilot.com>
Authored: Thu May 3 23:10:49 2018 +0200
Committer: Fokko Driesprong <fokkodriesprong@godatadriven.com>
Committed: Thu May 3 23:10:49 2018 +0200

----------------------------------------------------------------------
 airflow/contrib/hooks/gcp_dataproc_hook.py      |  2 +-
 airflow/contrib/operators/dataproc_operator.py  | 30 ++++++++-
 .../contrib/operators/test_dataproc_operator.py | 67 ++++++++++++++++++--
 3 files changed, 93 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/b02820a7/airflow/contrib/hooks/gcp_dataproc_hook.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/hooks/gcp_dataproc_hook.py b/airflow/contrib/hooks/gcp_dataproc_hook.py
index 7d95897..7849e17 100644
--- a/airflow/contrib/hooks/gcp_dataproc_hook.py
+++ b/airflow/contrib/hooks/gcp_dataproc_hook.py
@@ -197,7 +197,7 @@ class DataProcHook(GoogleCloudBaseHook):
     def __init__(self,
                  gcp_conn_id='google_cloud_default',
                  delegate_to=None,
-                 api_version='v1'):
+                 api_version='v1beta2'):
         super(DataProcHook, self).__init__(gcp_conn_id, delegate_to)
         self.api_version = api_version
 

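The default API version moves to v1beta2 because the Scheduled Deletion
settings (the lifecycleConfig block written further down) were, at the time,
only exposed on Dataproc's v1beta2 REST API. For orientation, a hedged sketch
of how the version string is consumed; the hook's actual get_conn body may
differ, and `http_authorized` is assumed to come from
GoogleCloudBaseHook._authorize():

    from googleapiclient.discovery import build

    def get_dataproc_service(http_authorized, api_version='v1beta2'):
        # The api_version selects which Dataproc discovery document
        # (and thus which request/response schema) the client uses.
        return build('dataproc', api_version, http=http_authorized)
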
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/b02820a7/airflow/contrib/operators/dataproc_operator.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/dataproc_operator.py b/airflow/contrib/operators/dataproc_operator.py
index ad0aa09..1614720 100644
--- a/airflow/contrib/operators/dataproc_operator.py
+++ b/airflow/contrib/operators/dataproc_operator.py
@@ -32,6 +32,7 @@ from airflow.models import BaseOperator
 from airflow.utils.decorators import apply_defaults
 from airflow.version import version
 from googleapiclient.errors import HttpError
+from airflow.utils import timezone
 
 
 class DataprocClusterCreateOperator(BaseOperator):
@@ -105,6 +106,16 @@ class DataprocClusterCreateOperator(BaseOperator):
     :type service_account: string
     :param service_account_scopes: The URIs of service account scopes to be included.
     :type service_account_scopes: list[string]
+    :param idle_delete_ttl: The longest duration, in seconds, that the cluster
+        may remain idle. Once this threshold is exceeded, the cluster is
+        auto-deleted.
+    :type idle_delete_ttl: int
+    :param auto_delete_time: The time at which the cluster will be auto-deleted.
+    :type auto_delete_time: datetime
+    :param auto_delete_ttl: The lifetime of the cluster, in seconds; the
+        cluster is auto-deleted at the end of this duration.
+        (Ignored if auto_delete_time is set.)
+    :type auto_delete_ttl: int
     """
 
     template_fields = ['cluster_name', 'project_id', 'zone', 'region']
@@ -135,6 +146,9 @@ class DataprocClusterCreateOperator(BaseOperator):
                  delegate_to=None,
                  service_account=None,
                  service_account_scopes=None,
+                 idle_delete_ttl=None,
+                 auto_delete_time=None,
+                 auto_delete_ttl=None,
                  *args,
                  **kwargs):
 
@@ -163,6 +177,9 @@ class DataprocClusterCreateOperator(BaseOperator):
         self.region = region
         self.service_account = service_account
         self.service_account_scopes = service_account_scopes
+        self.idle_delete_ttl = idle_delete_ttl
+        self.auto_delete_time = auto_delete_time
+        self.auto_delete_ttl = auto_delete_ttl
 
     def _get_cluster_list_for_project(self, service):
         result = service.projects().regions().clusters().list(
@@ -261,7 +278,8 @@ class DataprocClusterCreateOperator(BaseOperator):
                     }
                 },
                 'secondaryWorkerConfig': {},
-                'softwareConfig': {}
+                'softwareConfig': {},
+                'lifecycleConfig': {}
             }
         }
         if self.num_preemptible_workers > 0:
@@ -294,6 +312,16 @@ class DataprocClusterCreateOperator(BaseOperator):
             cluster_data['config']['softwareConfig']['imageVersion'] = self.image_version
         if self.properties:
             cluster_data['config']['softwareConfig']['properties'] = self.properties
+        if self.idle_delete_ttl:
+            cluster_data['config']['lifecycleConfig']['idleDeleteTtl'] = \
+                "{}s".format(self.idle_delete_ttl)
+        if self.auto_delete_time:
+            utc_auto_delete_time = timezone.convert_to_utc(self.auto_delete_time)
+            cluster_data['config']['lifecycleConfig']['autoDeleteTime'] = \
+                utc_auto_delete_time.format('%Y-%m-%dT%H:%M:%S.%fZ', formatter='classic')
+        elif self.auto_delete_ttl:
+            cluster_data['config']['lifecycleConfig']['autoDeleteTtl'] = \
+                "{}s".format(self.auto_delete_ttl)
         if self.init_actions_uris:
             init_actions_dict = [
                 {

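Putting the operator change together: a hedged illustration, read off the test
assertions below, of the lifecycleConfig fragment that _build_cluster_data()
produces for the sample values used in the tests (idle_delete_ttl=321,
auto_delete_time=datetime(2017, 6, 7)):

    # TTLs are serialized as "<seconds>s" duration strings; the delete
    # time is formatted as an explicit UTC timestamp.
    cluster_data['config']['lifecycleConfig'] = {
        'idleDeleteTtl': '321s',                          # idle_delete_ttl=321
        'autoDeleteTime': '2017-06-07T00:00:00.000000Z',  # auto_delete_time
    }
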
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/b02820a7/tests/contrib/operators/test_dataproc_operator.py
----------------------------------------------------------------------
diff --git a/tests/contrib/operators/test_dataproc_operator.py b/tests/contrib/operators/test_dataproc_operator.py
index e8cd1e5..d039cf1 100644
--- a/tests/contrib/operators/test_dataproc_operator.py
+++ b/tests/contrib/operators/test_dataproc_operator.py
@@ -7,9 +7,9 @@
 # to you under the Apache License, Version 2.0 (the
 # "License"); you may not use this file except in compliance
 # with the License.  You may obtain a copy of the License at
-# 
+#
 #   http://www.apache.org/licenses/LICENSE-2.0
-# 
+#
 # Unless required by applicable law or agreed to in writing,
 # software distributed under the License is distributed on an
 # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -69,6 +69,9 @@ SERVICE_ACCOUNT_SCOPES = [
     'https://www.googleapis.com/auth/bigquery',
     'https://www.googleapis.com/auth/bigtable.data'
 ]
+IDLE_DELETE_TTL = 321
+AUTO_DELETE_TIME = datetime.datetime(2017, 6, 7)
+AUTO_DELETE_TTL = 654
 DEFAULT_DATE = datetime.datetime(2017, 6, 6)
 REGION = 'test-region'
 MAIN_URI = 'test-uri'
@@ -102,8 +105,11 @@ class DataprocClusterCreateOperatorTest(unittest.TestCase):
                     worker_machine_type=WORKER_MACHINE_TYPE,
                     worker_disk_size=WORKER_DISK_SIZE,
                     num_preemptible_workers=NUM_PREEMPTIBLE_WORKERS,
-                    labels = deepcopy(labels),
-                    service_account_scopes = SERVICE_ACCOUNT_SCOPES
+                    labels=deepcopy(labels),
+                    service_account_scopes=SERVICE_ACCOUNT_SCOPES,
+                    idle_delete_ttl=IDLE_DELETE_TTL,
+                    auto_delete_time=AUTO_DELETE_TIME,
+                    auto_delete_ttl=AUTO_DELETE_TTL
                 )
              )
         self.dag = DAG(
@@ -136,6 +142,9 @@ class DataprocClusterCreateOperatorTest(unittest.TestCase):
             self.assertEqual(dataproc_operator.labels, self.labels[suffix])
             self.assertEqual(dataproc_operator.service_account_scopes,
                              SERVICE_ACCOUNT_SCOPES)
+            self.assertEqual(dataproc_operator.idle_delete_ttl, IDLE_DELETE_TTL)
+            self.assertEqual(dataproc_operator.auto_delete_time, AUTO_DELETE_TIME)
+            self.assertEqual(dataproc_operator.auto_delete_ttl, AUTO_DELETE_TTL)
 
     def test_get_init_action_timeout(self):
         for suffix, dataproc_operator in enumerate(self.dataproc_operators):
@@ -160,6 +169,10 @@ class DataprocClusterCreateOperatorTest(unittest.TestCase):
                 NETWORK_URI)
             self.assertEqual(cluster_data['config']['gceClusterConfig']['tags'],
                 TAGS)
+            self.assertEqual(cluster_data['config']['lifecycleConfig']['idleDeleteTtl'],
+                             "321s")
+            self.assertEqual(cluster_data['config']['lifecycleConfig']['autoDeleteTime'],
+                             "2017-06-07T00:00:00.000000Z")
             # test whether the default airflow-version label has been properly
             # set to the dataproc operator.
             merged_labels = {}
@@ -169,6 +182,52 @@ class DataprocClusterCreateOperatorTest(unittest.TestCase):
                                      cluster_data['labels']['airflow-version']))
             self.assertEqual(cluster_data['labels'], merged_labels)
 
+    def test_build_cluster_data_with_autoDeleteTime(self):
+        dataproc_operator = DataprocClusterCreateOperator(
+            task_id=TASK_ID,
+            cluster_name=CLUSTER_NAME,
+            project_id=PROJECT_ID,
+            num_workers=NUM_WORKERS,
+            zone=ZONE,
+            dag=self.dag,
+            auto_delete_time=AUTO_DELETE_TIME,
+        )
+        cluster_data = dataproc_operator._build_cluster_data()
+        self.assertEqual(cluster_data['config']['lifecycleConfig']['autoDeleteTime'],
+                         "2017-06-07T00:00:00.000000Z")
+
+    def test_build_cluster_data_with_autoDeleteTtl(self):
+        dataproc_operator = DataprocClusterCreateOperator(
+            task_id=TASK_ID,
+            cluster_name=CLUSTER_NAME,
+            project_id=PROJECT_ID,
+            num_workers=NUM_WORKERS,
+            zone=ZONE,
+            dag=self.dag,
+            auto_delete_ttl=AUTO_DELETE_TTL,
+        )
+        cluster_data = dataproc_operator._build_cluster_data()
+        self.assertEqual(cluster_data['config']['lifecycleConfig']['autoDeleteTtl'],
+                         "654s")
+
+    def test_build_cluster_data_with_autoDeleteTime_and_autoDeleteTtl(self):
+        dataproc_operator = DataprocClusterCreateOperator(
+            task_id=TASK_ID,
+            cluster_name=CLUSTER_NAME,
+            project_id=PROJECT_ID,
+            num_workers=NUM_WORKERS,
+            zone=ZONE,
+            dag=self.dag,
+            auto_delete_time=AUTO_DELETE_TIME,
+            auto_delete_ttl=AUTO_DELETE_TTL,
+        )
+        cluster_data = dataproc_operator._build_cluster_data()
+        if 'autoDeleteTtl' in cluster_data['config']['lifecycleConfig']:
+            self.fail("If both 'auto_delete_time' and 'auto_delete_ttl' are set, " +
+                      "only `auto_delete_time` is used")
+        self.assertEqual(cluster_data['config']['lifecycleConfig']['autoDeleteTime'],
+                         "2017-06-07T00:00:00.000000Z")
+
     def test_cluster_name_log_no_sub(self):
         with patch('airflow.contrib.operators.dataproc_operator.DataProcHook') as mock_hook:
             mock_hook.return_value.get_conn = self.mock_conn

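A note on precedence, demonstrated by the last test above: when both
auto_delete_time and auto_delete_ttl are supplied, the if/elif in
_build_cluster_data() writes only autoDeleteTime and silently drops
autoDeleteTtl. A hedged sketch of failing fast at DAG-definition time instead
(the helper is ours, not part of Airflow):

    def check_ttl_args(auto_delete_time=None, auto_delete_ttl=None):
        # Guard against relying on the operator's silent
        # auto_delete_time-wins behaviour.
        if auto_delete_time is not None and auto_delete_ttl is not None:
            raise ValueError(
                "Set only one of auto_delete_time / auto_delete_ttl; "
                "the operator ignores auto_delete_ttl when both are set.")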
