From: kaxilnaik@apache.org
To: commits@airflow.incubator.apache.org
Subject: incubator-airflow git commit: [AIRFLOW-2753] Add dataproc_job_id instance var holding actual DP jobId
Date: Sun, 22 Jul 2018 20:54:28 +0000 (UTC)

Repository: incubator-airflow
Updated Branches:
  refs/heads/master c1217c349 -> 03ac60dd6


[AIRFLOW-2753] Add dataproc_job_id instance var holding actual DP jobId

Closes #3622 from jeffkpayne/master

Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/03ac60dd
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/03ac60dd
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/03ac60dd

Branch: refs/heads/master
Commit: 03ac60dd67388951f033f4e64bbe55d62cd6257f
Parents: c1217c3
Author: Jeffrey Scott Keone Payne
Authored: Sun Jul 22 21:54:21 2018 +0100
Committer: Kaxil Naik
Committed: Sun Jul 22 21:54:21 2018 +0100

----------------------------------------------------------------------
 airflow/contrib/operators/dataproc_operator.py  | 61 +++++++++++++--
 .../contrib/operators/test_dataproc_operator.py | 79 +++++++++++++++++---
 2 files changed, 125 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/03ac60dd/airflow/contrib/operators/dataproc_operator.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/dataproc_operator.py b/airflow/contrib/operators/dataproc_operator.py
index 5d59f7f..23cfeb0 100644
--- a/airflow/contrib/operators/dataproc_operator.py
+++ b/airflow/contrib/operators/dataproc_operator.py
@@ -669,6 +669,11 @@ class DataProcPigOperator(BaseOperator):
     :type delegate_to: string
     :param region: The specified region where the dataproc cluster is created.
     :type region: string
+    :var dataproc_job_id: The actual "jobId" as submitted to the Dataproc API.
+        This is useful for identifying or linking to the job in the Google Cloud Console
+        Dataproc UI, as the actual "jobId" submitted to the Dataproc API is appended with
+        an 8 character random string.
+    :vartype dataproc_job_id: string
     """
     template_fields = ['query', 'variables', 'job_name', 'cluster_name', 'dataproc_jars']
    template_ext = ('.pg', '.pig',)
@@ -716,7 +721,10 @@ class DataProcPigOperator(BaseOperator):
             job.add_jar_file_uris(self.dataproc_jars)
         job.set_job_name(self.job_name)

-        hook.submit(hook.project_id, job.build(), self.region)
+        job_to_submit = job.build()
+        self.dataproc_job_id = job_to_submit["job"]["reference"]["jobId"]
+
+        hook.submit(hook.project_id, job_to_submit, self.region)


 class DataProcHiveOperator(BaseOperator):
@@ -749,6 +757,11 @@ class DataProcHiveOperator(BaseOperator):
     :type delegate_to: string
     :param region: The specified region where the dataproc cluster is created.
     :type region: string
+    :var dataproc_job_id: The actual "jobId" as submitted to the Dataproc API.
+        This is useful for identifying or linking to the job in the Google Cloud Console
+        Dataproc UI, as the actual "jobId" submitted to the Dataproc API is appended with
+        an 8 character random string.
+    :vartype dataproc_job_id: string
     """
     template_fields = ['query', 'variables', 'job_name', 'cluster_name', 'dataproc_jars']
     template_ext = ('.q',)
@@ -797,7 +810,10 @@ class DataProcHiveOperator(BaseOperator):
             job.add_jar_file_uris(self.dataproc_jars)
         job.set_job_name(self.job_name)

-        hook.submit(hook.project_id, job.build(), self.region)
+        job_to_submit = job.build()
+        self.dataproc_job_id = job_to_submit["job"]["reference"]["jobId"]
+
+        hook.submit(hook.project_id, job_to_submit, self.region)


 class DataProcSparkSqlOperator(BaseOperator):
@@ -831,6 +847,11 @@ class DataProcSparkSqlOperator(BaseOperator):
     :type delegate_to: string
     :param region: The specified region where the dataproc cluster is created.
     :type region: string
+    :var dataproc_job_id: The actual "jobId" as submitted to the Dataproc API.
+        This is useful for identifying or linking to the job in the Google Cloud Console
+        Dataproc UI, as the actual "jobId" submitted to the Dataproc API is appended with
+        an 8 character random string.
+    :vartype dataproc_job_id: string
     """
     template_fields = ['query', 'variables', 'job_name', 'cluster_name', 'dataproc_jars']
     template_ext = ('.q',)
@@ -879,7 +900,10 @@ class DataProcSparkSqlOperator(BaseOperator):
             job.add_jar_file_uris(self.dataproc_jars)
         job.set_job_name(self.job_name)

-        hook.submit(hook.project_id, job.build(), self.region)
+        job_to_submit = job.build()
+        self.dataproc_job_id = job_to_submit["job"]["reference"]["jobId"]
+
+        hook.submit(hook.project_id, job_to_submit, self.region)


 class DataProcSparkOperator(BaseOperator):
@@ -920,6 +944,11 @@ class DataProcSparkOperator(BaseOperator):
     :type delegate_to: string
     :param region: The specified region where the dataproc cluster is created.
     :type region: string
+    :var dataproc_job_id: The actual "jobId" as submitted to the Dataproc API.
+        This is useful for identifying or linking to the job in the Google Cloud Console
+        Dataproc UI, as the actual "jobId" submitted to the Dataproc API is appended with
+        an 8 character random string.
+    :vartype dataproc_job_id: string
     """

     template_fields = ['arguments', 'job_name', 'cluster_name', 'dataproc_jars']
@@ -970,7 +999,10 @@ class DataProcSparkOperator(BaseOperator):
             job.add_file_uris(self.files)
         job.set_job_name(self.job_name)

-        hook.submit(hook.project_id, job.build(), self.region)
+        job_to_submit = job.build()
+        self.dataproc_job_id = job_to_submit["job"]["reference"]["jobId"]
+
+        hook.submit(hook.project_id, job_to_submit, self.region)


 class DataProcHadoopOperator(BaseOperator):
@@ -1011,6 +1043,11 @@ class DataProcHadoopOperator(BaseOperator):
     :type delegate_to: string
     :param region: The specified region where the dataproc cluster is created.
     :type region: string
+    :var dataproc_job_id: The actual "jobId" as submitted to the Dataproc API.
+        This is useful for identifying or linking to the job in the Google Cloud Console
+        Dataproc UI, as the actual "jobId" submitted to the Dataproc API is appended with
+        an 8 character random string.
+    :vartype dataproc_job_id: string
     """

     template_fields = ['arguments', 'job_name', 'cluster_name', 'dataproc_jars']
@@ -1061,10 +1098,14 @@ class DataProcHadoopOperator(BaseOperator):
             job.add_file_uris(self.files)
         job.set_job_name(self.job_name)

-        hook.submit(hook.project_id, job.build(), self.region)
+        job_to_submit = job.build()
+        self.dataproc_job_id = job_to_submit["job"]["reference"]["jobId"]
+
+        hook.submit(hook.project_id, job_to_submit, self.region)


 class DataProcPySparkOperator(BaseOperator):
+    # TODO Add docs around dataproc_job_id.
     """
     Start a PySpark Job on a Cloud DataProc cluster.
@@ -1102,6 +1143,11 @@ class DataProcPySparkOperator(BaseOperator):
     :type delegate_to: string
     :param region: The specified region where the dataproc cluster is created.
     :type region: string
+    :var dataproc_job_id: The actual "jobId" as submitted to the Dataproc API.
+        This is useful for identifying or linking to the job in the Google Cloud Console
+        Dataproc UI, as the actual "jobId" submitted to the Dataproc API is appended with
+        an 8 character random string.
+    :vartype dataproc_job_id: string
     """

     template_fields = ['arguments', 'job_name', 'cluster_name', 'dataproc_jars']
@@ -1192,7 +1238,10 @@ class DataProcPySparkOperator(BaseOperator):
             job.add_python_file_uris(self.pyfiles)
         job.set_job_name(self.job_name)

-        hook.submit(hook.project_id, job.build(), self.region)
+        job_to_submit = job.build()
+        self.dataproc_job_id = job_to_submit["job"]["reference"]["jobId"]
+
+        hook.submit(hook.project_id, job_to_submit, self.region)


 class DataprocWorkflowTemplateBaseOperator(BaseOperator):

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/03ac60dd/tests/contrib/operators/test_dataproc_operator.py
----------------------------------------------------------------------
diff --git a/tests/contrib/operators/test_dataproc_operator.py b/tests/contrib/operators/test_dataproc_operator.py
index 65ff5cd..f584ba7 100644
--- a/tests/contrib/operators/test_dataproc_operator.py
+++ b/tests/contrib/operators/test_dataproc_operator.py
@@ -45,7 +45,7 @@ except ImportError:
     except ImportError:
         mock = None

-from mock import Mock
+from mock import MagicMock, Mock
 from mock import patch

 TASK_ID = 'test-dataproc-operator'
@@ -80,6 +80,27 @@ MAIN_URI = 'test-uri'
 TEMPLATE_ID = 'template-id'

 HOOK = 'airflow.contrib.operators.dataproc_operator.DataProcHook'
+DATAPROC_JOB_ID = 'dataproc_job_id'
+DATAPROC_JOB_TO_SUBMIT = {
+    'job': {
+        'reference': {
+            'projectId': PROJECT_ID,
+            'jobId': DATAPROC_JOB_ID,
+        },
+        'placement': {
+            'clusterName': CLUSTER_NAME
+        }
+    }
+}
+
+
+def _assert_dataproc_job_id(mock_hook, dataproc_task):
+    hook = mock_hook.return_value
+    job = MagicMock()
+    job.build.return_value = DATAPROC_JOB_TO_SUBMIT
+    hook.create_job_template.return_value = job
+    dataproc_task.execute(None)
+    assert dataproc_task.dataproc_job_id == DATAPROC_JOB_ID


 class DataprocClusterCreateOperatorTest(unittest.TestCase):
@@ -434,31 +455,51 @@ class DataprocClusterDeleteOperatorTest(unittest.TestCase):

 class DataProcHadoopOperatorTest(unittest.TestCase):
     # Unit test for the DataProcHadoopOperator
     def test_hook_correct_region(self):
-        with patch('airflow.contrib.operators.dataproc_operator.DataProcHook') as mock_hook:
+        with patch(HOOK) as mock_hook:
             dataproc_task = DataProcHadoopOperator(
                 task_id=TASK_ID,
                 region=REGION
             )

             dataproc_task.execute(None)
-            mock_hook.return_value.submit.assert_called_once_with(mock.ANY, mock.ANY, REGION)
+            mock_hook.return_value.submit.assert_called_once_with(mock.ANY, mock.ANY,
+                                                                  REGION)
+
+    def test_dataproc_job_id_is_set(self):
+        with patch(HOOK) as mock_hook:
+            dataproc_task = DataProcHadoopOperator(
+                task_id=TASK_ID
+            )
+
+            _assert_dataproc_job_id(mock_hook, dataproc_task)
+

 class DataProcHiveOperatorTest(unittest.TestCase):
     # Unit test for the DataProcHiveOperator
     def test_hook_correct_region(self):
-        with patch('airflow.contrib.operators.dataproc_operator.DataProcHook') as mock_hook:
+        with patch(HOOK) as mock_hook:
             dataproc_task = DataProcHiveOperator(
                 task_id=TASK_ID,
                 region=REGION
             )

             dataproc_task.execute(None)
-            mock_hook.return_value.submit.assert_called_once_with(mock.ANY, mock.ANY, REGION)
+            mock_hook.return_value.submit.assert_called_once_with(mock.ANY, mock.ANY,
+                                                                  REGION)
+
+    def test_dataproc_job_id_is_set(self):
+        with patch(HOOK) as mock_hook:
+            dataproc_task = DataProcHiveOperator(
+                task_id=TASK_ID
+            )
+
+            _assert_dataproc_job_id(mock_hook, dataproc_task)
+

 class DataProcPySparkOperatorTest(unittest.TestCase):
     # Unit test for the DataProcPySparkOperator
     def test_hook_correct_region(self):
-        with patch('airflow.contrib.operators.dataproc_operator.DataProcHook') as mock_hook:
+        with patch(HOOK) as mock_hook:
             dataproc_task = DataProcPySparkOperator(
                 task_id=TASK_ID,
                 main=MAIN_URI,
@@ -466,19 +507,39 @@ class DataProcPySparkOperatorTest(unittest.TestCase):
             )

             dataproc_task.execute(None)
-            mock_hook.return_value.submit.assert_called_once_with(mock.ANY, mock.ANY, REGION)
+            mock_hook.return_value.submit.assert_called_once_with(mock.ANY, mock.ANY,
+                                                                  REGION)
+
+    def test_dataproc_job_id_is_set(self):
+        with patch(HOOK) as mock_hook:
+            dataproc_task = DataProcPySparkOperator(
+                task_id=TASK_ID,
+                main=MAIN_URI
+            )
+
+            _assert_dataproc_job_id(mock_hook, dataproc_task)
+

 class DataProcSparkOperatorTest(unittest.TestCase):
     # Unit test for the DataProcSparkOperator
     def test_hook_correct_region(self):
-        with patch('airflow.contrib.operators.dataproc_operator.DataProcHook') as mock_hook:
+        with patch(HOOK) as mock_hook:
             dataproc_task = DataProcSparkOperator(
                 task_id=TASK_ID,
                 region=REGION
             )

             dataproc_task.execute(None)
-            mock_hook.return_value.submit.assert_called_once_with(mock.ANY, mock.ANY, REGION)
+            mock_hook.return_value.submit.assert_called_once_with(mock.ANY, mock.ANY,
+                                                                  REGION)
+
+    def test_dataproc_job_id_is_set(self):
+        with patch(HOOK) as mock_hook:
+            dataproc_task = DataProcSparkOperator(
+                task_id=TASK_ID
+            )
+
+            _assert_dataproc_job_id(mock_hook, dataproc_task)


 class DataprocWorkflowTemplateInstantiateOperatorTest(unittest.TestCase):
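----------------------------------------------------------------------

Usage note: a minimal sketch of how a DAG might consume the new attribute,
not something the commit itself ships. The DAG id, query URI, cluster name,
and region below are hypothetical placeholders, and the import paths assume
the contrib layout this commit targets. An on_success_callback runs in the
same worker process as execute(), so the instance attribute set during job
submission is visible to it.

    from datetime import datetime

    from airflow import DAG
    from airflow.contrib.operators.dataproc_operator import DataProcPigOperator


    def report_job_id(context):
        # context['task'] is the operator that just ran execute(), so its
        # dataproc_job_id attribute now holds the submitted jobId (the
        # templated job name plus the random 8-character suffix described
        # in the docstrings above).
        task = context['task']
        print("Submitted Dataproc jobId: %s" % task.dataproc_job_id)


    dag = DAG('example_dataproc_job_id',            # hypothetical DAG id
              start_date=datetime(2018, 7, 1),
              schedule_interval=None)

    pig_task = DataProcPigOperator(
        task_id='run_pig',
        query='gs://my-bucket/queries/query.pig',   # hypothetical GCS URI
        cluster_name='my-cluster',                  # hypothetical cluster
        region='us-central1',                       # hypothetical region
        on_success_callback=report_job_id,
        dag=dag,
    )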