From: bolke@apache.org
To: commits@airflow.incubator.apache.org
Subject: incubator-airflow git commit: Revert "Revert "[AIRFLOW-782] Add support for DataFlowPythonOperator.""
Date: Sun, 12 Feb 2017 12:10:45 +0000 (UTC)

Repository: incubator-airflow
Updated Branches:
  refs/heads/v1-8-test 8aacc283a -> eddecd59d


Revert "Revert "[AIRFLOW-782] Add support for DataFlowPythonOperator.""

This reverts commit 7e65998a1bedd00e74fa333cfee78ad574aaa849.
Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/eddecd59
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/eddecd59
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/eddecd59

Branch: refs/heads/v1-8-test
Commit: eddecd59d73191904f2f156e53a138e532dc560a
Parents: 8aacc28
Author: Bolke de Bruin
Authored: Sun Feb 12 13:10:33 2017 +0100
Committer: Bolke de Bruin
Committed: Sun Feb 12 13:10:33 2017 +0100

----------------------------------------------------------------------
 airflow/contrib/hooks/gcp_dataflow_hook.py     | 33 +++++---
 airflow/contrib/operators/dataflow_operator.py | 85 +++++++++++++++++++--
 tests/contrib/hooks/gcp_dataflow_hook.py       | 56 ++++++++++++++
 tests/contrib/operators/dataflow_operator.py   | 76 ++++++++++++++++++
 4 files changed, 232 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eddecd59/airflow/contrib/hooks/gcp_dataflow_hook.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/hooks/gcp_dataflow_hook.py b/airflow/contrib/hooks/gcp_dataflow_hook.py
index bd5bd3c..aaa9992 100644
--- a/airflow/contrib/hooks/gcp_dataflow_hook.py
+++ b/airflow/contrib/hooks/gcp_dataflow_hook.py
@@ -24,6 +24,7 @@ from airflow.contrib.hooks.gcp_api_base_hook import GoogleCloudBaseHook
 
 
 class _DataflowJob(object):
+
     def __init__(self, dataflow, project_number, name):
         self._dataflow = dataflow
         self._project_number = project_number
@@ -82,7 +83,8 @@ class _DataflowJob(object):
         return self._job
 
 
-class _DataflowJava(object):
+class _Dataflow(object):
+
     def __init__(self, cmd):
         self._proc = subprocess.Popen(cmd, shell=False, stdout=subprocess.PIPE,
                                       stderr=subprocess.PIPE)
@@ -113,11 +115,12 @@
             else:
                 logging.info("Waiting for DataFlow process to complete.")
 
         if self._proc.returncode is not 0:
-            raise Exception("DataFlow jar failed with return code {}".format(
+            raise Exception("DataFlow failed with return code {}".format(
                 self._proc.returncode))
 
 
 class DataFlowHook(GoogleCloudBaseHook):
+
     def __init__(self,
                  gcp_conn_id='google_cloud_default',
                  delegate_to=None):
@@ -130,21 +133,27 @@ class DataFlowHook(GoogleCloudBaseHook):
         http_authorized = self._authorize()
         return build('dataflow', 'v1b3', http=http_authorized)
 
+    def _start_dataflow(self, task_id, variables, dataflow, name, command_prefix):
+        cmd = command_prefix + self._build_cmd(task_id, variables, dataflow)
+        _Dataflow(cmd).wait_for_done()
+        _DataflowJob(
+            self.get_conn(), variables['project'], name).wait_for_done()
+
     def start_java_dataflow(self, task_id, variables, dataflow):
         name = task_id + "-" + str(uuid.uuid1())[:8]
-        cmd = self._build_cmd(task_id, variables, dataflow, name)
-        _DataflowJava(cmd).wait_for_done()
-        _DataflowJob(self.get_conn(), variables['project'], name).wait_for_done()
+        variables['jobName'] = name
+        self._start_dataflow(
+            task_id, variables, dataflow, name, ["java", "-jar"])
 
-    def _build_cmd(self, task_id, variables, dataflow, name):
-        command = ["java", "-jar",
-                   dataflow,
-                   "--runner=DataflowPipelineRunner",
-                   "--streaming=false",
-                   "--jobName=" + name]
+    def start_python_dataflow(self, task_id, variables, dataflow, py_options):
+        name = task_id + "-" + str(uuid.uuid1())[:8]
+        variables["job_name"] = name
+        self._start_dataflow(
+            task_id, variables, dataflow, name, ["python"] + py_options)
+
+    def _build_cmd(self, task_id, variables, dataflow):
+        command = [dataflow, "--runner=DataflowPipelineRunner"]
         if variables is not None:
             for attr, value in variables.iteritems():
                 command.append("--" + attr + "=" + value)
-
         return command

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eddecd59/airflow/contrib/operators/dataflow_operator.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/dataflow_operator.py b/airflow/contrib/operators/dataflow_operator.py
index 10a6811..ef49eb6 100644
--- a/airflow/contrib/operators/dataflow_operator.py
+++ b/airflow/contrib/operators/dataflow_operator.py
@@ -13,6 +13,7 @@
 # limitations under the License.
 
 import copy
+import re
 
 from airflow.contrib.hooks.gcp_dataflow_hook import DataFlowHook
 from airflow.models import BaseOperator
@@ -70,9 +71,13 @@ class DataFlowJavaOperator(BaseOperator):
             *args,
             **kwargs):
         """
-        Create a new DataFlowJavaOperator.
+        Create a new DataFlowJavaOperator. Note that both
+        dataflow_default_options and options will be merged to specify pipeline
+        execution parameters, and dataflow_default_options is expected to hold
+        high-level options, for instance, project and zone information, which
+        apply to all dataflow operators in the DAG.
 
-        For more detail on about job submission have a look at the reference:
+        For more detail on job submission have a look at the reference:
 
         https://cloud.google.com/dataflow/pipelines/specifying-exec-params
 
@@ -82,11 +87,12 @@ class DataFlowJavaOperator(BaseOperator):
         :type dataflow_default_options: dict
         :param options: Map of job specific options.
         :type options: dict
-        :param gcp_conn_id: The connection ID to use connecting to Google Cloud Platform.
+        :param gcp_conn_id: The connection ID to use connecting to Google Cloud
+        Platform.
         :type gcp_conn_id: string
         :param delegate_to: The account to impersonate, if any.
-            For this to work, the service account making the request must have domain-wide
-            delegation enabled.
+            For this to work, the service account making the request must have
+            domain-wide delegation enabled.
         :type delegate_to: string
         """
         super(DataFlowJavaOperator, self).__init__(*args, **kwargs)
@@ -101,9 +107,76 @@ class DataFlowJavaOperator(BaseOperator):
         self.options = options
 
     def execute(self, context):
-        hook = DataFlowHook(gcp_conn_id=self.gcp_conn_id, delegate_to=self.delegate_to)
+        hook = DataFlowHook(gcp_conn_id=self.gcp_conn_id,
+                            delegate_to=self.delegate_to)
 
         dataflow_options = copy.copy(self.dataflow_default_options)
         dataflow_options.update(self.options)
 
         hook.start_java_dataflow(self.task_id, dataflow_options, self.jar)
+
+
+class DataFlowPythonOperator(BaseOperator):
+
+    @apply_defaults
+    def __init__(
+            self,
+            py_file,
+            py_options=None,
+            dataflow_default_options=None,
+            options=None,
+            gcp_conn_id='google_cloud_default',
+            delegate_to=None,
+            *args,
+            **kwargs):
+        """
+        Create a new DataFlowPythonOperator. Note that both
+        dataflow_default_options and options will be merged to specify pipeline
+        execution parameters, and dataflow_default_options is expected to hold
+        high-level options, for instance, project and zone information, which
+        apply to all dataflow operators in the DAG.
+
+        For more detail on job submission have a look at the reference:
+
+        https://cloud.google.com/dataflow/pipelines/specifying-exec-params
+
+        :param py_file: Reference to the Python dataflow pipeline file, e.g.,
+            /some/local/file/path/to/your/python/pipeline/file.py.
+        :type py_file: string
+        :param py_options: Additional python options.
+        :type py_options: list of strings, e.g., ["-m", "-v"].
+        :param dataflow_default_options: Map of default job options.
+        :type dataflow_default_options: dict
+        :param options: Map of job specific options.
+        :type options: dict
+        :param gcp_conn_id: The connection ID to use connecting to Google Cloud
+        Platform.
+        :type gcp_conn_id: string
+        :param delegate_to: The account to impersonate, if any.
+            For this to work, the service account making the request must have
+            domain-wide delegation enabled.
+        :type delegate_to: string
+        """
+        super(DataFlowPythonOperator, self).__init__(*args, **kwargs)
+
+        self.py_file = py_file
+        self.py_options = py_options or []
+        self.dataflow_default_options = dataflow_default_options or {}
+        self.options = options or {}
+        self.gcp_conn_id = gcp_conn_id
+        self.delegate_to = delegate_to
+
+    def execute(self, context):
+        """Execute the python dataflow job."""
+        hook = DataFlowHook(gcp_conn_id=self.gcp_conn_id,
+                            delegate_to=self.delegate_to)
+        dataflow_options = self.dataflow_default_options.copy()
+        dataflow_options.update(self.options)
+        # Convert argument names from lowerCamelCase to snake case.
+        camel_to_snake = lambda name: re.sub(
+            r'[A-Z]', lambda x: '_' + x.group(0).lower(), name)
+        formatted_options = {camel_to_snake(key): dataflow_options[key]
+                             for key in dataflow_options}
+        hook.start_python_dataflow(
+            self.task_id, formatted_options,
+            self.py_file, self.py_options)
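
For context, a minimal usage sketch (not part of this commit) of how the new
DataFlowPythonOperator could be wired into a DAG. The DAG id, pipeline path,
project, zone and bucket names below are hypothetical placeholders; the
default and job-specific options are merged at execute time, as described in
the docstring above.

    from datetime import datetime

    from airflow import DAG
    from airflow.contrib.operators.dataflow_operator import DataFlowPythonOperator

    dag = DAG(dag_id='example_dataflow_python',       # placeholder DAG id
              start_date=datetime(2017, 2, 1),
              schedule_interval=None)

    wordcount = DataFlowPythonOperator(
        task_id='wordcount',
        py_file='/home/airflow/dataflow/wordcount.py',  # placeholder pipeline file
        # High-level options shared by all dataflow tasks in the DAG.
        dataflow_default_options={'project': 'my-gcp-project',
                                  'zone': 'europe-west1-d'},
        # Job-specific options; merged over the defaults in execute().
        options={'staging_location': 'gs://my-bucket/staging',
                 'output': 'gs://my-bucket/output'},
        dag=dag)
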
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eddecd59/tests/contrib/hooks/gcp_dataflow_hook.py
----------------------------------------------------------------------
diff --git a/tests/contrib/hooks/gcp_dataflow_hook.py b/tests/contrib/hooks/gcp_dataflow_hook.py
new file mode 100644
index 0000000..797d40c
--- /dev/null
+++ b/tests/contrib/hooks/gcp_dataflow_hook.py
@@ -0,0 +1,56 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import unittest
+from airflow.contrib.hooks.gcp_dataflow_hook import DataFlowHook
+
+try:
+    from unittest import mock
+except ImportError:
+    try:
+        import mock
+    except ImportError:
+        mock = None
+
+
+TASK_ID = 'test-python-dataflow'
+PY_FILE = 'apache_beam.examples.wordcount'
+PY_OPTIONS = ['-m']
+OPTIONS = {
+    'project': 'test',
+    'staging_location': 'gs://test/staging'
+}
+BASE_STRING = 'airflow.contrib.hooks.gcp_api_base_hook.{}'
+DATAFLOW_STRING = 'airflow.contrib.hooks.gcp_dataflow_hook.{}'
+
+
+def mock_init(self, gcp_conn_id, delegate_to=None):
+    pass
+
+
+class DataFlowHookTest(unittest.TestCase):
+
+    def setUp(self):
+        with mock.patch(BASE_STRING.format('GoogleCloudBaseHook.__init__'),
+                        new=mock_init):
+            self.dataflow_hook = DataFlowHook(gcp_conn_id='test')
+
+    @mock.patch(DATAFLOW_STRING.format('DataFlowHook._start_dataflow'))
+    def test_start_python_dataflow(self, internal_dataflow_mock):
+        self.dataflow_hook.start_python_dataflow(
+            task_id=TASK_ID, variables=OPTIONS,
+            dataflow=PY_FILE, py_options=PY_OPTIONS)
+        internal_dataflow_mock.assert_called_once_with(
+            TASK_ID, OPTIONS, PY_FILE, mock.ANY, ['python'] + PY_OPTIONS)
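
As an illustration (not part of the commit), this is roughly the command line
the hook ends up assembling for the test inputs above; the 'abcd1234' job-name
suffix stands in for the random uuid fragment, and dict ordering may vary.

    # Mirrors _start_dataflow/_build_cmd: command_prefix + [dataflow, runner] + one
    # "--key=value" flag per variable (the hook itself uses iteritems() on Python 2).
    py_options = ['-m']
    dataflow = 'apache_beam.examples.wordcount'
    variables = {'project': 'test',
                 'staging_location': 'gs://test/staging',
                 'job_name': 'test-python-dataflow-abcd1234'}

    cmd = ['python'] + py_options + [dataflow, '--runner=DataflowPipelineRunner']
    for attr, value in variables.items():
        cmd.append('--' + attr + '=' + value)
    # cmd is roughly:
    # ['python', '-m', 'apache_beam.examples.wordcount',
    #  '--runner=DataflowPipelineRunner', '--project=test',
    #  '--staging_location=gs://test/staging',
    #  '--job_name=test-python-dataflow-abcd1234']
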
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eddecd59/tests/contrib/operators/dataflow_operator.py
----------------------------------------------------------------------
diff --git a/tests/contrib/operators/dataflow_operator.py b/tests/contrib/operators/dataflow_operator.py
new file mode 100644
index 0000000..4f887c1
--- /dev/null
+++ b/tests/contrib/operators/dataflow_operator.py
@@ -0,0 +1,76 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import unittest
+
+from airflow.contrib.operators.dataflow_operator import DataFlowPythonOperator
+
+try:
+    from unittest import mock
+except ImportError:
+    try:
+        import mock
+    except ImportError:
+        mock = None
+
+
+TASK_ID = 'test-python-dataflow'
+PY_FILE = 'apache_beam.examples.wordcount'
+PY_OPTIONS = ['-m']
+DEFAULT_OPTIONS = {
+    'project': 'test',
+    'stagingLocation': 'gs://test/staging'
+}
+ADDITIONAL_OPTIONS = {
+    'output': 'gs://test/output'
+}
+
+
+class DataFlowPythonOperatorTest(unittest.TestCase):
+
+    def setUp(self):
+        self.dataflow = DataFlowPythonOperator(
+            task_id=TASK_ID,
+            py_file=PY_FILE,
+            py_options=PY_OPTIONS,
+            dataflow_default_options=DEFAULT_OPTIONS,
+            options=ADDITIONAL_OPTIONS)
+
+    def test_init(self):
+        """Test DataFlowPythonOperator instance is properly initialized."""
+        self.assertEqual(self.dataflow.task_id, TASK_ID)
+        self.assertEqual(self.dataflow.py_file, PY_FILE)
+        self.assertEqual(self.dataflow.py_options, PY_OPTIONS)
+        self.assertEqual(self.dataflow.dataflow_default_options,
+                         DEFAULT_OPTIONS)
+        self.assertEqual(self.dataflow.options,
+                         ADDITIONAL_OPTIONS)
+
+    @mock.patch('airflow.contrib.operators.dataflow_operator.DataFlowHook')
+    def test_exec(self, dataflow_mock):
+        """Test DataFlowHook is created and the right args are passed to
+        start_python_dataflow.
+
+        """
+        start_python_hook = dataflow_mock.return_value.start_python_dataflow
+        self.dataflow.execute(None)
+        assert dataflow_mock.called
+        expected_options = {
+            'project': 'test',
+            'staging_location': 'gs://test/staging',
+            'output': 'gs://test/output'
+        }
+        start_python_hook.assert_called_once_with(TASK_ID, expected_options,
+                                                  PY_FILE, PY_OPTIONS)
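
A note on the key conversion exercised by test_exec above: the operator
rewrites lowerCamelCase option keys to snake_case before calling the hook,
which is why 'stagingLocation' in DEFAULT_OPTIONS shows up as
'staging_location' in expected_options. A small standalone sketch of that
conversion, using the same regex as DataFlowPythonOperator.execute:

    import re

    # Prefix every capital letter with '_' and lowercase it.
    def camel_to_snake(name):
        return re.sub(r'[A-Z]', lambda x: '_' + x.group(0).lower(), name)

    options = {'project': 'test',
               'stagingLocation': 'gs://test/staging',
               'output': 'gs://test/output'}
    formatted = {camel_to_snake(key): value for key, value in options.items()}
    # formatted == {'project': 'test',
    #               'staging_location': 'gs://test/staging',
    #               'output': 'gs://test/output'}
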