From: "ASF GitHub Bot (JIRA)"
To: commits@airflow.incubator.apache.org
Reply-To: dev@airflow.incubator.apache.org
Date: Thu, 6 Sep 2018 07:08:00 +0000 (UTC)
Subject: [jira] [Commented] (AIRFLOW-1998) Implement Databricks Operator for jobs/run-now endpoint

    [ https://issues.apache.org/jira/browse/AIRFLOW-1998?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16605369#comment-16605369 ]

ASF GitHub Bot commented on AIRFLOW-1998:
-----------------------------------------

Fokko closed pull request #3813: [AIRFLOW-1998] Implemented DatabricksRunNowOperator for jobs/run-now …
URL: https://github.com/apache/incubator-airflow/pull/3813

This is a PR merged from a forked repository.
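In short, the change adds a run_now method to DatabricksHook and a new DatabricksRunNowOperator that triggers an existing Databricks job through the jobs/run-now endpoint. Before the diff, a minimal sketch of how the new operator is intended to be used in a DAG, based on the example in the PR's docstring; the DAG name, start date, and schedule below are illustrative assumptions, not part of the change:

    # Illustrative sketch only: job_id 42 and the notebook parameters come from the
    # docstring example in this PR; the DAG settings are assumptions.
    from datetime import datetime

    from airflow import DAG
    from airflow.contrib.operators.databricks_operator import DatabricksRunNowOperator

    dag = DAG(
        dag_id='databricks_run_now_example',   # assumed name
        start_date=datetime(2018, 9, 1),       # assumed date
        schedule_interval=None,
    )

    # Trigger an existing Databricks job (job_id 42) via the jobs/run-now endpoint.
    notebook_run = DatabricksRunNowOperator(
        task_id='notebook_run',
        job_id=42,
        notebook_params={
            'dry-run': 'true',
            'oldest-time-to-consider': '1457570074236',
        },
        dag=dag,
    )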
As GitHub hides the original diff on merge, it is displayed below for the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic):

diff --git a/airflow/contrib/hooks/databricks_hook.py b/airflow/contrib/hooks/databricks_hook.py
index cb2ba9bd00..284db98d91 100644
--- a/airflow/contrib/hooks/databricks_hook.py
+++ b/airflow/contrib/hooks/databricks_hook.py
@@ -37,6 +37,7 @@
 START_CLUSTER_ENDPOINT = ("POST", "api/2.0/clusters/start")
 TERMINATE_CLUSTER_ENDPOINT = ("POST", "api/2.0/clusters/delete")
 
+RUN_NOW_ENDPOINT = ('POST', 'api/2.0/jobs/run-now')
 SUBMIT_RUN_ENDPOINT = ('POST', 'api/2.0/jobs/runs/submit')
 GET_RUN_ENDPOINT = ('GET', 'api/2.0/jobs/runs/get')
 CANCEL_RUN_ENDPOINT = ('POST', 'api/2.0/jobs/runs/cancel')
@@ -161,6 +162,18 @@ def _log_request_error(self, attempt_num, error):
             attempt_num, error
         )
 
+    def run_now(self, json):
+        """
+        Utility function to call the ``api/2.0/jobs/run-now`` endpoint.
+
+        :param json: The data used in the body of the request to the ``run-now`` endpoint.
+        :type json: dict
+        :return: the run_id as a string
+        :rtype: string
+        """
+        response = self._do_api_call(RUN_NOW_ENDPOINT, json)
+        return response['run_id']
+
     def submit_run(self, json):
         """
         Utility function to call the ``api/2.0/jobs/runs/submit`` endpoint.
diff --git a/airflow/contrib/operators/databricks_operator.py b/airflow/contrib/operators/databricks_operator.py
index 3245a99256..aed2d8909c 100644
--- a/airflow/contrib/operators/databricks_operator.py
+++ b/airflow/contrib/operators/databricks_operator.py
@@ -30,6 +30,66 @@
 XCOM_RUN_PAGE_URL_KEY = 'run_page_url'
 
 
+def _deep_string_coerce(content, json_path='json'):
+    """
+    Coerces content or all values of content if it is a dict to a string. The
+    function will throw if content contains non-string or non-numeric types.
+
+    The reason why we have this function is because the ``self.json`` field must be a
+    dict with only string values. This is because ``render_template`` will fail
+    for numerical values.
+    """
+    c = _deep_string_coerce
+    if isinstance(content, six.string_types):
+        return content
+    elif isinstance(content, six.integer_types + (float,)):
+        # Databricks can tolerate either numeric or string types in the API backend.
+        return str(content)
+    elif isinstance(content, (list, tuple)):
+        return [c(e, '{0}[{1}]'.format(json_path, i)) for i, e in enumerate(content)]
+    elif isinstance(content, dict):
+        return {k: c(v, '{0}[{1}]'.format(json_path, k))
+                for k, v in list(content.items())}
+    else:
+        param_type = type(content)
+        msg = 'Type {0} used for parameter {1} is not a number or a string' \
+            .format(param_type, json_path)
+        raise AirflowException(msg)
+
+
+def _handle_databricks_operator_execution(operator, hook, log, context):
+    """
+    Handles the Airflow + Databricks lifecycle logic for a Databricks operator
+    :param operator: Databricks operator being handled
+    :param context: Airflow context
+    """
+    if operator.do_xcom_push:
+        context['ti'].xcom_push(key=XCOM_RUN_ID_KEY, value=operator.run_id)
+    log.info('Run submitted with run_id: %s', operator.run_id)
+    run_page_url = hook.get_run_page_url(operator.run_id)
+    if operator.do_xcom_push:
+        context['ti'].xcom_push(key=XCOM_RUN_PAGE_URL_KEY, value=run_page_url)
+
+    log.info('View run status, Spark UI, and logs at %s', run_page_url)
+    while True:
+        run_state = hook.get_run_state(operator.run_id)
+        if run_state.is_terminal:
+            if run_state.is_successful:
+                log.info('%s completed successfully.', operator.task_id)
+                log.info('View run status, Spark UI, and logs at %s', run_page_url)
+                return
+            else:
+                error_message = '{t} failed with terminal state: {s}'.format(
+                    t=operator.task_id,
+                    s=run_state)
+                raise AirflowException(error_message)
+        else:
+            log.info('%s in run state: %s', operator.task_id, run_state)
+            log.info('View run status, Spark UI, and logs at %s', run_page_url)
+            log.info('Sleeping for %s seconds.', operator.polling_period_seconds)
+            time.sleep(operator.polling_period_seconds)
+
+
 class DatabricksSubmitRunOperator(BaseOperator):
     """
     Submits an Spark job run to Databricks using the
@@ -200,39 +260,202 @@ def __init__(
         if 'run_name' not in self.json:
             self.json['run_name'] = run_name or kwargs['task_id']
 
-        self.json = self._deep_string_coerce(self.json)
+        self.json = _deep_string_coerce(self.json)
         # This variable will be used in case our task gets killed.
         self.run_id = None
         self.do_xcom_push = do_xcom_push
 
-    def _deep_string_coerce(self, content, json_path='json'):
-        """
-        Coerces content or all values of content if it is a dict to a string. The
-        function will throw if content contains non-string or non-numeric types.
+    def get_hook(self):
+        return DatabricksHook(
+            self.databricks_conn_id,
+            retry_limit=self.databricks_retry_limit,
+            retry_delay=self.databricks_retry_delay)
+
+    def execute(self, context):
+        hook = self.get_hook()
+        self.run_id = hook.submit_run(self.json)
+        _handle_databricks_operator_execution(self, hook, self.log, context)
+
+    def on_kill(self):
+        hook = self.get_hook()
+        hook.cancel_run(self.run_id)
+        self.log.info(
+            'Task: %s with run_id: %s was requested to be cancelled.',
+            self.task_id, self.run_id
+        )
+
+
+class DatabricksRunNowOperator(BaseOperator):
+    """
+    Runs an existing Spark job run to Databricks using the
+    `api/2.0/jobs/run-now
+    <https://docs.databricks.com/api/latest/jobs.html#run-now>`_
+    API endpoint.
+
+    There are two ways to instantiate this operator.
+
+    In the first way, you can take the JSON payload that you typically use
+    to call the ``api/2.0/jobs/run-now`` endpoint and pass it directly
+    to our ``DatabricksRunNowOperator`` through the ``json`` parameter.
+    For example ::
+        json = {
+          "job_id": 42,
+          "notebook_params": {
+            "dry-run": "true",
+            "oldest-time-to-consider": "1457570074236"
+          }
+        }
+
+        notebook_run = DatabricksRunNowOperator(task_id='notebook_run', json=json)
+
+    Another way to accomplish the same thing is to use the named parameters
+    of the ``DatabricksRunNowOperator`` directly. Note that there is exactly
+    one named parameter for each top level parameter in the ``run-now``
+    endpoint. In this method, your code would look like this: ::
+
+        job_id=42
+
+        notebook_params = {
+            "dry-run": "true",
+            "oldest-time-to-consider": "1457570074236"
+        }
+
+        python_params = ["douglas adams", "42"]
+
+        spark_submit_params = ["--class", "org.apache.spark.examples.SparkPi"]
+
+        notebook_run = DatabricksRunNowOperator(
+            job_id=job_id,
+            notebook_params=notebook_params,
+            python_params=python_params,
+            spark_submit_params=spark_submit_params
+        )
+
+    In the case where both the json parameter **AND** the named parameters
+    are provided, they will be merged together. If there are conflicts during the merge,
+    the named parameters will take precedence and override the top level ``json`` keys.
+
+    Currently the named parameters that ``DatabricksRunNowOperator`` supports are
+        - ``job_id``
+        - ``json``
+        - ``notebook_params``
+        - ``python_params``
+        - ``spark_submit_params``
+
+
+    :param job_id: the job_id of the existing Databricks job.
+        This field will be templated.
+        .. seealso::
+            https://docs.databricks.com/api/latest/jobs.html#run-now
+    :type job_id: string
+    :param json: A JSON object containing API parameters which will be passed
+        directly to the ``api/2.0/jobs/run-now`` endpoint. The other named parameters
+        (i.e. ``notebook_params``, ``spark_submit_params``..) to this operator will
+        be merged with this json dictionary if they are provided.
+        If there are conflicts during the merge, the named parameters will
+        take precedence and override the top level json keys. (templated)
+
+        .. seealso::
+            For more information about templating see :ref:`jinja-templating`.
+            https://docs.databricks.com/api/latest/jobs.html#run-now
+    :type json: dict
+    :param notebook_params: A dict from keys to values for jobs with notebook task,
+        e.g. "notebook_params": {"name": "john doe", "age": "35"}.
+        The map is passed to the notebook and will be accessible through the
+        dbutils.widgets.get function. See Widgets for more information.
+        If not specified upon run-now, the triggered run will use the
+        job's base parameters. notebook_params cannot be
+        specified in conjunction with jar_params. The json representation
+        of this field (i.e. {"notebook_params":{"name":"john doe","age":"35"}})
+        cannot exceed 10,000 bytes.
+        This field will be templated.
+
+        .. seealso::
+            https://docs.databricks.com/user-guide/notebooks/widgets.html
+    :type notebook_params: dict
+    :param python_params: A list of parameters for jobs with python tasks,
+        e.g. "python_params": ["john doe", "35"].
+        The parameters will be passed to python file as command line parameters.
+        If specified upon run-now, it would overwrite the parameters specified in
+        job setting.
+        The json representation of this field (i.e. {"python_params":["john doe","35"]})
+        cannot exceed 10,000 bytes.
+        This field will be templated.
+
+        .. seealso::
+            https://docs.databricks.com/api/latest/jobs.html#run-now
+    :type python_params: array of strings
+    :param spark_submit_params: A list of parameters for jobs with spark submit task,
+        e.g.
"spark_submit_params": ["--class", "org.apache.spark.examples= .SparkPi"]. + The parameters will be passed to spark-submit script as command li= ne parameters. + If specified upon run-now, it would overwrite the parameters speci= fied + in job setting. + The json representation of this field cannot exceed 10,000 bytes. + This field will be templated. + .. seealso:: + https://docs.databricks.com/api/latest/jobs.html#run-now + :type spark_submit_params: array of strings + :param timeout_seconds: The timeout for this run. By default a value o= f 0 is used + which means to have no timeout. + This field will be templated. + :type timeout_seconds: int32 + :param databricks_conn_id: The name of the Airflow connection to use. + By default and in the common case this will be ``databricks_defaul= t``. To use + token based authentication, provide the key ``token`` in the extra= field for the + connection. + :type databricks_conn_id: string + :param polling_period_seconds: Controls the rate which we poll for the= result of + this run. By default the operator will poll every 30 seconds. + :type polling_period_seconds: int + :param databricks_retry_limit: Amount of times retry if the Databricks= backend is + unreachable. Its value must be greater than or equal to 1. + :type databricks_retry_limit: int + :param do_xcom_push: Whether we should push run_id and run_page_url to= xcom. + :type do_xcom_push: boolean + """ + # Used in airflow.models.BaseOperator + template_fields =3D ('json',) + # Databricks brand color (blue) under white text + ui_color =3D '#1CB1C2' + ui_fgcolor =3D '#fff' + + def __init__( + self, + job_id, + json=3DNone, + notebook_params=3DNone, + python_params=3DNone, + spark_submit_params=3DNone, + databricks_conn_id=3D'databricks_default', + polling_period_seconds=3D30, + databricks_retry_limit=3D3, + databricks_retry_delay=3D1, + do_xcom_push=3DFalse, + **kwargs): =20 - The reason why we have this function is because the ``self.json`` = field must be a - dict with only string values. This is because ``render_template``= will fail - for numerical values. """ - c =3D self._deep_string_coerce - if isinstance(content, six.string_types): - return content - elif isinstance(content, six.integer_types + (float,)): - # Databricks can tolerate either numeric or string types in th= e API backend. - return str(content) - elif isinstance(content, (list, tuple)): - return [c(e, '{0}[{1}]'.format(json_path, i)) for i, e in enum= erate(content)] - elif isinstance(content, dict): - return {k: c(v, '{0}[{1}]'.format(json_path, k)) - for k, v in list(content.items())} - else: - param_type =3D type(content) - msg =3D 'Type {0} used for parameter {1} is not a number or a = string'\ - .format(param_type, json_path) - raise AirflowException(msg) + Creates a new ``DatabricksRunNowOperator``. 
+ """ + super(DatabricksRunNowOperator, self).__init__(**kwargs) + self.json =3D json or {} + self.databricks_conn_id =3D databricks_conn_id + self.polling_period_seconds =3D polling_period_seconds + self.databricks_retry_limit =3D databricks_retry_limit + self.databricks_retry_delay =3D databricks_retry_delay =20 - def _log_run_page_url(self, url): - self.log.info('View run status, Spark UI, and logs at %s', url) + if job_id is not None: + self.json['job_id'] =3D job_id + if notebook_params is not None: + self.json['notebook_params'] =3D notebook_params + if python_params is not None: + self.json['python_params'] =3D python_params + if spark_submit_params is not None: + self.json['spark_submit_params'] =3D spark_submit_params + + self.json =3D _deep_string_coerce(self.json) + # This variable will be used in case our task gets killed. + self.run_id =3D None + self.do_xcom_push =3D do_xcom_push =20 def get_hook(self): return DatabricksHook( @@ -242,31 +465,8 @@ def get_hook(self): =20 def execute(self, context): hook =3D self.get_hook() - self.run_id =3D hook.submit_run(self.json) - if self.do_xcom_push: - context['ti'].xcom_push(key=3DXCOM_RUN_ID_KEY, value=3Dself.ru= n_id) - self.log.info('Run submitted with run_id: %s', self.run_id) - run_page_url =3D hook.get_run_page_url(self.run_id) - if self.do_xcom_push: - context['ti'].xcom_push(key=3DXCOM_RUN_PAGE_URL_KEY, value=3Dr= un_page_url) - self._log_run_page_url(run_page_url) - while True: - run_state =3D hook.get_run_state(self.run_id) - if run_state.is_terminal: - if run_state.is_successful: - self.log.info('%s completed successfully.', self.task_= id) - self._log_run_page_url(run_page_url) - return - else: - error_message =3D '{t} failed with terminal state: {s}= '.format( - t=3Dself.task_id, - s=3Drun_state) - raise AirflowException(error_message) - else: - self.log.info('%s in run state: %s', self.task_id, run_sta= te) - self._log_run_page_url(run_page_url) - self.log.info('Sleeping for %s seconds.', self.polling_per= iod_seconds) - time.sleep(self.polling_period_seconds) + self.run_id =3D hook.run_now(self.json) + _handle_databricks_operator_execution(self, hook, self.log, contex= t) =20 def on_kill(self): hook =3D self.get_hook() diff --git a/tests/contrib/hooks/test_databricks_hook.py b/tests/contrib/ho= oks/test_databricks_hook.py index 04a7c8dc3c..090f46caeb 100644 --- a/tests/contrib/hooks/test_databricks_hook.py +++ b/tests/contrib/hooks/test_databricks_hook.py @@ -54,6 +54,7 @@ } CLUSTER_ID =3D 'cluster_id' RUN_ID =3D 1 +JOB_ID =3D 42 HOST =3D 'xx.cloud.databricks.com' HOST_WITH_SCHEME =3D 'https://xx.cloud.databricks.com' LOGIN =3D 'login' @@ -70,9 +71,21 @@ 'state_message': STATE_MESSAGE } } +NOTEBOOK_PARAMS =3D { + "dry-run": "true", + "oldest-time-to-consider": "1457570074236" +} +JAR_PARAMS =3D ["param1", "param2"] RESULT_STATE =3D None =20 =20 +def run_now_endpoint(host): + """ + Utility function to generate the run now endpoint given the host. + """ + return 'https://{}/api/2.0/jobs/run-now'.format(host) + + def submit_run_endpoint(host): """ Utility function to generate the submit run endpoint given the host. 
@@ -160,6 +173,7 @@ def setUp(self, session=None):
         conn.host = HOST
         conn.login = LOGIN
         conn.password = PASSWORD
+        conn.extra = None
         session.commit()
 
         self.hook = DatabricksHook(retry_delay=0)
@@ -270,6 +284,32 @@ def test_submit_run(self, mock_requests):
             headers=USER_AGENT_HEADER,
             timeout=self.hook.timeout_seconds)
 
+    @mock.patch('airflow.contrib.hooks.databricks_hook.requests')
+    def test_run_now(self, mock_requests):
+        mock_requests.codes.ok = 200
+        mock_requests.post.return_value.json.return_value = {'run_id': '1'}
+        status_code_mock = mock.PropertyMock(return_value=200)
+        type(mock_requests.post.return_value).status_code = status_code_mock
+        json = {
+            'notebook_params': NOTEBOOK_PARAMS,
+            'jar_params': JAR_PARAMS,
+            'job_id': JOB_ID
+        }
+        run_id = self.hook.run_now(json)
+
+        self.assertEquals(run_id, '1')
+
+        mock_requests.post.assert_called_once_with(
+            run_now_endpoint(HOST),
+            json={
+                'notebook_params': NOTEBOOK_PARAMS,
+                'jar_params': JAR_PARAMS,
+                'job_id': JOB_ID
+            },
+            auth=(LOGIN, PASSWORD),
+            headers=USER_AGENT_HEADER,
+            timeout=self.hook.timeout_seconds)
+
     @mock.patch('airflow.contrib.hooks.databricks_hook.requests')
     def test_get_run_page_url(self, mock_requests):
         mock_requests.get.return_value.json.return_value = GET_RUN_RESPONSE
diff --git a/tests/contrib/operators/test_databricks_operator.py b/tests/contrib/operators/test_databricks_operator.py
index afe1a92f28..5884fc3c98 100644
--- a/tests/contrib/operators/test_databricks_operator.py
+++ b/tests/contrib/operators/test_databricks_operator.py
@@ -7,9 +7,9 @@
 # to you under the Apache License, Version 2.0 (the
 # "License"); you may not use this file except in compliance
 # with the License. You may obtain a copy of the License at
-# 
+#
 #   http://www.apache.org/licenses/LICENSE-2.0
-# 
+#
 # Unless required by applicable law or agreed to in writing,
 # software distributed under the License is distributed on an
 # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -23,7 +23,9 @@
 from datetime import datetime
 
 from airflow.contrib.hooks.databricks_hook import RunState
+import airflow.contrib.operators.databricks_operator as databricks_operator
 from airflow.contrib.operators.databricks_operator import DatabricksSubmitRunOperator
+from airflow.contrib.operators.databricks_operator import DatabricksRunNowOperator
 from airflow.exceptions import AirflowException
 from airflow.models import DAG
 
@@ -58,6 +60,40 @@
 EXISTING_CLUSTER_ID = 'existing-cluster-id'
 RUN_NAME = 'run-name'
 RUN_ID = 1
+JOB_ID = 42
+NOTEBOOK_PARAMS = {
+    "dry-run": "true",
+    "oldest-time-to-consider": "1457570074236"
+}
+JAR_PARAMS = ["param1", "param2"]
+RENDERED_TEMPLATED_JAR_PARAMS = [
+    '/test-{0}'.format(DATE)
+]
+TEMPLATED_JAR_PARAMS = [
+    '/test-{{ ds }}'
+]
+PYTHON_PARAMS = ["john doe", "35"]
+SPARK_SUBMIT_PARAMS = ["--class", "org.apache.spark.examples.SparkPi"]
+
+
+class DatabricksOperatorSharedFunctions(unittest.TestCase):
+    def test_deep_string_coerce(self):
+        test_json = {
+            'test_int': 1,
+            'test_float': 1.0,
+            'test_dict': {'key': 'value'},
+            'test_list': [1, 1.0, 'a', 'b'],
+            'test_tuple': (1, 1.0, 'a', 'b')
+        }
+
+        expected = {
+            'test_int': '1',
+            'test_float': '1.0',
+            'test_dict': {'key': 'value'},
+            'test_list': ['1', '1.0', 'a', 'b'],
+            'test_tuple': ['1', '1.0', 'a', 'b']
+        }
+        self.assertDictEqual(databricks_operator._deep_string_coerce(test_json), expected)
 
 
 class DatabricksSubmitRunOperatorTest(unittest.TestCase):
@@ -65,12 +101,15 @@ def test_init_with_named_parameters(self):
         """
         Test the initializer with the named parameters.
         """
-        op = DatabricksSubmitRunOperator(task_id=TASK_ID, new_cluster=NEW_CLUSTER, notebook_task=NOTEBOOK_TASK)
-        expected = op._deep_string_coerce({
-          'new_cluster': NEW_CLUSTER,
-          'notebook_task': NOTEBOOK_TASK,
-          'run_name': TASK_ID
+        op = DatabricksSubmitRunOperator(task_id=TASK_ID,
+                                         new_cluster=NEW_CLUSTER,
+                                         notebook_task=NOTEBOOK_TASK)
+        expected = databricks_operator._deep_string_coerce({
+            'new_cluster': NEW_CLUSTER,
+            'notebook_task': NOTEBOOK_TASK,
+            'run_name': TASK_ID
         })
+
         self.assertDictEqual(expected, op.json)
 
     def test_init_with_json(self):
@@ -78,14 +117,14 @@ def test_init_with_json(self):
         Test the initializer with json data.
         """
         json = {
-          'new_cluster': NEW_CLUSTER,
-          'notebook_task': NOTEBOOK_TASK
+            'new_cluster': NEW_CLUSTER,
+            'notebook_task': NOTEBOOK_TASK
         }
         op = DatabricksSubmitRunOperator(task_id=TASK_ID, json=json)
-        expected = op._deep_string_coerce({
-          'new_cluster': NEW_CLUSTER,
-          'notebook_task': NOTEBOOK_TASK,
-          'run_name': TASK_ID
+        expected = databricks_operator._deep_string_coerce({
+            'new_cluster': NEW_CLUSTER,
+            'notebook_task': NOTEBOOK_TASK,
+            'run_name': TASK_ID
         })
         self.assertDictEqual(expected, op.json)
 
@@ -99,10 +138,10 @@ def test_init_with_specified_run_name(self):
             'run_name': RUN_NAME
         }
         op = DatabricksSubmitRunOperator(task_id=TASK_ID, json=json)
-        expected = op._deep_string_coerce({
-          'new_cluster': NEW_CLUSTER,
-          'notebook_task': NOTEBOOK_TASK,
-          'run_name': RUN_NAME
+        expected = databricks_operator._deep_string_coerce({
+            'new_cluster': NEW_CLUSTER,
+            'notebook_task': NOTEBOOK_TASK,
+            'run_name': RUN_NAME
        })
         self.assertDictEqual(expected, op.json)
 
@@ -114,14 +153,16 @@ def test_init_with_merging(self):
         """
         override_new_cluster = {'workers': 999}
         json = {
-          'new_cluster': NEW_CLUSTER,
-          'notebook_task': NOTEBOOK_TASK,
+            'new_cluster': NEW_CLUSTER,
+            'notebook_task': NOTEBOOK_TASK,
         }
-        op = DatabricksSubmitRunOperator(task_id=TASK_ID, json=json, new_cluster=override_new_cluster)
-        expected = op._deep_string_coerce({
-          'new_cluster': override_new_cluster,
-          'notebook_task': NOTEBOOK_TASK,
-          'run_name': TASK_ID,
+        op = DatabricksSubmitRunOperator(task_id=TASK_ID,
+                                         json=json,
+                                         new_cluster=override_new_cluster)
+        expected = databricks_operator._deep_string_coerce({
+            'new_cluster': override_new_cluster,
+            'notebook_task': NOTEBOOK_TASK,
+            'run_name': TASK_ID,
         })
         self.assertDictEqual(expected, op.json)
 
@@ -133,10 +174,10 @@ def test_init_with_templating(self):
         dag = DAG('test', start_date=datetime.now())
         op = DatabricksSubmitRunOperator(dag=dag, task_id=TASK_ID, json=json)
         op.json = op.render_template('json', op.json, {'ds': DATE})
-        expected = op._deep_string_coerce({
-          'new_cluster': NEW_CLUSTER,
-          'notebook_task': RENDERED_TEMPLATED_NOTEBOOK_TASK,
-          'run_name': TASK_ID,
+        expected = databricks_operator._deep_string_coerce({
+            'new_cluster': NEW_CLUSTER,
+            'notebook_task': RENDERED_TEMPLATED_NOTEBOOK_TASK,
+            'run_name': TASK_ID,
         })
         self.assertDictEqual(expected, op.json)
 
@@ -146,27 +187,9 @@ def test_init_with_bad_type(self):
         }
         # Looks a bit weird since we have to escape regex reserved symbols.
         exception_message = 'Type \<(type|class) \'datetime.datetime\'\> used ' + \
-            'for parameter json\[test\] is not a number or a string'
+                            'for parameter json\[test\] is not a number or a string'
         with self.assertRaisesRegexp(AirflowException, exception_message):
-            op = DatabricksSubmitRunOperator(task_id=TASK_ID, json=json)
-
-    def test_deep_string_coerce(self):
-        op = DatabricksSubmitRunOperator(task_id='test')
-        test_json = {
-          'test_int': 1,
-          'test_float': 1.0,
-          'test_dict': {'key': 'value'},
-          'test_list': [1, 1.0, 'a', 'b'],
-          'test_tuple': (1, 1.0, 'a', 'b')
-        }
-        expected = {
-          'test_int': '1',
-          'test_float': '1.0',
-          'test_dict': {'key': 'value'},
-          'test_list': ['1', '1.0', 'a', 'b'],
-          'test_tuple': ['1', '1.0', 'a', 'b']
-        }
-        self.assertDictEqual(op._deep_string_coerce(test_json), expected)
+            DatabricksSubmitRunOperator(task_id=TASK_ID, json=json)
 
     @mock.patch('airflow.contrib.operators.databricks_operator.DatabricksHook')
     def test_exec_success(self, db_mock_class):
@@ -184,15 +207,16 @@ def test_exec_success(self, db_mock_class):
 
         op.execute(None)
 
-        expected = op._deep_string_coerce({
-          'new_cluster': NEW_CLUSTER,
-          'notebook_task': NOTEBOOK_TASK,
-          'run_name': TASK_ID
+        expected = databricks_operator._deep_string_coerce({
+            'new_cluster': NEW_CLUSTER,
+            'notebook_task': NOTEBOOK_TASK,
+            'run_name': TASK_ID
         })
         db_mock_class.assert_called_once_with(
             DEFAULT_CONN_ID,
             retry_limit=op.databricks_retry_limit,
             retry_delay=op.databricks_retry_delay)
+
         db_mock.submit_run.assert_called_once_with(expected)
         db_mock.get_run_page_url.assert_called_once_with(RUN_ID)
         db_mock.get_run_state.assert_called_once_with(RUN_ID)
@@ -215,10 +239,10 @@ def test_exec_failure(self, db_mock_class):
         with self.assertRaises(AirflowException):
             op.execute(None)
 
-        expected = op._deep_string_coerce({
-          'new_cluster': NEW_CLUSTER,
-          'notebook_task': NOTEBOOK_TASK,
-          'run_name': TASK_ID,
+        expected = databricks_operator._deep_string_coerce({
+            'new_cluster': NEW_CLUSTER,
+            'notebook_task': NOTEBOOK_TASK,
+            'run_name': TASK_ID,
         })
         db_mock_class.assert_called_once_with(
             DEFAULT_CONN_ID,
@@ -232,8 +256,8 @@ def test_exec_failure(self, db_mock_class):
     @mock.patch('airflow.contrib.operators.databricks_operator.DatabricksHook')
     def test_on_kill(self, db_mock_class):
         run = {
-          'new_cluster': NEW_CLUSTER,
-          'notebook_task': NOTEBOOK_TASK,
+            'new_cluster': NEW_CLUSTER,
+            'notebook_task': NOTEBOOK_TASK,
         }
         op = DatabricksSubmitRunOperator(task_id=TASK_ID, json=run)
         db_mock = db_mock_class.return_value
@@ -243,3 +267,173 @@ def test_on_kill(self, db_mock_class):
 
         db_mock.cancel_run.assert_called_once_with(RUN_ID)
 
+
+class DatabricksRunNowOperatorTest(unittest.TestCase):
+
+    def test_init_with_named_parameters(self):
+        """
+        Test the initializer with the named parameters.
+        """
+        op = DatabricksRunNowOperator(job_id=JOB_ID, task_id=TASK_ID)
+        expected = databricks_operator._deep_string_coerce({
+            'job_id': 42
+        })
+
+        self.assertDictEqual(expected, op.json)
+
+    def test_init_with_json(self):
+        """
+        Test the initializer with json data.
+ """ + json =3D { + 'notebook_params': NOTEBOOK_PARAMS, + 'jar_params': JAR_PARAMS, + 'python_params': PYTHON_PARAMS, + 'spark_submit_params': SPARK_SUBMIT_PARAMS + } + op =3D DatabricksRunNowOperator(task_id=3DTASK_ID, job_id=3DJOB_ID= , json=3Djson) + + expected =3D databricks_operator._deep_string_coerce({ + 'notebook_params': NOTEBOOK_PARAMS, + 'jar_params': JAR_PARAMS, + 'python_params': PYTHON_PARAMS, + 'spark_submit_params': SPARK_SUBMIT_PARAMS, + 'job_id': JOB_ID + }) + + self.assertDictEqual(expected, op.json) + + def test_init_with_merging(self): + """ + Test the initializer when json and other named parameters are both + provided. The named parameters should override top level keys in t= he + json dict. + """ + override_notebook_params =3D {'workers': 999} + json =3D { + 'notebook_params': NOTEBOOK_PARAMS, + 'jar_params': JAR_PARAMS + } + + op =3D DatabricksRunNowOperator(task_id=3DTASK_ID, + json=3Djson, + job_id=3DJOB_ID, + notebook_params=3Doverride_notebook_= params, + python_params=3DPYTHON_PARAMS, + spark_submit_params=3DSPARK_SUBMIT_P= ARAMS) + + expected =3D databricks_operator._deep_string_coerce({ + 'notebook_params': override_notebook_params, + 'jar_params': JAR_PARAMS, + 'python_params': PYTHON_PARAMS, + 'spark_submit_params': SPARK_SUBMIT_PARAMS, + 'job_id': JOB_ID + }) + + self.assertDictEqual(expected, op.json) + + def test_init_with_templating(self): + json =3D { + 'notebook_params': NOTEBOOK_PARAMS, + 'jar_params': TEMPLATED_JAR_PARAMS + } + + dag =3D DAG('test', start_date=3Ddatetime.now()) + op =3D DatabricksRunNowOperator(dag=3Ddag, task_id=3DTASK_ID, job_= id=3DJOB_ID, json=3Djson) + op.json =3D op.render_template('json', op.json, {'ds': DATE}) + expected =3D databricks_operator._deep_string_coerce({ + 'notebook_params': NOTEBOOK_PARAMS, + 'jar_params': RENDERED_TEMPLATED_JAR_PARAMS, + 'job_id': JOB_ID + }) + self.assertDictEqual(expected, op.json) + + def test_init_with_bad_type(self): + json =3D { + 'test': datetime.now() + } + # Looks a bit weird since we have to escape regex reserved symbols= . + exception_message =3D 'Type \<(type|class) \'datetime.datetime\'\>= used ' + \ + 'for parameter json\[test\] is not a number or= a string' + with self.assertRaisesRegexp(AirflowException, exception_message): + DatabricksRunNowOperator(task_id=3DTASK_ID, job_id=3DJOB_ID, j= son=3Djson) + + @mock.patch('airflow.contrib.operators.databricks_operator.DatabricksH= ook') + def test_exec_success(self, db_mock_class): + """ + Test the execute function in case where the run is successful. 
+ """ + run =3D { + 'notebook_params': NOTEBOOK_PARAMS, + 'notebook_task': NOTEBOOK_TASK, + 'jar_params': JAR_PARAMS + } + op =3D DatabricksRunNowOperator(task_id=3DTASK_ID, job_id=3DJOB_ID= , json=3Drun) + db_mock =3D db_mock_class.return_value + db_mock.run_now.return_value =3D 1 + db_mock.get_run_state.return_value =3D RunState('TERMINATED', 'SUC= CESS', '') + + op.execute(None) + + expected =3D databricks_operator._deep_string_coerce({ + 'notebook_params': NOTEBOOK_PARAMS, + 'notebook_task': NOTEBOOK_TASK, + 'jar_params': JAR_PARAMS, + 'job_id': JOB_ID + }) + + db_mock_class.assert_called_once_with( + DEFAULT_CONN_ID, + retry_limit=3Dop.databricks_retry_limit, + retry_delay=3Dop.databricks_retry_delay) + db_mock.run_now.assert_called_once_with(expected) + db_mock.get_run_page_url.assert_called_once_with(RUN_ID) + db_mock.get_run_state.assert_called_once_with(RUN_ID) + self.assertEquals(RUN_ID, op.run_id) + + @mock.patch('airflow.contrib.operators.databricks_operator.DatabricksH= ook') + def test_exec_failure(self, db_mock_class): + """ + Test the execute function in case where the run failed. + """ + run =3D { + 'notebook_params': NOTEBOOK_PARAMS, + 'notebook_task': NOTEBOOK_TASK, + 'jar_params': JAR_PARAMS + } + op =3D DatabricksRunNowOperator(task_id=3DTASK_ID, job_id=3DJOB_ID= , json=3Drun) + db_mock =3D db_mock_class.return_value + db_mock.run_now.return_value =3D 1 + db_mock.get_run_state.return_value =3D RunState('TERMINATED', 'FAI= LED', '') + + with self.assertRaises(AirflowException): + op.execute(None) + + expected =3D databricks_operator._deep_string_coerce({ + 'notebook_params': NOTEBOOK_PARAMS, + 'notebook_task': NOTEBOOK_TASK, + 'jar_params': JAR_PARAMS, + 'job_id': JOB_ID + }) + db_mock_class.assert_called_once_with( + DEFAULT_CONN_ID, + retry_limit=3Dop.databricks_retry_limit, + retry_delay=3Dop.databricks_retry_delay) + db_mock.run_now.assert_called_once_with(expected) + db_mock.get_run_page_url.assert_called_once_with(RUN_ID) + db_mock.get_run_state.assert_called_once_with(RUN_ID) + self.assertEquals(RUN_ID, op.run_id) + + @mock.patch('airflow.contrib.operators.databricks_operator.DatabricksH= ook') + def test_on_kill(self, db_mock_class): + run =3D { + 'notebook_params': NOTEBOOK_PARAMS, + 'notebook_task': NOTEBOOK_TASK, + 'jar_params': JAR_PARAMS + } + op =3D DatabricksRunNowOperator(task_id=3DTASK_ID, job_id=3DJOB_ID= , json=3Drun) + db_mock =3D db_mock_class.return_value + op.run_id =3D RUN_ID + + op.on_kill() + db_mock.cancel_run.assert_called_once_with(RUN_ID) =20 ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. =20 For queries about this service, please contact Infrastructure at: users@infra.apache.org > Implement Databricks Operator for jobs/run-now endpoint > ------------------------------------------------------- > > Key: AIRFLOW-1998 > URL: https://issues.apache.org/jira/browse/AIRFLOW-1998 > Project: Apache Airflow > Issue Type: Improvement > Components: hooks, operators > Affects Versions: 1.9.0 > Reporter: Diego Rabatone Oliveira > Assignee: Israel Knight > Priority: Major > Fix For: 2.0.0 > > > Implement a Operator to deal with Databricks '2.0/jobs/run-now' API Endpo= int. -- This message was sent by Atlassian JIRA (v7.6.3#76005)