airflow-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From criccom...@apache.org
Subject incubator-airflow git commit: [AIRFLOW-1504] Log dataproc cluster name
Date Tue, 15 Aug 2017 17:22:12 GMT
Repository: incubator-airflow
Updated Branches:
  refs/heads/master 42cad6069 -> 1cd6c4b0e


[AIRFLOW-1504] Log dataproc cluster name

Closes #2517 from
TrevorEdwards/dataproc_log_clustername


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/1cd6c4b0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/1cd6c4b0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/1cd6c4b0

Branch: refs/heads/master
Commit: 1cd6c4b0e8d73426be49106eaab75ace1bf4a4bf
Parents: 42cad60
Author: Trevor Edwards <trevoredwards@google.com>
Authored: Tue Aug 15 10:22:03 2017 -0700
Committer: Chris Riccomini <criccomini@apache.org>
Committed: Tue Aug 15 10:22:03 2017 -0700

----------------------------------------------------------------------
 airflow/contrib/operators/dataproc_operator.py  |   2 +
 .../contrib/operators/test_dataproc_operator.py | 109 +++++++++++++++++++
 2 files changed, 111 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/1cd6c4b0/airflow/contrib/operators/dataproc_operator.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/dataproc_operator.py b/airflow/contrib/operators/dataproc_operator.py
index 4b6b61c..aa9b335 100644
--- a/airflow/contrib/operators/dataproc_operator.py
+++ b/airflow/contrib/operators/dataproc_operator.py
@@ -250,6 +250,7 @@ class DataprocClusterCreateOperator(BaseOperator):
         return cluster_data
 
     def execute(self, context):
+        logging.info('Creating cluster: {}'.format(self.cluster_name))
         hook = DataProcHook(
             gcp_conn_id=self.google_cloud_conn_id,
             delegate_to=self.delegate_to
@@ -341,6 +342,7 @@ class DataprocClusterDeleteOperator(BaseOperator):
             time.sleep(15)
 
     def execute(self, context):
+        logging.info('Deleting cluster: {}'.format(self.cluster_name))
         hook = DataProcHook(
             gcp_conn_id=self.google_cloud_conn_id,
             delegate_to=self.delegate_to

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/1cd6c4b0/tests/contrib/operators/test_dataproc_operator.py
----------------------------------------------------------------------
diff --git a/tests/contrib/operators/test_dataproc_operator.py b/tests/contrib/operators/test_dataproc_operator.py
index 923fecc..a0c6ba0 100644
--- a/tests/contrib/operators/test_dataproc_operator.py
+++ b/tests/contrib/operators/test_dataproc_operator.py
@@ -13,14 +13,20 @@
 # limitations under the License.
 #
 
+import datetime
 import re
 import unittest
 
+from airflow import DAG
 from airflow.contrib.operators.dataproc_operator import DataprocClusterCreateOperator
+from airflow.contrib.operators.dataproc_operator import DataprocClusterDeleteOperator
 from airflow.version import version
 
 from copy import deepcopy
 
+from mock import Mock
+from mock import patch
+
 TASK_ID = 'test-dataproc-operator'
 CLUSTER_NAME = 'test-cluster-name'
 PROJECT_ID = 'test-project-id'
@@ -35,6 +41,7 @@ WORKER_DISK_SIZE = 100
 NUM_PREEMPTIBLE_WORKERS = 2
 LABEL1 = {}
 LABEL2 = {'application':'test', 'year': 2017}
+DEFAULT_DATE = datetime.datetime(2017, 6, 6)
 
 class DataprocClusterCreateOperatorTest(unittest.TestCase):
     # Unitest for the DataprocClusterCreateOperator
@@ -60,6 +67,14 @@ class DataprocClusterCreateOperatorTest(unittest.TestCase):
                     labels = deepcopy(labels)
                 )
              )
+        self.dag = DAG(
+            'test_dag',
+            default_args={
+                'owner': 'airflow',
+                'start_date': DEFAULT_DATE,
+                'end_date': DEFAULT_DATE,
+            },
+            schedule_interval='@daily')
 
     def test_init(self):
         """Test DataProcClusterOperator instance is properly initialized."""
@@ -95,3 +110,97 @@ class DataprocClusterCreateOperatorTest(unittest.TestCase):
             self.assertTrue(re.match(r'[a-z]([-a-z0-9]*[a-z0-9])?',
                                      cluster_data['labels']['airflow-version']))
             self.assertEqual(cluster_data['labels'], merged_labels)
