airflow-commits mailing list archives

From: kaxiln...@apache.org
Subject: incubator-airflow git commit: [AIRFLOW-2753] Add dataproc_job_id instance var holding actual DP jobId
Date: Sun, 22 Jul 2018 20:54:28 GMT
Repository: incubator-airflow
Updated Branches:
  refs/heads/master c1217c349 -> 03ac60dd6


[AIRFLOW-2753] Add dataproc_job_id instance var holding actual DP jobId

Closes #3622 from jeffkpayne/master


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/03ac60dd
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/03ac60dd
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/03ac60dd

Branch: refs/heads/master
Commit: 03ac60dd67388951f033f4e64bbe55d62cd6257f
Parents: c1217c3
Author: Jeffrey Scott Keone Payne <jpayne@bombora.com>
Authored: Sun Jul 22 21:54:21 2018 +0100
Committer: Kaxil Naik <kaxilnaik@apache.org>
Committed: Sun Jul 22 21:54:21 2018 +0100

----------------------------------------------------------------------
 airflow/contrib/operators/dataproc_operator.py  | 61 +++++++++++++--
 .../contrib/operators/test_dataproc_operator.py | 79 +++++++++++++++++---
 2 files changed, 125 insertions(+), 15 deletions(-)
----------------------------------------------------------------------
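
Before the full diff, a minimal sketch of the pattern this commit applies to each
operator's execute(): build the job payload once, record its "jobId", then submit.
The helper name below is hypothetical; the payload shape mirrors the
DATAPROC_JOB_TO_SUBMIT fixture added in the tests further down.

# Illustrative sketch only; not code from this commit.
def build_record_and_submit(hook, job_builder, region):
    job_to_submit = job_builder.build()                           # full Dataproc request body
    dataproc_job_id = job_to_submit["job"]["reference"]["jobId"]  # actual jobId sent to the API
    hook.submit(hook.project_id, job_to_submit, region)           # submission call is unchanged
    return dataproc_job_id                                        # operators store this on self
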


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/03ac60dd/airflow/contrib/operators/dataproc_operator.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/dataproc_operator.py b/airflow/contrib/operators/dataproc_operator.py
index 5d59f7f..23cfeb0 100644
--- a/airflow/contrib/operators/dataproc_operator.py
+++ b/airflow/contrib/operators/dataproc_operator.py
@@ -669,6 +669,11 @@ class DataProcPigOperator(BaseOperator):
     :type delegate_to: string
     :param region: The specified region where the dataproc cluster is created.
     :type region: string
+    :var dataproc_job_id: The actual "jobId" as submitted to the Dataproc API.
+        This is useful for identifying or linking to the job in the Google Cloud Console
+        Dataproc UI, as the actual "jobId" submitted to the Dataproc API has an
+        8-character random string appended to it.
+    :vartype dataproc_job_id: string
     """
     template_fields = ['query', 'variables', 'job_name', 'cluster_name', 'dataproc_jars']
     template_ext = ('.pg', '.pig',)
@@ -716,7 +721,10 @@ class DataProcPigOperator(BaseOperator):
         job.add_jar_file_uris(self.dataproc_jars)
         job.set_job_name(self.job_name)
 
-        hook.submit(hook.project_id, job.build(), self.region)
+        job_to_submit = job.build()
+        self.dataproc_job_id = job_to_submit["job"]["reference"]["jobId"]
+
+        hook.submit(hook.project_id, job_to_submit, self.region)
 
 
 class DataProcHiveOperator(BaseOperator):
@@ -749,6 +757,11 @@ class DataProcHiveOperator(BaseOperator):
     :type delegate_to: string
     :param region: The specified region where the dataproc cluster is created.
     :type region: string
+    :var dataproc_job_id: The actual "jobId" as submitted to the Dataproc API.
+        This is useful for identifying or linking to the job in the Google Cloud Console
+        Dataproc UI, as the actual "jobId" submitted to the Dataproc API has an
+        8-character random string appended to it.
+    :vartype dataproc_job_id: string
     """
     template_fields = ['query', 'variables', 'job_name', 'cluster_name', 'dataproc_jars']
     template_ext = ('.q',)
@@ -797,7 +810,10 @@ class DataProcHiveOperator(BaseOperator):
         job.add_jar_file_uris(self.dataproc_jars)
         job.set_job_name(self.job_name)
 
-        hook.submit(hook.project_id, job.build(), self.region)
+        job_to_submit = job.build()
+        self.dataproc_job_id = job_to_submit["job"]["reference"]["jobId"]
+
+        hook.submit(hook.project_id, job_to_submit, self.region)
 
 
 class DataProcSparkSqlOperator(BaseOperator):
@@ -831,6 +847,11 @@ class DataProcSparkSqlOperator(BaseOperator):
     :type delegate_to: string
     :param region: The specified region where the dataproc cluster is created.
     :type region: string
+    :var dataproc_job_id: The actual "jobId" as submitted to the Dataproc API.
+        This is useful for identifying or linking to the job in the Google Cloud Console
+        Dataproc UI, as the actual "jobId" submitted to the Dataproc API has an
+        8-character random string appended to it.
+    :vartype dataproc_job_id: string
     """
     template_fields = ['query', 'variables', 'job_name', 'cluster_name', 'dataproc_jars']
     template_ext = ('.q',)
@@ -879,7 +900,10 @@ class DataProcSparkSqlOperator(BaseOperator):
         job.add_jar_file_uris(self.dataproc_jars)
         job.set_job_name(self.job_name)
 
-        hook.submit(hook.project_id, job.build(), self.region)
+        job_to_submit = job.build()
+        self.dataproc_job_id = job_to_submit["job"]["reference"]["jobId"]
+
+        hook.submit(hook.project_id, job_to_submit, self.region)
 
 
 class DataProcSparkOperator(BaseOperator):
@@ -920,6 +944,11 @@ class DataProcSparkOperator(BaseOperator):
     :type delegate_to: string
     :param region: The specified region where the dataproc cluster is created.
     :type region: string
+    :var dataproc_job_id: The actual "jobId" as submitted to the Dataproc API.
+        This is useful for identifying or linking to the job in the Google Cloud Console
+        Dataproc UI, as the actual "jobId" submitted to the Dataproc API has an
+        8-character random string appended to it.
+    :vartype dataproc_job_id: string
     """
 
     template_fields = ['arguments', 'job_name', 'cluster_name', 'dataproc_jars']
@@ -970,7 +999,10 @@ class DataProcSparkOperator(BaseOperator):
         job.add_file_uris(self.files)
         job.set_job_name(self.job_name)
 
-        hook.submit(hook.project_id, job.build(), self.region)
+        job_to_submit = job.build()
+        self.dataproc_job_id = job_to_submit["job"]["reference"]["jobId"]
+
+        hook.submit(hook.project_id, job_to_submit, self.region)
 
 
 class DataProcHadoopOperator(BaseOperator):
@@ -1011,6 +1043,11 @@ class DataProcHadoopOperator(BaseOperator):
     :type delegate_to: string
     :param region: The specified region where the dataproc cluster is created.
     :type region: string
+    :var dataproc_job_id: The actual "jobId" as submitted to the Dataproc API.
+        This is useful for identifying or linking to the job in the Google Cloud Console
+        Dataproc UI, as the actual "jobId" submitted to the Dataproc API has an
+        8-character random string appended to it.
+    :vartype dataproc_job_id: string
     """
 
     template_fields = ['arguments', 'job_name', 'cluster_name', 'dataproc_jars']
@@ -1061,10 +1098,14 @@ class DataProcHadoopOperator(BaseOperator):
         job.add_file_uris(self.files)
         job.set_job_name(self.job_name)
 
-        hook.submit(hook.project_id, job.build(), self.region)
+        job_to_submit = job.build()
+        self.dataproc_job_id = job_to_submit["job"]["reference"]["jobId"]
+
+        hook.submit(hook.project_id, job_to_submit, self.region)
 
 
 class DataProcPySparkOperator(BaseOperator):
+    # TODO Add docs around dataproc_job_id.
     """
     Start a PySpark Job on a Cloud DataProc cluster.
 
@@ -1102,6 +1143,11 @@ class DataProcPySparkOperator(BaseOperator):
     :type delegate_to: string
     :param region: The specified region where the dataproc cluster is created.
     :type region: string
+    :var dataproc_job_id: The actual "jobId" as submitted to the Dataproc API.
+        This is useful for identifying or linking to the job in the Google Cloud Console
+        Dataproc UI, as the actual "jobId" submitted to the Dataproc API has an
+        8-character random string appended to it.
+    :vartype dataproc_job_id: string
     """
 
     template_fields = ['arguments', 'job_name', 'cluster_name', 'dataproc_jars']
@@ -1192,7 +1238,10 @@ class DataProcPySparkOperator(BaseOperator):
         job.add_python_file_uris(self.pyfiles)
         job.set_job_name(self.job_name)
 
-        hook.submit(hook.project_id, job.build(), self.region)
+        job_to_submit = job.build()
+        self.dataproc_job_id = job_to_submit["job"]["reference"]["jobId"]
+
+        hook.submit(hook.project_id, job_to_submit, self.region)
 
 
 class DataprocWorkflowTemplateBaseOperator(BaseOperator):

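How a DAG author might consume the new attribute, as a hedged usage sketch that is
not part of this commit: an on_success_callback (a standard BaseOperator argument)
reads dataproc_job_id off the operator instance after execute() has run. The DAG id,
task id, query file, and cluster name below are illustrative.

# Hypothetical usage sketch, not part of this commit.
from datetime import datetime

from airflow import DAG
from airflow.contrib.operators.dataproc_operator import DataProcPigOperator


def log_dataproc_job_id(context):
    # The operator instance that just ran; execute() sets dataproc_job_id on it.
    task = context["task"]
    job_id = getattr(task, "dataproc_job_id", None)
    if job_id:
        print("Submitted Dataproc jobId=%s; look it up in the Google Cloud Console "
              "Dataproc UI." % job_id)


with DAG(dag_id="dataproc_job_id_demo",
         start_date=datetime(2018, 7, 1),
         schedule_interval=None) as dag:
    pig_task = DataProcPigOperator(
        task_id="run_pig_query",
        query="example.pig",
        cluster_name="demo-cluster",
        region="global",
        on_success_callback=log_dataproc_job_id,
    )
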
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/03ac60dd/tests/contrib/operators/test_dataproc_operator.py
----------------------------------------------------------------------
diff --git a/tests/contrib/operators/test_dataproc_operator.py b/tests/contrib/operators/test_dataproc_operator.py
index 65ff5cd..f584ba7 100644
--- a/tests/contrib/operators/test_dataproc_operator.py
+++ b/tests/contrib/operators/test_dataproc_operator.py
@@ -45,7 +45,7 @@ except ImportError:
     except ImportError:
         mock = None
 
-from mock import Mock
+from mock import MagicMock, Mock
 from mock import patch
 
 TASK_ID = 'test-dataproc-operator'
@@ -80,6 +80,27 @@ MAIN_URI = 'test-uri'
 TEMPLATE_ID = 'template-id'
 
 HOOK = 'airflow.contrib.operators.dataproc_operator.DataProcHook'
+DATAPROC_JOB_ID = 'dataproc_job_id'
+DATAPROC_JOB_TO_SUBMIT = {
+    'job': {
+        'reference': {
+            'projectId': PROJECT_ID,
+            'jobId': DATAPROC_JOB_ID,
+        },
+        'placement': {
+            'clusterName': CLUSTER_NAME
+        }
+    }
+}
+
+
+def _assert_dataproc_job_id(mock_hook, dataproc_task):
+    hook = mock_hook.return_value
+    job = MagicMock()
+    job.build.return_value = DATAPROC_JOB_TO_SUBMIT
+    hook.create_job_template.return_value = job
+    dataproc_task.execute(None)
+    assert dataproc_task.dataproc_job_id == DATAPROC_JOB_ID
 
 
 class DataprocClusterCreateOperatorTest(unittest.TestCase):
@@ -434,31 +455,51 @@ class DataprocClusterDeleteOperatorTest(unittest.TestCase):
 class DataProcHadoopOperatorTest(unittest.TestCase):
     # Unit test for the DataProcHadoopOperator
     def test_hook_correct_region(self):
-       with patch('airflow.contrib.operators.dataproc_operator.DataProcHook') as mock_hook:
+        with patch(HOOK) as mock_hook:
             dataproc_task = DataProcHadoopOperator(
                 task_id=TASK_ID,
                 region=REGION
             )
 
             dataproc_task.execute(None)
-            mock_hook.return_value.submit.assert_called_once_with(mock.ANY, mock.ANY, REGION)
+            mock_hook.return_value.submit.assert_called_once_with(mock.ANY, mock.ANY,
+                                                                  REGION)
+
+    def test_dataproc_job_id_is_set(self):
+        with patch(HOOK) as mock_hook:
+            dataproc_task = DataProcHadoopOperator(
+                task_id=TASK_ID
+            )
+
+            _assert_dataproc_job_id(mock_hook, dataproc_task)
+
 
 class DataProcHiveOperatorTest(unittest.TestCase):
     # Unit test for the DataProcHiveOperator
     def test_hook_correct_region(self):
-       with patch('airflow.contrib.operators.dataproc_operator.DataProcHook') as mock_hook:
+        with patch(HOOK) as mock_hook:
             dataproc_task = DataProcHiveOperator(
                 task_id=TASK_ID,
                 region=REGION
             )
 
             dataproc_task.execute(None)
-            mock_hook.return_value.submit.assert_called_once_with(mock.ANY, mock.ANY, REGION)
+            mock_hook.return_value.submit.assert_called_once_with(mock.ANY, mock.ANY,
+                                                                  REGION)
+
+    def test_dataproc_job_id_is_set(self):
+        with patch(HOOK) as mock_hook:
+            dataproc_task = DataProcHiveOperator(
+                task_id=TASK_ID
+            )
+
+            _assert_dataproc_job_id(mock_hook, dataproc_task)
+
 
 class DataProcPySparkOperatorTest(unittest.TestCase):
     # Unit test for the DataProcPySparkOperator
     def test_hook_correct_region(self):
-       with patch('airflow.contrib.operators.dataproc_operator.DataProcHook') as mock_hook:
+        with patch(HOOK) as mock_hook:
             dataproc_task = DataProcPySparkOperator(
                 task_id=TASK_ID,
                 main=MAIN_URI,
@@ -466,19 +507,39 @@ class DataProcPySparkOperatorTest(unittest.TestCase):
             )
 
             dataproc_task.execute(None)
-            mock_hook.return_value.submit.assert_called_once_with(mock.ANY, mock.ANY, REGION)
+            mock_hook.return_value.submit.assert_called_once_with(mock.ANY, mock.ANY,
+                                                                  REGION)
+
+    def test_dataproc_job_id_is_set(self):
+        with patch(HOOK) as mock_hook:
+            dataproc_task = DataProcPySparkOperator(
+                task_id=TASK_ID,
+                main=MAIN_URI
+            )
+
+            _assert_dataproc_job_id(mock_hook, dataproc_task)
+
 
 class DataProcSparkOperatorTest(unittest.TestCase):
     # Unit test for the DataProcSparkOperator
     def test_hook_correct_region(self):
-       with patch('airflow.contrib.operators.dataproc_operator.DataProcHook') as mock_hook:
+        with patch(HOOK) as mock_hook:
             dataproc_task = DataProcSparkOperator(
                 task_id=TASK_ID,
                 region=REGION
             )
 
             dataproc_task.execute(None)
-            mock_hook.return_value.submit.assert_called_once_with(mock.ANY, mock.ANY, REGION)
+            mock_hook.return_value.submit.assert_called_once_with(mock.ANY, mock.ANY,
+                                                                  REGION)
+
+    def test_dataproc_job_id_is_set(self):
+        with patch(HOOK) as mock_hook:
+            dataproc_task = DataProcSparkOperator(
+                task_id=TASK_ID
+            )
+
+            _assert_dataproc_job_id(mock_hook, dataproc_task)
 
 
 class DataprocWorkflowTemplateInstantiateOperatorTest(unittest.TestCase):


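Finally, a standalone, hedged counterpart to the _assert_dataproc_job_id helper above:
the hook is patched, a MagicMock job builder returns a canned payload, and the new
dataproc_job_id attribute is checked after execute(). The project id, jobId value, and
task id are made up for illustration.

# Standalone sketch of the verification pattern used in the tests above.
from mock import MagicMock, patch

from airflow.contrib.operators.dataproc_operator import DataProcHiveOperator

JOB_TO_SUBMIT = {
    'job': {
        'reference': {
            'projectId': 'demo-project',
            'jobId': 'hive_abc12345',
        }
    }
}

with patch('airflow.contrib.operators.dataproc_operator.DataProcHook') as mock_hook:
    job = MagicMock()
    job.build.return_value = JOB_TO_SUBMIT                      # what the job builder would return
    mock_hook.return_value.create_job_template.return_value = job

    dataproc_task = DataProcHiveOperator(task_id='demo-hive-task')
    dataproc_task.execute(None)                                 # sets dataproc_job_id, then submits

    assert dataproc_task.dataproc_job_id == 'hive_abc12345'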