airflow-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From fo...@apache.org
Subject incubator-airflow git commit: [AIRFLOW-2331] Support init action timeout on dataproc cluster create
Date Thu, 26 Apr 2018 08:00:25 GMT
Repository: incubator-airflow
Updated Branches:
  refs/heads/v1-10-test 84cfbf6a1 -> cb264e940


[AIRFLOW-2331] Support init action timeout on dataproc cluster create

Closes #3235 from piffall/master

(cherry picked from commit e44688ed090a438a20751e11cc96c72554630f1d)
Signed-off-by: Fokko Driesprong <fokkodriesprong@godatadriven.com>


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/cb264e94
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/cb264e94
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/cb264e94

Branch: refs/heads/v1-10-test
Commit: cb264e940e90a865726ea788e5fd3c2f98f4817e
Parents: 84cfbf6
Author: Cristòfol Torrens <tofol.torrens@bluekiri.com>
Authored: Thu Apr 26 09:59:51 2018 +0200
Committer: Fokko Driesprong <fokkodriesprong@godatadriven.com>
Committed: Thu Apr 26 10:00:17 2018 +0200

----------------------------------------------------------------------
 airflow/contrib/operators/dataproc_operator.py  | 29 ++++++++++++++++++--
 .../contrib/operators/test_dataproc_operator.py | 12 ++++++--
 2 files changed, 36 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/cb264e94/airflow/contrib/operators/dataproc_operator.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/dataproc_operator.py b/airflow/contrib/operators/dataproc_operator.py
index 728fae9..56ebb91 100644
--- a/airflow/contrib/operators/dataproc_operator.py
+++ b/airflow/contrib/operators/dataproc_operator.py
@@ -7,9 +7,9 @@
 # to you under the Apache License, Version 2.0 (the
 # "License"); you may not use this file except in compliance
 # with the License.  You may obtain a copy of the License at
-# 
+#
 #   http://www.apache.org/licenses/LICENSE-2.0
-# 
+#
 # Unless required by applicable law or agreed to in writing,
 # software distributed under the License is distributed on an
 # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -20,8 +20,10 @@
 
 import ntpath
 import os
+import re
 import time
 import uuid
+from datetime import timedelta
 
 from airflow.contrib.hooks.gcp_dataproc_hook import DataProcHook
 from airflow.contrib.hooks.gcs_hook import GoogleCloudStorageHook
@@ -57,6 +59,9 @@ class DataprocClusterCreateOperator(BaseOperator):
     :param init_actions_uris: List of GCS uri's containing
         dataproc initialization scripts
     :type init_actions_uris: list[string]
+    :param init_action_timeout: Amount of time executable scripts in
+        init_actions_uris have to complete
+    :type init_action_timeout: string
     :param metadata: dict of key-value google compute engine metadata entries
         to add to all instances
     :type metadata: dict
@@ -115,6 +120,7 @@ class DataprocClusterCreateOperator(BaseOperator):
                  tags=None,
                  storage_bucket=None,
                  init_actions_uris=None,
+                 init_action_timeout="10m",
                  metadata=None,
                  image_version=None,
                  properties=None,
@@ -141,6 +147,7 @@ class DataprocClusterCreateOperator(BaseOperator):
         self.num_preemptible_workers = num_preemptible_workers
         self.storage_bucket = storage_bucket
         self.init_actions_uris = init_actions_uris
+        self.init_action_timeout = init_action_timeout
         self.metadata = metadata
         self.image_version = image_version
         self.properties = properties
@@ -206,6 +213,19 @@ class DataprocClusterCreateOperator(BaseOperator):
                     return
                 time.sleep(15)
 
+    def _get_init_action_timeout(self):
+        match = re.match(r"^(\d+)(s|m)$", self.init_action_timeout)
+        if match:
+            if match.group(2) == "s":
+                return self.init_action_timeout
+            elif match.group(2) == "m":
+                val = float(match.group(1))
+                return "{}s".format(timedelta(minutes=val).seconds)
+
+        raise AirflowException(
+            "DataprocClusterCreateOperator init_action_timeout"
+            " should be expressed in minutes or seconds. i.e. 10m, 30s")
+
     def _build_cluster_data(self):
         zone_uri = \
             'https://www.googleapis.com/compute/v1/projects/{}/zones/{}'.format(
@@ -276,7 +296,10 @@ class DataprocClusterCreateOperator(BaseOperator):
             cluster_data['config']['softwareConfig']['properties'] = self.properties
         if self.init_actions_uris:
             init_actions_dict = [
-                {'executableFile': uri} for uri in self.init_actions_uris
+                {
+                    'executableFile': uri,
+                    'executionTimeout': self._get_init_action_timeout()
+                } for uri in self.init_actions_uris
             ]
             cluster_data['config']['initializationActions'] = init_actions_dict
         if self.service_account:

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/cb264e94/tests/contrib/operators/test_dataproc_operator.py
----------------------------------------------------------------------
diff --git a/tests/contrib/operators/test_dataproc_operator.py b/tests/contrib/operators/test_dataproc_operator.py
index 5863b46..e8cd1e5 100644
--- a/tests/contrib/operators/test_dataproc_operator.py
+++ b/tests/contrib/operators/test_dataproc_operator.py
@@ -62,6 +62,7 @@ MASTER_DISK_SIZE = 100
 WORKER_MACHINE_TYPE = 'n1-standard-2'
 WORKER_DISK_SIZE = 100
 NUM_PREEMPTIBLE_WORKERS = 2
+GET_INIT_ACTION_TIMEOUT = "600s"  # 10m
 LABEL1 = {}
 LABEL2 = {'application': 'test', 'year': 2017}
 SERVICE_ACCOUNT_SCOPES = [
@@ -130,9 +131,16 @@ class DataprocClusterCreateOperatorTest(unittest.TestCase):
             self.assertEqual(dataproc_operator.master_disk_size, MASTER_DISK_SIZE)
             self.assertEqual(dataproc_operator.worker_machine_type, WORKER_MACHINE_TYPE)
             self.assertEqual(dataproc_operator.worker_disk_size, WORKER_DISK_SIZE)
-            self.assertEqual(dataproc_operator.num_preemptible_workers, NUM_PREEMPTIBLE_WORKERS)
+            self.assertEqual(dataproc_operator.num_preemptible_workers,
+                             NUM_PREEMPTIBLE_WORKERS)
             self.assertEqual(dataproc_operator.labels, self.labels[suffix])
-            self.assertEqual(dataproc_operator.service_account_scopes, SERVICE_ACCOUNT_SCOPES)
+            self.assertEqual(dataproc_operator.service_account_scopes,
+                             SERVICE_ACCOUNT_SCOPES)
+
+    def test_get_init_action_timeout(self):
+        for suffix, dataproc_operator in enumerate(self.dataproc_operators):
+            timeout = dataproc_operator._get_init_action_timeout()
+            self.assertEqual(timeout, "600s")
 
     def test_build_cluster_data(self):
         for suffix, dataproc_operator in enumerate(self.dataproc_operators):


Mime
View raw message