+
+    def test_cluster_name_log_no_sub(self):
+        with patch('airflow.contrib.operators.dataproc_operator.DataProcHook') \
+            as mock_hook, patch('logging.info') as l:
+            dataproc_task = DataprocClusterCreateOperator(
+                task_id=TASK_ID,
+                cluster_name=CLUSTER_NAME,
+                project_id=PROJECT_ID,
+                num_workers=NUM_WORKERS,
+                zone=ZONE,
+                dag=self.dag
+            )
+
+            with self.assertRaises(TypeError) as _:
+                dataproc_task.execute(None)
+            l.assert_called_with(('Creating cluster: ' + CLUSTER_NAME))
+
+    def test_cluster_name_log_sub(self):
+        with patch('airflow.contrib.operators.dataproc_operator.DataProcHook') \
+            as mock_hook, patch('logging.info') as l:
+            dataproc_task = DataprocClusterCreateOperator(
+                task_id=TASK_ID,
+                cluster_name='smoke-cluster-{{ ts_nodash }}',
+                project_id=PROJECT_ID,
+                num_workers=NUM_WORKERS,
+                zone=ZONE,
+                dag=self.dag
+            )
+
+            context = { 'ts_nodash' : 'testnodash'}
+
+            rendered = dataproc_task.render_template('cluster_name', getattr(dataproc_task,'cluster_name'), context)
+            setattr(dataproc_task, 'cluster_name', rendered)
+            with self.assertRaises(TypeError) as _:
+                dataproc_task.execute(None)
+            l.assert_called_with(('Creating cluster: smoke-cluster-testnodash'))
+
+class DataprocClusterDeleteOperatorTest(unittest.TestCase):
+    # Unitest for the DataprocClusterDeleteOperator
+    def setUp(self):
+        self.mock_execute = Mock()
+        self.mock_execute.execute = Mock(return_value={'done' : True})
+        self.mock_get = Mock()
+        self.mock_get.get = Mock(return_value=self.mock_execute)
+        self.mock_operations = Mock()
+        self.mock_operations.get = Mock(return_value=self.mock_get)
+        self.mock_regions = Mock()
+        self.mock_regions.operations = Mock(return_value=self.mock_operations)
+        self.mock_projects=Mock()
+        self.mock_projects.regions = Mock(return_value=self.mock_regions)
+        self.mock_conn = Mock()
+        self.mock_conn.projects = Mock(return_value=self.mock_projects)
+        self.dag = DAG(
+            'test_dag',
+            default_args={
+                'owner': 'airflow',
+                'start_date': DEFAULT_DATE,
+                'end_date': DEFAULT_DATE,
+            },
+            schedule_interval='@daily')
+
+    def test_cluster_name_log_no_sub(self):
+        with patch('airflow.contrib.hooks.gcp_dataproc_hook.DataProcHook') \
+            as mock_hook, patch('logging.info') as l:
+            mock_hook.return_value.get_conn = self.mock_conn
+            dataproc_task = DataprocClusterDeleteOperator(
+                task_id=TASK_ID,
+                cluster_name=CLUSTER_NAME,
+                project_id=PROJECT_ID,
+                dag=self.dag
+            )
+
+            with self.assertRaises(TypeError) as _:
+                dataproc_task.execute(None)
+            l.assert_called_with(('Deleting cluster: ' + CLUSTER_NAME))
+
+    def test_cluster_name_log_sub(self):
+        with patch('airflow.contrib.operators.dataproc_operator.DataProcHook') \
+            as mock_hook, patch('logging.info') as l:
+            mock_hook.return_value.get_conn = self.mock_conn
+            dataproc_task = DataprocClusterDeleteOperator(
+                task_id=TASK_ID,
+                cluster_name='smoke-cluster-{{ ts_nodash }}',
+                project_id=PROJECT_ID,
+                dag=self.dag
+            )
+
+            context = { 'ts_nodash' : 'testnodash'}
+
+            rendered = dataproc_task.render_template('cluster_name', getattr(dataproc_task,'cluster_name'), context)
+            setattr(dataproc_task, 'cluster_name', rendered)
+            with self.assertRaises(TypeError) as _:
+                dataproc_task.execute(None)
+            l.assert_called_with(('Deleting cluster: smoke-cluster-testnodash'))


Mime
View raw message