From: altay@apache.org
To: commits@beam.apache.org
Reply-To: dev@beam.apache.org
Date: Wed, 22 Feb 2017 00:04:59 -0000
Subject: [2/3] beam git commit: Remove dependency on apitools

Remove dependency on apitools

Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/e6b3883a
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/e6b3883a
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/e6b3883a

Branch: refs/heads/master
Commit: e6b3883a2340e3a2fe28b9a5c2d31bbb46c526b8
Parents: 2982238
Author: Sourabh Bajaj
Authored: Mon Feb 20 16:58:55 2017 -0800
Committer: Ahmet Altay
Committed: Tue Feb 21 16:04:44 2017 -0800

----------------------------------------------------------------------
 .../examples/snippets/snippets_test.py          |   9 ++
 .../internal/google_cloud_platform/__init__.py  |  16 ++
 .../google_cloud_platform/json_value.py         | 147 +++++++++++++++++++
 .../google_cloud_platform/json_value_test.py    |  93 ++++++++++++
 sdks/python/apache_beam/internal/json_value.py  | 141 ------------------
 .../apache_beam/internal/json_value_test.py     |  84 -----------
 sdks/python/apache_beam/io/__init__.py          |  11 +-
 sdks/python/apache_beam/io/fileio.py            |  21 ++-
 .../io/google_cloud_platform/bigquery.py        |   9 +-
 .../io/google_cloud_platform/bigquery_test.py   |  19 ++-
 .../io/google_cloud_platform/gcsio_test.py      |  12 +-
 .../internal/clients/bigquery/__init__.py       |  12 +-
 .../internal/clients/storage/__init__.py        |  12 +-
 sdks/python/apache_beam/runners/__init__.py     |   3 +-
 .../google_cloud_dataflow/dataflow_runner.py    |   2 +-
 .../dataflow_runner_test.py                     |   9 ++
 .../google_cloud_dataflow/internal/apiclient.py |   2 +-
 .../internal/apiclient_test.py                  |  12 +-
 .../internal/clients/dataflow/__init__.py       |  12 +-
 .../clients/dataflow/message_matchers_test.py   |  14 +-
 .../template_runner_test.py                     |  13 +-
 .../test_dataflow_runner.py                     |  40 +++++
 sdks/python/apache_beam/runners/runner_test.py  |  31 ++--
.../python/apache_beam/runners/test/__init__.py | 8 +- .../runners/test/test_dataflow_runner.py | 40 ----- .../apache_beam/tests/pipeline_verifiers.py | 10 +- .../tests/pipeline_verifiers_test.py | 7 +- sdks/python/apache_beam/utils/retry.py | 13 +- sdks/python/apache_beam/utils/retry_test.py | 14 +- 29 files changed, 500 insertions(+), 316 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/e6b3883a/sdks/python/apache_beam/examples/snippets/snippets_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/examples/snippets/snippets_test.py b/sdks/python/apache_beam/examples/snippets/snippets_test.py index 95c959b..f9f1a13 100644 --- a/sdks/python/apache_beam/examples/snippets/snippets_test.py +++ b/sdks/python/apache_beam/examples/snippets/snippets_test.py @@ -37,6 +37,14 @@ from apache_beam.examples.snippets import snippets # pylint: disable=expression-not-assigned from apache_beam.test_pipeline import TestPipeline +# Protect against environments where apitools library is not available. +# pylint: disable=wrong-import-order, wrong-import-position +try: + from apitools.base.py import base_api +except ImportError: + base_api = None +# pylint: enable=wrong-import-order, wrong-import-position + class ParDoTest(unittest.TestCase): """Tests for model/par-do.""" @@ -576,6 +584,7 @@ class SnippetsTest(unittest.TestCase): # TODO(vikasrk): Expore using Datastore Emulator. snippets.model_datastoreio() + @unittest.skipIf(base_api is None, 'GCP dependencies are not installed') def test_model_bigqueryio(self): # We cannot test BigQueryIO functionality in unit tests therefore we limit # ourselves to making sure the pipeline containing BigQuery sources and http://git-wip-us.apache.org/repos/asf/beam/blob/e6b3883a/sdks/python/apache_beam/internal/google_cloud_platform/__init__.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/internal/google_cloud_platform/__init__.py b/sdks/python/apache_beam/internal/google_cloud_platform/__init__.py new file mode 100644 index 0000000..cce3aca --- /dev/null +++ b/sdks/python/apache_beam/internal/google_cloud_platform/__init__.py @@ -0,0 +1,16 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
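The snippets_test.py hunk above shows the recipe the rest of this commit repeats: wrap the optional import in try/except ImportError, bind a None sentinel on failure, and gate dependent tests with unittest.skipIf. A minimal standalone sketch, assuming nothing beyond the standard library and (optionally) google-apitools; the test body is illustrative only:

import unittest

# Optional dependency: bind a sentinel instead of failing at import time.
try:
    from apitools.base.py import base_api
except ImportError:
    base_api = None


class OptionalDependencyTest(unittest.TestCase):

    @unittest.skipIf(base_api is None, 'GCP dependencies are not installed')
    def test_uses_gcp(self):
        # Runs only when google-apitools is importable.
        self.assertTrue(hasattr(base_api, 'BaseApiClient'))


if __name__ == '__main__':
    unittest.main()

The sentinel doubles as a runtime flag, which is why later hunks can also write `if apiclient is not None:` inside a test body instead of skipping the whole test.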
+# http://git-wip-us.apache.org/repos/asf/beam/blob/e6b3883a/sdks/python/apache_beam/internal/google_cloud_platform/json_value.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/internal/google_cloud_platform/json_value.py b/sdks/python/apache_beam/internal/google_cloud_platform/json_value.py new file mode 100644 index 0000000..c8b5393 --- /dev/null +++ b/sdks/python/apache_beam/internal/google_cloud_platform/json_value.py @@ -0,0 +1,147 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +"""JSON conversion utility functions.""" + +# Protect against environments where apitools library is not available. +# pylint: disable=wrong-import-order, wrong-import-position +try: + from apitools.base.py import extra_types +except ImportError: + extra_types = None +# pylint: enable=wrong-import-order, wrong-import-position + + +_MAXINT64 = (1 << 63) - 1 +_MININT64 = - (1 << 63) + + +def get_typed_value_descriptor(obj): + """Converts a basic type into a @type/value dictionary. + + Args: + obj: A basestring, bool, int, or float to be converted. + + Returns: + A dictionary containing the keys '@type' and 'value' with the value for + the @type of appropriate type. + + Raises: + TypeError: if the Python object has a type that is not supported. + """ + if isinstance(obj, basestring): + type_name = 'Text' + elif isinstance(obj, bool): + type_name = 'Boolean' + elif isinstance(obj, int): + type_name = 'Integer' + elif isinstance(obj, float): + type_name = 'Float' + else: + raise TypeError('Cannot get a type descriptor for %s.' % repr(obj)) + return {'@type': 'http://schema.org/%s' % type_name, 'value': obj} + + +def to_json_value(obj, with_type=False): + """Converts Python objects into extra_types.JsonValue objects. + + Args: + obj: Python object to be converted. Can be 'None'. + with_type: If true then the basic types (string, int, float, bool) will + be wrapped in @type/value dictionaries. Otherwise the straight value is + encoded into a JsonValue. + + Returns: + A JsonValue object using JsonValue, JsonArray and JsonObject types for the + corresponding values, lists, or dictionaries. + + Raises: + TypeError: if the Python object contains a type that is not supported. + + The types supported are str, bool, list, tuple, dict, and None. The Dataflow + API requires JsonValue(s) in many places, and it is quite convenient to be + able to specify these hierarchical objects using Python syntax. 
+ """ + if obj is None: + return extra_types.JsonValue(is_null=True) + elif isinstance(obj, (list, tuple)): + return extra_types.JsonValue( + array_value=extra_types.JsonArray( + entries=[to_json_value(o, with_type=with_type) for o in obj])) + elif isinstance(obj, dict): + json_object = extra_types.JsonObject() + for k, v in obj.iteritems(): + json_object.properties.append( + extra_types.JsonObject.Property( + key=k, value=to_json_value(v, with_type=with_type))) + return extra_types.JsonValue(object_value=json_object) + elif with_type: + return to_json_value(get_typed_value_descriptor(obj), with_type=False) + elif isinstance(obj, basestring): + return extra_types.JsonValue(string_value=obj) + elif isinstance(obj, bool): + return extra_types.JsonValue(boolean_value=obj) + elif isinstance(obj, int): + return extra_types.JsonValue(integer_value=obj) + elif isinstance(obj, long): + if _MININT64 <= obj <= _MAXINT64: + return extra_types.JsonValue(integer_value=obj) + else: + raise TypeError('Can not encode {} as a 64-bit integer'.format(obj)) + elif isinstance(obj, float): + return extra_types.JsonValue(double_value=obj) + else: + raise TypeError('Cannot convert %s to a JSON value.' % repr(obj)) + + +def from_json_value(v): + """Converts extra_types.JsonValue objects into Python objects. + + Args: + v: JsonValue object to be converted. + + Returns: + A Python object structured as values, lists, and dictionaries corresponding + to JsonValue, JsonArray and JsonObject types. + + Raises: + TypeError: if the JsonValue object contains a type that is not supported. + + The types supported are str, bool, list, dict, and None. The Dataflow API + returns JsonValue(s) in many places and it is quite convenient to be able to + convert these hierarchical objects to much simpler Python objects. + """ + if isinstance(v, extra_types.JsonValue): + if v.string_value is not None: + return v.string_value + elif v.boolean_value is not None: + return v.boolean_value + elif v.integer_value is not None: + return v.integer_value + elif v.double_value is not None: + return v.double_value + elif v.array_value is not None: + return from_json_value(v.array_value) + elif v.object_value is not None: + return from_json_value(v.object_value) + elif v.is_null: + return None + elif isinstance(v, extra_types.JsonArray): + return [from_json_value(e) for e in v.entries] + elif isinstance(v, extra_types.JsonObject): + return {p.key: from_json_value(p.value) for p in v.properties} + raise TypeError('Cannot convert %s from a JSON value.' % repr(v)) http://git-wip-us.apache.org/repos/asf/beam/blob/e6b3883a/sdks/python/apache_beam/internal/google_cloud_platform/json_value_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/internal/google_cloud_platform/json_value_test.py b/sdks/python/apache_beam/internal/google_cloud_platform/json_value_test.py new file mode 100644 index 0000000..1a83008 --- /dev/null +++ b/sdks/python/apache_beam/internal/google_cloud_platform/json_value_test.py @@ -0,0 +1,93 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +"""Unit tests for the json_value module.""" + +import unittest + +from apache_beam.internal.google_cloud_platform.json_value import from_json_value +from apache_beam.internal.google_cloud_platform.json_value import to_json_value + + +# Protect against environments where apitools library is not available. +# pylint: disable=wrong-import-order, wrong-import-position +try: + from apitools.base.py.extra_types import JsonValue +except: + JsonValue = None +# pylint: enable=wrong-import-order, wrong-import-position + + +@unittest.skipIf(JsonValue is None, 'GCP dependencies are not installed') +class JsonValueTest(unittest.TestCase): + + def test_string_to(self): + self.assertEquals(JsonValue(string_value='abc'), to_json_value('abc')) + + def test_true_to(self): + self.assertEquals(JsonValue(boolean_value=True), to_json_value(True)) + + def test_false_to(self): + self.assertEquals(JsonValue(boolean_value=False), to_json_value(False)) + + def test_int_to(self): + self.assertEquals(JsonValue(integer_value=14), to_json_value(14)) + + def test_float_to(self): + self.assertEquals(JsonValue(double_value=2.75), to_json_value(2.75)) + + def test_none_to(self): + self.assertEquals(JsonValue(is_null=True), to_json_value(None)) + + def test_string_from(self): + self.assertEquals('WXYZ', from_json_value(to_json_value('WXYZ'))) + + def test_true_from(self): + self.assertEquals(True, from_json_value(to_json_value(True))) + + def test_false_from(self): + self.assertEquals(False, from_json_value(to_json_value(False))) + + def test_int_from(self): + self.assertEquals(-27, from_json_value(to_json_value(-27))) + + def test_float_from(self): + self.assertEquals(4.5, from_json_value(to_json_value(4.5))) + + def test_with_type(self): + rt = from_json_value(to_json_value('abcd', with_type=True)) + self.assertEquals('http://schema.org/Text', rt['@type']) + self.assertEquals('abcd', rt['value']) + + def test_none_from(self): + self.assertIsNone(from_json_value(to_json_value(None))) + + def test_large_integer(self): + num = 1 << 35 + self.assertEquals(num, from_json_value(to_json_value(num))) + self.assertEquals(long(num), from_json_value(to_json_value(long(num)))) + + def test_long_value(self): + self.assertEquals(long(27), from_json_value(to_json_value(long(27)))) + + def test_too_long_value(self): + with self.assertRaises(TypeError): + to_json_value(long(1 << 64)) + + +if __name__ == '__main__': + unittest.main() http://git-wip-us.apache.org/repos/asf/beam/blob/e6b3883a/sdks/python/apache_beam/internal/json_value.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/internal/json_value.py b/sdks/python/apache_beam/internal/json_value.py deleted file mode 100644 index 23b5d5b..0000000 --- a/sdks/python/apache_beam/internal/json_value.py +++ /dev/null @@ -1,141 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. 
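One nit in the new test module above: its guard uses a bare `except:` rather than the `except ImportError:` written everywhere else in this commit, so a failure inside apitools itself would also silently disable the suite. The class-level skip otherwise works exactly like the method-level one; a sketch with the narrower guard:

import unittest

try:
    from apitools.base.py.extra_types import JsonValue
except ImportError:  # narrower than the bare except: used above
    JsonValue = None


@unittest.skipIf(JsonValue is None, 'GCP dependencies are not installed')
class GuardedJsonValueTest(unittest.TestCase):

    def test_string_round_trip(self):
        self.assertEqual('abc', JsonValue(string_value='abc').string_value)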
-# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# - -"""JSON conversion utility functions.""" - -from apitools.base.py import extra_types - - -_MAXINT64 = (1 << 63) - 1 -_MININT64 = - (1 << 63) - - -def get_typed_value_descriptor(obj): - """Converts a basic type into a @type/value dictionary. - - Args: - obj: A basestring, bool, int, or float to be converted. - - Returns: - A dictionary containing the keys '@type' and 'value' with the value for - the @type of appropriate type. - - Raises: - TypeError: if the Python object has a type that is not supported. - """ - if isinstance(obj, basestring): - type_name = 'Text' - elif isinstance(obj, bool): - type_name = 'Boolean' - elif isinstance(obj, int): - type_name = 'Integer' - elif isinstance(obj, float): - type_name = 'Float' - else: - raise TypeError('Cannot get a type descriptor for %s.' % repr(obj)) - return {'@type': 'http://schema.org/%s' % type_name, 'value': obj} - - -def to_json_value(obj, with_type=False): - """Converts Python objects into extra_types.JsonValue objects. - - Args: - obj: Python object to be converted. Can be 'None'. - with_type: If true then the basic types (string, int, float, bool) will - be wrapped in @type/value dictionaries. Otherwise the straight value is - encoded into a JsonValue. - - Returns: - A JsonValue object using JsonValue, JsonArray and JsonObject types for the - corresponding values, lists, or dictionaries. - - Raises: - TypeError: if the Python object contains a type that is not supported. - - The types supported are str, bool, list, tuple, dict, and None. The Dataflow - API requires JsonValue(s) in many places, and it is quite convenient to be - able to specify these hierarchical objects using Python syntax. - """ - if obj is None: - return extra_types.JsonValue(is_null=True) - elif isinstance(obj, (list, tuple)): - return extra_types.JsonValue( - array_value=extra_types.JsonArray( - entries=[to_json_value(o, with_type=with_type) for o in obj])) - elif isinstance(obj, dict): - json_object = extra_types.JsonObject() - for k, v in obj.iteritems(): - json_object.properties.append( - extra_types.JsonObject.Property( - key=k, value=to_json_value(v, with_type=with_type))) - return extra_types.JsonValue(object_value=json_object) - elif with_type: - return to_json_value(get_typed_value_descriptor(obj), with_type=False) - elif isinstance(obj, basestring): - return extra_types.JsonValue(string_value=obj) - elif isinstance(obj, bool): - return extra_types.JsonValue(boolean_value=obj) - elif isinstance(obj, int): - return extra_types.JsonValue(integer_value=obj) - elif isinstance(obj, long): - if _MININT64 <= obj <= _MAXINT64: - return extra_types.JsonValue(integer_value=obj) - else: - raise TypeError('Can not encode {} as a 64-bit integer'.format(obj)) - elif isinstance(obj, float): - return extra_types.JsonValue(double_value=obj) - else: - raise TypeError('Cannot convert %s to a JSON value.' 
% repr(obj)) - - -def from_json_value(v): - """Converts extra_types.JsonValue objects into Python objects. - - Args: - v: JsonValue object to be converted. - - Returns: - A Python object structured as values, lists, and dictionaries corresponding - to JsonValue, JsonArray and JsonObject types. - - Raises: - TypeError: if the JsonValue object contains a type that is not supported. - - The types supported are str, bool, list, dict, and None. The Dataflow API - returns JsonValue(s) in many places and it is quite convenient to be able to - convert these hierarchical objects to much simpler Python objects. - """ - if isinstance(v, extra_types.JsonValue): - if v.string_value is not None: - return v.string_value - elif v.boolean_value is not None: - return v.boolean_value - elif v.integer_value is not None: - return v.integer_value - elif v.double_value is not None: - return v.double_value - elif v.array_value is not None: - return from_json_value(v.array_value) - elif v.object_value is not None: - return from_json_value(v.object_value) - elif v.is_null: - return None - elif isinstance(v, extra_types.JsonArray): - return [from_json_value(e) for e in v.entries] - elif isinstance(v, extra_types.JsonObject): - return {p.key: from_json_value(p.value) for p in v.properties} - raise TypeError('Cannot convert %s from a JSON value.' % repr(v)) http://git-wip-us.apache.org/repos/asf/beam/blob/e6b3883a/sdks/python/apache_beam/internal/json_value_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/internal/json_value_test.py b/sdks/python/apache_beam/internal/json_value_test.py deleted file mode 100644 index a4a47b8..0000000 --- a/sdks/python/apache_beam/internal/json_value_test.py +++ /dev/null @@ -1,84 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. 
-# - -"""Unit tests for the json_value module.""" - -import unittest - -from apitools.base.py.extra_types import JsonValue -from apache_beam.internal.json_value import from_json_value -from apache_beam.internal.json_value import to_json_value - - -class JsonValueTest(unittest.TestCase): - - def test_string_to(self): - self.assertEquals(JsonValue(string_value='abc'), to_json_value('abc')) - - def test_true_to(self): - self.assertEquals(JsonValue(boolean_value=True), to_json_value(True)) - - def test_false_to(self): - self.assertEquals(JsonValue(boolean_value=False), to_json_value(False)) - - def test_int_to(self): - self.assertEquals(JsonValue(integer_value=14), to_json_value(14)) - - def test_float_to(self): - self.assertEquals(JsonValue(double_value=2.75), to_json_value(2.75)) - - def test_none_to(self): - self.assertEquals(JsonValue(is_null=True), to_json_value(None)) - - def test_string_from(self): - self.assertEquals('WXYZ', from_json_value(to_json_value('WXYZ'))) - - def test_true_from(self): - self.assertEquals(True, from_json_value(to_json_value(True))) - - def test_false_from(self): - self.assertEquals(False, from_json_value(to_json_value(False))) - - def test_int_from(self): - self.assertEquals(-27, from_json_value(to_json_value(-27))) - - def test_float_from(self): - self.assertEquals(4.5, from_json_value(to_json_value(4.5))) - - def test_with_type(self): - rt = from_json_value(to_json_value('abcd', with_type=True)) - self.assertEquals('http://schema.org/Text', rt['@type']) - self.assertEquals('abcd', rt['value']) - - def test_none_from(self): - self.assertIsNone(from_json_value(to_json_value(None))) - - def test_large_integer(self): - num = 1 << 35 - self.assertEquals(num, from_json_value(to_json_value(num))) - self.assertEquals(long(num), from_json_value(to_json_value(long(num)))) - - def test_long_value(self): - self.assertEquals(long(27), from_json_value(to_json_value(long(27)))) - - def test_too_long_value(self): - with self.assertRaises(TypeError): - to_json_value(long(1 << 64)) - - -if __name__ == '__main__': - unittest.main() http://git-wip-us.apache.org/repos/asf/beam/blob/e6b3883a/sdks/python/apache_beam/io/__init__.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/io/__init__.py b/sdks/python/apache_beam/io/__init__.py index 972ed53..af6c56e 100644 --- a/sdks/python/apache_beam/io/__init__.py +++ b/sdks/python/apache_beam/io/__init__.py @@ -27,5 +27,12 @@ from apache_beam.io.iobase import Writer from apache_beam.io.textio import * from apache_beam.io.tfrecordio import * from apache_beam.io.range_trackers import * -from apache_beam.io.google_cloud_platform.bigquery import * -from apache_beam.io.google_cloud_platform.pubsub import * + +# Protect against environments where clientslibrary is not available. 
+# pylint: disable=wrong-import-order, wrong-import-position +try: + from apache_beam.io.google_cloud_platform.bigquery import * + from apache_beam.io.google_cloud_platform.pubsub import * +except ImportError: + pass +# pylint: enable=wrong-import-order, wrong-import-position http://git-wip-us.apache.org/repos/asf/beam/blob/e6b3883a/sdks/python/apache_beam/io/fileio.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/io/fileio.py b/sdks/python/apache_beam/io/fileio.py index 2fe267f..761644f 100644 --- a/sdks/python/apache_beam/io/fileio.py +++ b/sdks/python/apache_beam/io/fileio.py @@ -30,9 +30,22 @@ import zlib from apache_beam.internal import util from apache_beam.io import iobase -from apache_beam.io.google_cloud_platform import gcsio from apache_beam.transforms.display import DisplayDataItem +# TODO(sourabhbajaj): Fix the constant values after the new IO factory +# Current constants are copy pasted from gcsio.py till we fix this. +# Protect against environments where apitools library is not available. +# pylint: disable=wrong-import-order, wrong-import-position +try: + from apache_beam.io.google_cloud_platform import gcsio + DEFAULT_READ_BUFFER_SIZE = gcsio.DEFAULT_READ_BUFFER_SIZE + MAX_BATCH_OPERATION_SIZE = gcsio.MAX_BATCH_OPERATION_SIZE +except ImportError: + gcsio = None + DEFAULT_READ_BUFFER_SIZE = 16 * 1024 * 1024 + MAX_BATCH_OPERATION_SIZE = 100 +# pylint: enable=wrong-import-order, wrong-import-position + DEFAULT_SHARD_NAME_TEMPLATE = '-SSSSS-of-NNNNN' @@ -190,7 +203,7 @@ class ChannelFactory(object): gcs_current_batch = [] for src, dest in src_dest_pairs: gcs_current_batch.append((src, dest)) - if len(gcs_current_batch) == gcsio.MAX_BATCH_OPERATION_SIZE: + if len(gcs_current_batch) == MAX_BATCH_OPERATION_SIZE: gcs_batches.append(gcs_current_batch) gcs_current_batch = [] if gcs_current_batch: @@ -324,7 +337,7 @@ class _CompressedFile(object): def __init__(self, fileobj, compression_type=CompressionTypes.GZIP, - read_size=gcsio.DEFAULT_READ_BUFFER_SIZE): + read_size=DEFAULT_READ_BUFFER_SIZE): if not fileobj: raise ValueError('File object must be opened file but was at %s' % fileobj) @@ -633,7 +646,7 @@ class FileSink(iobase.Sink): current_batch = [] for rename_op in rename_ops: current_batch.append(rename_op) - if len(current_batch) == gcsio.MAX_BATCH_OPERATION_SIZE: + if len(current_batch) == MAX_BATCH_OPERATION_SIZE: batches.append(current_batch) current_batch = [] if current_batch: http://git-wip-us.apache.org/repos/asf/beam/blob/e6b3883a/sdks/python/apache_beam/io/google_cloud_platform/bigquery.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/io/google_cloud_platform/bigquery.py b/sdks/python/apache_beam/io/google_cloud_platform/bigquery.py index 3beecea..a5d5ab2 100644 --- a/sdks/python/apache_beam/io/google_cloud_platform/bigquery.py +++ b/sdks/python/apache_beam/io/google_cloud_platform/bigquery.py @@ -110,21 +110,20 @@ import re import time import uuid -from apitools.base.py.exceptions import HttpError - from apache_beam import coders from apache_beam.internal import auth -from apache_beam.internal.json_value import from_json_value -from apache_beam.internal.json_value import to_json_value +from apache_beam.internal.google_cloud_platform.json_value import from_json_value +from apache_beam.internal.google_cloud_platform.json_value import to_json_value from apache_beam.runners.google_cloud_dataflow.native_io import iobase as dataflow_io from 
apache_beam.transforms.display import DisplayDataItem from apache_beam.utils import retry from apache_beam.utils.pipeline_options import GoogleCloudOptions +from apache_beam.io.google_cloud_platform.internal.clients import bigquery # Protect against environments where bigquery library is not available. # pylint: disable=wrong-import-order, wrong-import-position try: - from apache_beam.io.google_cloud_platform.internal.clients import bigquery + from apitools.base.py.exceptions import HttpError except ImportError: pass # pylint: enable=wrong-import-order, wrong-import-position http://git-wip-us.apache.org/repos/asf/beam/blob/e6b3883a/sdks/python/apache_beam/io/google_cloud_platform/bigquery_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/io/google_cloud_platform/bigquery_test.py b/sdks/python/apache_beam/io/google_cloud_platform/bigquery_test.py index 9c08a20..de8e9c4 100644 --- a/sdks/python/apache_beam/io/google_cloud_platform/bigquery_test.py +++ b/sdks/python/apache_beam/io/google_cloud_platform/bigquery_test.py @@ -25,19 +25,27 @@ import unittest import hamcrest as hc import mock -from apitools.base.py.exceptions import HttpError import apache_beam as beam -from apache_beam.internal.json_value import to_json_value from apache_beam.io.google_cloud_platform.bigquery import RowAsDictJsonCoder from apache_beam.io.google_cloud_platform.bigquery import TableRowJsonCoder from apache_beam.io.google_cloud_platform.bigquery import parse_table_schema_from_json from apache_beam.io.google_cloud_platform.internal.clients import bigquery +from apache_beam.internal.google_cloud_platform.json_value import to_json_value from apache_beam.transforms.display import DisplayData from apache_beam.transforms.display_test import DisplayDataItemMatcher from apache_beam.utils.pipeline_options import PipelineOptions +# Protect against environments where bigquery library is not available. 
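The fileio.py hunk above introduces a second variant of the pattern: module-level constants fall back to literal copies of the gcsio values so that non-GCS code paths keep working (the TODO notes the duplication is temporary). A condensed sketch, with a hypothetical helper showing how such a constant is consumed:

# Fall back to literal copies of the gcsio constants when GCS support
# is not installed.
try:
    from apache_beam.io.google_cloud_platform import gcsio
    DEFAULT_READ_BUFFER_SIZE = gcsio.DEFAULT_READ_BUFFER_SIZE
    MAX_BATCH_OPERATION_SIZE = gcsio.MAX_BATCH_OPERATION_SIZE
except ImportError:
    gcsio = None
    DEFAULT_READ_BUFFER_SIZE = 16 * 1024 * 1024  # 16 MB, as in gcsio.py
    MAX_BATCH_OPERATION_SIZE = 100


def split_into_batches(operations, size=MAX_BATCH_OPERATION_SIZE):
    # Hypothetical helper: the same chunking the rename/copy loops
    # in fileio.py perform inline.
    return [operations[i:i + size] for i in range(0, len(operations), size)]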
+# pylint: disable=wrong-import-order, wrong-import-position +try: + from apitools.base.py.exceptions import HttpError +except ImportError: + HttpError = None +# pylint: enable=wrong-import-order, wrong-import-position + +@unittest.skipIf(HttpError is None, 'GCP dependencies are not installed') class TestRowAsDictJsonCoder(unittest.TestCase): def test_row_as_dict(self): @@ -62,6 +70,7 @@ class TestRowAsDictJsonCoder(unittest.TestCase): self.json_compliance_exception(float('-inf')) +@unittest.skipIf(HttpError is None, 'GCP dependencies are not installed') class TestTableRowJsonCoder(unittest.TestCase): def test_row_as_table_row(self): @@ -123,6 +132,7 @@ class TestTableRowJsonCoder(unittest.TestCase): self.json_compliance_exception(float('-inf')) +@unittest.skipIf(HttpError is None, 'GCP dependencies are not installed') class TestTableSchemaParser(unittest.TestCase): def test_parse_table_schema_from_json(self): string_field = bigquery.TableFieldSchema( @@ -144,6 +154,7 @@ class TestTableSchemaParser(unittest.TestCase): expected_schema) +@unittest.skipIf(HttpError is None, 'GCP dependencies are not installed') class TestBigQuerySource(unittest.TestCase): def test_display_data_item_on_validate_true(self): @@ -234,6 +245,7 @@ class TestBigQuerySource(unittest.TestCase): hc.assert_that(dd.items, hc.contains_inanyorder(*expected_items)) +@unittest.skipIf(HttpError is None, 'GCP dependencies are not installed') class TestBigQuerySink(unittest.TestCase): def test_table_spec_display_data(self): @@ -291,6 +303,7 @@ class TestBigQuerySink(unittest.TestCase): json.loads(sink.schema_as_json())) +@unittest.skipIf(HttpError is None, 'GCP dependencies are not installed') class TestBigQueryReader(unittest.TestCase): def get_test_rows(self): @@ -543,6 +556,7 @@ class TestBigQueryReader(unittest.TestCase): reader.query) +@unittest.skipIf(HttpError is None, 'GCP dependencies are not installed') class TestBigQueryWriter(unittest.TestCase): @mock.patch('time.sleep', return_value=None) @@ -713,6 +727,7 @@ class TestBigQueryWriter(unittest.TestCase): self.assertEquals('myproject', writer.project_id) +@unittest.skipIf(HttpError is None, 'GCP dependencies are not installed') class TestBigQueryWrapper(unittest.TestCase): def test_delete_non_existing_dataset(self): http://git-wip-us.apache.org/repos/asf/beam/blob/e6b3883a/sdks/python/apache_beam/io/google_cloud_platform/gcsio_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/io/google_cloud_platform/gcsio_test.py b/sdks/python/apache_beam/io/google_cloud_platform/gcsio_test.py index b126626..3a66e9a 100644 --- a/sdks/python/apache_beam/io/google_cloud_platform/gcsio_test.py +++ b/sdks/python/apache_beam/io/google_cloud_platform/gcsio_test.py @@ -27,11 +27,18 @@ import unittest import httplib2 import mock -from apitools.base.py.exceptions import HttpError from apache_beam.io import gcsio from apache_beam.io.google_cloud_platform.internal.clients import storage +# Protect against environments where apitools library is not available. +# pylint: disable=wrong-import-order, wrong-import-position +try: + from apitools.base.py.exceptions import HttpError +except ImportError: + HttpError = None +# pylint: enable=wrong-import-order, wrong-import-position + class FakeGcsClient(object): # Fake storage client. Usage in gcsio.py is client.objects.Get(...) 
and @@ -196,6 +203,7 @@ class FakeBatchApiRequest(object): return api_calls +@unittest.skipIf(HttpError is None, 'GCP dependencies are not installed') class TestGCSPathParser(unittest.TestCase): def test_gcs_path(self): @@ -213,6 +221,7 @@ class TestGCSPathParser(unittest.TestCase): self.assertRaises(ValueError, gcsio.parse_gcs_path, 'gs:/blah/bucket/name') +@unittest.skipIf(HttpError is None, 'GCP dependencies are not installed') class TestGCSIO(unittest.TestCase): def _insert_random_file(self, client, path, size, generation=1): @@ -740,6 +749,7 @@ class TestGCSIO(unittest.TestCase): self.gcs.size_of_files_in_glob(file_pattern), expected_file_sizes) +@unittest.skipIf(HttpError is None, 'GCP dependencies are not installed') class TestPipeStream(unittest.TestCase): def _read_and_verify(self, stream, expected, buffer_size): http://git-wip-us.apache.org/repos/asf/beam/blob/e6b3883a/sdks/python/apache_beam/io/google_cloud_platform/internal/clients/bigquery/__init__.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/io/google_cloud_platform/internal/clients/bigquery/__init__.py b/sdks/python/apache_beam/io/google_cloud_platform/internal/clients/bigquery/__init__.py index 673e4d2..2b78ef1 100644 --- a/sdks/python/apache_beam/io/google_cloud_platform/internal/clients/bigquery/__init__.py +++ b/sdks/python/apache_beam/io/google_cloud_platform/internal/clients/bigquery/__init__.py @@ -20,8 +20,14 @@ import pkgutil -from apitools.base.py import * -from apache_beam.io.google_cloud_platform.internal.clients.bigquery.bigquery_v2_client import * -from apache_beam.io.google_cloud_platform.internal.clients.bigquery.bigquery_v2_messages import * +# Protect against environments where apitools library is not available. +# pylint: disable=wrong-import-order, wrong-import-position +try: + from apitools.base.py import * + from apache_beam.io.google_cloud_platform.internal.clients.bigquery.bigquery_v2_client import * + from apache_beam.io.google_cloud_platform.internal.clients.bigquery.bigquery_v2_messages import * +except ImportError: + pass +# pylint: enable=wrong-import-order, wrong-import-position __path__ = pkgutil.extend_path(__path__, __name__) http://git-wip-us.apache.org/repos/asf/beam/blob/e6b3883a/sdks/python/apache_beam/io/google_cloud_platform/internal/clients/storage/__init__.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/io/google_cloud_platform/internal/clients/storage/__init__.py b/sdks/python/apache_beam/io/google_cloud_platform/internal/clients/storage/__init__.py index 81eee3e..fd1976b 100644 --- a/sdks/python/apache_beam/io/google_cloud_platform/internal/clients/storage/__init__.py +++ b/sdks/python/apache_beam/io/google_cloud_platform/internal/clients/storage/__init__.py @@ -20,8 +20,14 @@ import pkgutil -from apitools.base.py import * -from apache_beam.io.google_cloud_platform.internal.clients.storage.storage_v1_client import * -from apache_beam.io.google_cloud_platform.internal.clients.storage.storage_v1_messages import * +# Protect against environments where apitools library is not available. 
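The generated client packages (bigquery above, storage below, and dataflow later in this commit) guard their star imports the same way, except there is nothing to bind: on ImportError the package simply imports as a near-empty namespace, and attribute access fails later at the caller. Roughly, each such __init__.py reduces to the sketch below (the second import is a hypothetical stand-in for the generated *_client/*_messages modules):

import pkgutil

try:
    from apitools.base.py import *
    from generated_client_module import *  # hypothetical stand-in
except ImportError:
    pass

# Runs either way; __path__ is only defined inside a package __init__.py.
__path__ = pkgutil.extend_path(__path__, __name__)

This shifts the failure from import time to first use, which is why the test modules still need their own skipIf guards.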
+# pylint: disable=wrong-import-order, wrong-import-position +try: + from apitools.base.py import * + from apache_beam.io.google_cloud_platform.internal.clients.storage.storage_v1_client import * + from apache_beam.io.google_cloud_platform.internal.clients.storage.storage_v1_messages import * +except ImportError: + pass +# pylint: enable=wrong-import-order, wrong-import-position __path__ = pkgutil.extend_path(__path__, __name__) http://git-wip-us.apache.org/repos/asf/beam/blob/e6b3883a/sdks/python/apache_beam/runners/__init__.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/runners/__init__.py b/sdks/python/apache_beam/runners/__init__.py index f56b2ec..8464992 100644 --- a/sdks/python/apache_beam/runners/__init__.py +++ b/sdks/python/apache_beam/runners/__init__.py @@ -22,8 +22,9 @@ This package defines runners, which are used to execute a pipeline. from apache_beam.runners.direct.direct_runner import DirectRunner from apache_beam.runners.direct.direct_runner import EagerRunner -from apache_beam.runners.google_cloud_dataflow.dataflow_runner import DataflowRunner from apache_beam.runners.runner import PipelineRunner from apache_beam.runners.runner import PipelineState from apache_beam.runners.runner import create_runner + +from apache_beam.runners.google_cloud_dataflow.dataflow_runner import DataflowRunner from apache_beam.runners.test.test_dataflow_runner import TestDataflowRunner http://git-wip-us.apache.org/repos/asf/beam/blob/e6b3883a/sdks/python/apache_beam/runners/google_cloud_dataflow/dataflow_runner.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/runners/google_cloud_dataflow/dataflow_runner.py b/sdks/python/apache_beam/runners/google_cloud_dataflow/dataflow_runner.py index 760935d..b52bd8b 100644 --- a/sdks/python/apache_beam/runners/google_cloud_dataflow/dataflow_runner.py +++ b/sdks/python/apache_beam/runners/google_cloud_dataflow/dataflow_runner.py @@ -29,8 +29,8 @@ import traceback from apache_beam import coders from apache_beam import pvalue -from apache_beam.internal import json_value from apache_beam.internal import pickler +from apache_beam.internal.google_cloud_platform import json_value from apache_beam.pvalue import PCollectionView from apache_beam.runners.google_cloud_dataflow.dataflow_metrics import DataflowMetrics from apache_beam.runners.google_cloud_dataflow.internal.clients import dataflow as dataflow_api http://git-wip-us.apache.org/repos/asf/beam/blob/e6b3883a/sdks/python/apache_beam/runners/google_cloud_dataflow/dataflow_runner_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/runners/google_cloud_dataflow/dataflow_runner_test.py b/sdks/python/apache_beam/runners/google_cloud_dataflow/dataflow_runner_test.py index 59bd2fc..ee4ec8f 100644 --- a/sdks/python/apache_beam/runners/google_cloud_dataflow/dataflow_runner_test.py +++ b/sdks/python/apache_beam/runners/google_cloud_dataflow/dataflow_runner_test.py @@ -25,6 +25,14 @@ from apache_beam.runners.google_cloud_dataflow.dataflow_runner import DataflowPi from apache_beam.runners.google_cloud_dataflow.dataflow_runner import DataflowRuntimeException from apache_beam.runners.google_cloud_dataflow.internal.clients import dataflow as dataflow_api +# Protect against environments where apitools library is not available. 
+# pylint: disable=wrong-import-order, wrong-import-position +try: + from apitools.base.py import base_api +except ImportError: + base_api = None +# pylint: enable=wrong-import-order, wrong-import-position + class DataflowRunnerTest(unittest.TestCase): @@ -33,6 +41,7 @@ class DataflowRunnerTest(unittest.TestCase): self.assertTrue(df_result.metrics()) self.assertTrue(df_result.metrics().query()) + @unittest.skipIf(base_api is None, 'GCP dependencies are not installed') @mock.patch('time.sleep', return_value=None) def test_wait_until_finish(self, patched_time_sleep): values_enum = dataflow_api.Job.CurrentStateValueValuesEnum http://git-wip-us.apache.org/repos/asf/beam/blob/e6b3883a/sdks/python/apache_beam/runners/google_cloud_dataflow/internal/apiclient.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/runners/google_cloud_dataflow/internal/apiclient.py b/sdks/python/apache_beam/runners/google_cloud_dataflow/internal/apiclient.py index 77a708a..98473ca 100644 --- a/sdks/python/apache_beam/runners/google_cloud_dataflow/internal/apiclient.py +++ b/sdks/python/apache_beam/runners/google_cloud_dataflow/internal/apiclient.py @@ -32,7 +32,7 @@ from apitools.base.py import exceptions from apache_beam import utils from apache_beam.internal.auth import get_service_credentials -from apache_beam.internal.json_value import to_json_value +from apache_beam.internal.google_cloud_platform.json_value import to_json_value from apache_beam.io.google_cloud_platform.internal.clients import storage from apache_beam.runners.google_cloud_dataflow.internal.clients import dataflow from apache_beam.transforms import cy_combiners http://git-wip-us.apache.org/repos/asf/beam/blob/e6b3883a/sdks/python/apache_beam/runners/google_cloud_dataflow/internal/apiclient_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/runners/google_cloud_dataflow/internal/apiclient_test.py b/sdks/python/apache_beam/runners/google_cloud_dataflow/internal/apiclient_test.py index bc57211..7adcf8b 100644 --- a/sdks/python/apache_beam/runners/google_cloud_dataflow/internal/apiclient_test.py +++ b/sdks/python/apache_beam/runners/google_cloud_dataflow/internal/apiclient_test.py @@ -21,12 +21,20 @@ import unittest from mock import Mock from apache_beam.metrics.cells import DistributionData +from apache_beam.utils.pipeline_options import PipelineOptions + from apache_beam.runners.google_cloud_dataflow.dataflow_runner import DataflowRunner -from apache_beam.runners.google_cloud_dataflow.internal import apiclient from apache_beam.runners.google_cloud_dataflow.internal.clients import dataflow -from apache_beam.utils.pipeline_options import PipelineOptions +# Protect against environments where apitools library is not available. 
+# pylint: disable=wrong-import-order, wrong-import-position +try: + from apache_beam.runners.google_cloud_dataflow.internal import apiclient +except ImportError: + apiclient = None +# pylint: enable=wrong-import-order, wrong-import-position +@unittest.skipIf(apiclient is None, 'GCP dependencies are not installed') class UtilTest(unittest.TestCase): @unittest.skip("Enable once BEAM-1080 is fixed.") http://git-wip-us.apache.org/repos/asf/beam/blob/e6b3883a/sdks/python/apache_beam/runners/google_cloud_dataflow/internal/clients/dataflow/__init__.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/runners/google_cloud_dataflow/internal/clients/dataflow/__init__.py b/sdks/python/apache_beam/runners/google_cloud_dataflow/internal/clients/dataflow/__init__.py index eedf141..1399f21 100644 --- a/sdks/python/apache_beam/runners/google_cloud_dataflow/internal/clients/dataflow/__init__.py +++ b/sdks/python/apache_beam/runners/google_cloud_dataflow/internal/clients/dataflow/__init__.py @@ -20,8 +20,14 @@ import pkgutil -from apitools.base.py import * -from apache_beam.runners.google_cloud_dataflow.internal.clients.dataflow.dataflow_v1b3_messages import * -from apache_beam.runners.google_cloud_dataflow.internal.clients.dataflow.dataflow_v1b3_client import * +# Protect against environments where apitools library is not available. +# pylint: disable=wrong-import-order, wrong-import-position +try: + from apitools.base.py import * + from apache_beam.runners.google_cloud_dataflow.internal.clients.dataflow.dataflow_v1b3_messages import * + from apache_beam.runners.google_cloud_dataflow.internal.clients.dataflow.dataflow_v1b3_client import * +except ImportError: + pass +# pylint: enable=wrong-import-order, wrong-import-position __path__ = pkgutil.extend_path(__path__, __name__) http://git-wip-us.apache.org/repos/asf/beam/blob/e6b3883a/sdks/python/apache_beam/runners/google_cloud_dataflow/internal/clients/dataflow/message_matchers_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/runners/google_cloud_dataflow/internal/clients/dataflow/message_matchers_test.py b/sdks/python/apache_beam/runners/google_cloud_dataflow/internal/clients/dataflow/message_matchers_test.py index 2b56ae1..889a66d 100644 --- a/sdks/python/apache_beam/runners/google_cloud_dataflow/internal/clients/dataflow/message_matchers_test.py +++ b/sdks/python/apache_beam/runners/google_cloud_dataflow/internal/clients/dataflow/message_matchers_test.py @@ -15,14 +15,22 @@ # limitations under the License. # import unittest - import hamcrest as hc - import apache_beam.runners.google_cloud_dataflow.internal.clients.dataflow as dataflow -from apache_beam.internal.json_value import to_json_value + +from apache_beam.internal.google_cloud_platform.json_value import to_json_value from apache_beam.runners.google_cloud_dataflow.internal.clients.dataflow import message_matchers +# Protect against environments where apitools library is not available. 
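apiclient_test.py above (and template_runner_test.py below) extend the pattern to a first-party module: internal.apiclient itself imports apitools, so importing it is the probe, and the module object doubles as the sentinel. Sketch:

import unittest

# Probe a first-party module whose own imports require apitools.
try:
    from apache_beam.runners.google_cloud_dataflow.internal import apiclient
except ImportError:
    apiclient = None


@unittest.skipIf(apiclient is None, 'GCP dependencies are not installed')
class ApiClientDependentTest(unittest.TestCase):

    def test_can_touch_apiclient(self):
        # apiclient.Job is the entry point runner_test.py uses below.
        self.assertTrue(hasattr(apiclient, 'Job'))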
+# pylint: disable=wrong-import-order, wrong-import-position +try: + from apitools.base.py import base_api +except ImportError: + base_api = None +# pylint: enable=wrong-import-order, wrong-import-position + +@unittest.skipIf(base_api is None, 'GCP dependencies are not installed') class TestMatchers(unittest.TestCase): def test_structured_name_matcher_basic(self): http://git-wip-us.apache.org/repos/asf/beam/blob/e6b3883a/sdks/python/apache_beam/runners/google_cloud_dataflow/template_runner_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/runners/google_cloud_dataflow/template_runner_test.py b/sdks/python/apache_beam/runners/google_cloud_dataflow/template_runner_test.py index bbcf340..6f6bfe7 100644 --- a/sdks/python/apache_beam/runners/google_cloud_dataflow/template_runner_test.py +++ b/sdks/python/apache_beam/runners/google_cloud_dataflow/template_runner_test.py @@ -25,13 +25,22 @@ import unittest import apache_beam as beam from apache_beam.pipeline import Pipeline -from apache_beam.runners.google_cloud_dataflow.dataflow_runner import DataflowRunner -from apache_beam.runners.google_cloud_dataflow.internal import apiclient from apache_beam.utils.pipeline_options import PipelineOptions +from apache_beam.runners.google_cloud_dataflow.dataflow_runner import DataflowRunner +# Protect against environments where apitools library is not available. +# pylint: disable=wrong-import-order, wrong-import-position +try: + from apache_beam.runners.google_cloud_dataflow.internal import apiclient +except ImportError: + apiclient = None +# pylint: enable=wrong-import-order, wrong-import-position + +@unittest.skipIf(apiclient is None, 'GCP dependencies are not installed') class TemplatingDataflowRunnerTest(unittest.TestCase): """TemplatingDataflow tests.""" + def test_full_completion(self): # Create dummy file and close it. Note that we need to do this because # Windows does not allow NamedTemporaryFiles to be reopened elsewhere http://git-wip-us.apache.org/repos/asf/beam/blob/e6b3883a/sdks/python/apache_beam/runners/google_cloud_dataflow/test_dataflow_runner.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/runners/google_cloud_dataflow/test_dataflow_runner.py b/sdks/python/apache_beam/runners/google_cloud_dataflow/test_dataflow_runner.py new file mode 100644 index 0000000..323d49b --- /dev/null +++ b/sdks/python/apache_beam/runners/google_cloud_dataflow/test_dataflow_runner.py @@ -0,0 +1,40 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# + +"""Wrapper of Beam runners that's built for running and verifying e2e tests.""" + +from apache_beam.internal import pickler +from apache_beam.utils.pipeline_options import TestOptions +from apache_beam.runners.google_cloud_dataflow.dataflow_runner import DataflowRunner + + +class TestDataflowRunner(DataflowRunner): + + def __init__(self): + super(TestDataflowRunner, self).__init__() + + def run(self, pipeline): + """Execute test pipeline and verify test matcher""" + self.result = super(TestDataflowRunner, self).run(pipeline) + self.result.wait_until_finish() + + options = pipeline.options.view_as(TestOptions) + if options.on_success_matcher: + from hamcrest import assert_that as hc_assert_that + hc_assert_that(self.result, pickler.loads(options.on_success_matcher)) + + return self.result http://git-wip-us.apache.org/repos/asf/beam/blob/e6b3883a/sdks/python/apache_beam/runners/runner_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/runners/runner_test.py b/sdks/python/apache_beam/runners/runner_test.py index 23f8b22..a715374 100644 --- a/sdks/python/apache_beam/runners/runner_test.py +++ b/sdks/python/apache_beam/runners/runner_test.py @@ -36,16 +36,24 @@ from apache_beam.metrics.execution import MetricKey from apache_beam.metrics.execution import MetricResult from apache_beam.metrics.metricbase import MetricName from apache_beam.pipeline import Pipeline -from apache_beam.runners import DataflowRunner from apache_beam.runners import DirectRunner -from apache_beam.runners import TestDataflowRunner from apache_beam.runners import create_runner -from apache_beam.runners.google_cloud_dataflow.internal import apiclient from apache_beam.transforms.display import DisplayDataItem from apache_beam.transforms.util import assert_that from apache_beam.transforms.util import equal_to from apache_beam.utils.pipeline_options import PipelineOptions +from apache_beam.runners import DataflowRunner +from apache_beam.runners import TestDataflowRunner + +# Protect against environments where api client is not available. 
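The relocated test_dataflow_runner.py above defers its hamcrest import until a matcher is actually supplied, and expects that matcher to arrive pickled in TestOptions.on_success_matcher. A sketch of how a caller might provide one; the matcher class is invented for illustration:

from hamcrest.core.base_matcher import BaseMatcher

from apache_beam.internal import pickler


class _AnyResultMatcher(BaseMatcher):
    """Trivially accepts any pipeline result."""

    def _matches(self, item):
        return True

    def describe_to(self, description):
        description.append_text('any pipeline result')


# Serialized and handed over via pipeline options, e.g.
# PipelineOptions(['--on_success_matcher', serialized_matcher, ...]):
serialized_matcher = pickler.dumps(_AnyResultMatcher())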
+# pylint: disable=wrong-import-order, wrong-import-position +try: + from apache_beam.runners.google_cloud_dataflow.internal import apiclient +except ImportError: + apiclient = None +# pylint: enable=wrong-import-order, wrong-import-position + class RunnerTest(unittest.TestCase): default_properties = [ @@ -59,12 +67,14 @@ class RunnerTest(unittest.TestCase): def test_create_runner(self): self.assertTrue( isinstance(create_runner('DirectRunner'), DirectRunner)) - self.assertTrue( - isinstance(create_runner('DataflowRunner'), - DataflowRunner)) - self.assertTrue( - isinstance(create_runner('TestDataflowRunner'), - TestDataflowRunner)) + if apiclient is not None: + self.assertTrue( + isinstance(create_runner('DataflowRunner'), + DataflowRunner)) + if apiclient is not None: + self.assertTrue( + isinstance(create_runner('TestDataflowRunner'), + TestDataflowRunner)) self.assertRaises(ValueError, create_runner, 'xyz') def test_create_runner_shorthand(self): @@ -79,6 +89,7 @@ class RunnerTest(unittest.TestCase): self.assertTrue( isinstance(create_runner('Direct'), DirectRunner)) + @unittest.skipIf(apiclient is None, 'GCP dependencies are not installed') def test_remote_runner_translation(self): remote_runner = DataflowRunner() p = Pipeline(remote_runner, @@ -90,6 +101,7 @@ class RunnerTest(unittest.TestCase): remote_runner.job = apiclient.Job(p.options) super(DataflowRunner, remote_runner).run(p) + @unittest.skipIf(apiclient is None, 'GCP dependencies are not installed') def test_remote_runner_display_data(self): remote_runner = DataflowRunner() p = Pipeline(remote_runner, @@ -194,6 +206,7 @@ class RunnerTest(unittest.TestCase): DistributionResult(DistributionData(15, 5, 1, 5)), DistributionResult(DistributionData(15, 5, 1, 5))))) + @unittest.skipIf(apiclient is None, 'GCP dependencies are not installed') def test_no_group_by_key_directly_after_bigquery(self): remote_runner = DataflowRunner() p = Pipeline(remote_runner, http://git-wip-us.apache.org/repos/asf/beam/blob/e6b3883a/sdks/python/apache_beam/runners/test/__init__.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/runners/test/__init__.py b/sdks/python/apache_beam/runners/test/__init__.py index 6fa483b..bdc152f 100644 --- a/sdks/python/apache_beam/runners/test/__init__.py +++ b/sdks/python/apache_beam/runners/test/__init__.py @@ -21,4 +21,10 @@ This package defines runners, which are used to execute test pipeline and verify results. """ -from apache_beam.runners.test.test_dataflow_runner import TestDataflowRunner +# Protect against environments where dataflow runner is not available. +# pylint: disable=wrong-import-order, wrong-import-position +try: + from apache_beam.runners.google_cloud_dataflow.test_dataflow_runner import TestDataflowRunner +except ImportError: + pass +# pylint: enable=wrong-import-order, wrong-import-position http://git-wip-us.apache.org/repos/asf/beam/blob/e6b3883a/sdks/python/apache_beam/runners/test/test_dataflow_runner.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/runners/test/test_dataflow_runner.py b/sdks/python/apache_beam/runners/test/test_dataflow_runner.py deleted file mode 100644 index ef1a8b6..0000000 --- a/sdks/python/apache_beam/runners/test/test_dataflow_runner.py +++ /dev/null @@ -1,40 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. 
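runner_test.py mixes two granularities: tests that are entirely GCP-specific get skipIf, while test_create_runner keeps its portable assertion unconditional and guards only the Dataflow-specific checks with `if apiclient is not None:`, so the DirectRunner half still runs everywhere. A condensed sketch (imports inlined for brevity):

import unittest

try:
    from apache_beam.runners.google_cloud_dataflow.internal import apiclient
except ImportError:
    apiclient = None


class CreateRunnerTest(unittest.TestCase):

    def test_create_runner(self):
        from apache_beam.runners import DirectRunner, create_runner
        # The portable assertion always runs.
        self.assertTrue(isinstance(create_runner('DirectRunner'), DirectRunner))
        if apiclient is not None:
            # Dataflow assertions run only with GCP dependencies present.
            from apache_beam.runners import DataflowRunner
            self.assertTrue(
                isinstance(create_runner('DataflowRunner'), DataflowRunner))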
See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# - -"""Wrapper of Beam runners that's built for running and verifying e2e tests.""" - -from apache_beam.internal import pickler -from apache_beam.runners.google_cloud_dataflow.dataflow_runner import DataflowRunner -from apache_beam.utils.pipeline_options import TestOptions - - -class TestDataflowRunner(DataflowRunner): - - def __init__(self): - super(TestDataflowRunner, self).__init__() - - def run(self, pipeline): - """Execute test pipeline and verify test matcher""" - self.result = super(TestDataflowRunner, self).run(pipeline) - self.result.wait_until_finish() - - options = pipeline.options.view_as(TestOptions) - if options.on_success_matcher: - from hamcrest import assert_that as hc_assert_that - hc_assert_that(self.result, pickler.loads(options.on_success_matcher)) - - return self.result http://git-wip-us.apache.org/repos/asf/beam/blob/e6b3883a/sdks/python/apache_beam/tests/pipeline_verifiers.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/tests/pipeline_verifiers.py b/sdks/python/apache_beam/tests/pipeline_verifiers.py index 9b286a2..41dfc07 100644 --- a/sdks/python/apache_beam/tests/pipeline_verifiers.py +++ b/sdks/python/apache_beam/tests/pipeline_verifiers.py @@ -25,13 +25,17 @@ of test pipeline job. 
Customized verifier should extend import hashlib import logging -from apitools.base.py.exceptions import HttpError from hamcrest.core.base_matcher import BaseMatcher from apache_beam.io.fileio import ChannelFactory from apache_beam.runners.runner import PipelineState from apache_beam.utils import retry +try: + from apitools.base.py.exceptions import HttpError +except ImportError: + HttpError = None + MAX_RETRIES = 4 @@ -61,8 +65,8 @@ class PipelineStateMatcher(BaseMatcher): def retry_on_io_error_and_server_error(exception): """Filter allowing retries on file I/O errors and service error.""" - if isinstance(exception, HttpError) or \ - isinstance(exception, IOError): + if isinstance(exception, IOError) or \ + (HttpError is not None and isinstance(exception, HttpError)): return True else: return False http://git-wip-us.apache.org/repos/asf/beam/blob/e6b3883a/sdks/python/apache_beam/tests/pipeline_verifiers_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/tests/pipeline_verifiers_test.py b/sdks/python/apache_beam/tests/pipeline_verifiers_test.py index 84a4dbe..1801624 100644 --- a/sdks/python/apache_beam/tests/pipeline_verifiers_test.py +++ b/sdks/python/apache_beam/tests/pipeline_verifiers_test.py @@ -21,7 +21,6 @@ import logging import tempfile import unittest -from apitools.base.py.exceptions import HttpError from hamcrest import assert_that as hc_assert_that from mock import Mock, patch @@ -31,6 +30,11 @@ from apache_beam.runners.runner import PipelineResult from apache_beam.tests import pipeline_verifiers as verifiers from apache_beam.tests.test_utils import patch_retry +try: + from apitools.base.py.exceptions import HttpError +except: + HttpError = None + class PipelineVerifiersTest(unittest.TestCase): @@ -102,6 +106,7 @@ class PipelineVerifiersTest(unittest.TestCase): self.assertEqual(verifiers.MAX_RETRIES + 1, mock_glob.call_count) @patch.object(ChannelFactory, 'glob') + @unittest.skipIf(HttpError is None, 'google-apitools is not installed') def test_file_checksum_matcher_service_error(self, mock_glob): mock_glob.side_effect = HttpError( response={'status': '404'}, url='', content='Not Found', http://git-wip-us.apache.org/repos/asf/beam/blob/e6b3883a/sdks/python/apache_beam/utils/retry.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/utils/retry.py b/sdks/python/apache_beam/utils/retry.py index 648d726..05973c5 100644 --- a/sdks/python/apache_beam/utils/retry.py +++ b/sdks/python/apache_beam/utils/retry.py @@ -29,7 +29,14 @@ import sys import time import traceback -from apitools.base.py.exceptions import HttpError +# Protect against environments where apitools library is not available. 
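With HttpError reduced to a possibly-None sentinel, every isinstance test in pipeline_verifiers.py and retry.py must check the sentinel first; when apitools is absent, nothing can be classified as an HTTP error and the filters fall through to their non-HTTP branch. A standalone sketch of the same shape:

try:
    from apitools.base.py.exceptions import HttpError
except ImportError:
    HttpError = None


def retry_on_server_errors_filter(exception):
    """Retry 5xx HTTP errors; retry any non-HTTP exception."""
    if HttpError is not None and isinstance(exception, HttpError):
        return exception.status_code >= 500
    # Without apitools installed, every exception lands here and is retried.
    return True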
+# pylint: disable=wrong-import-order, wrong-import-position +# TODO(sourabhbajaj): Remove the GCP specific error code to a submodule +try: + from apitools.base.py.exceptions import HttpError +except ImportError: + HttpError = None +# pylint: enable=wrong-import-order, wrong-import-position class PermanentException(Exception): @@ -78,7 +85,7 @@ class FuzzedExponentialIntervals(object): def retry_on_server_errors_filter(exception): """Filter allowing retries on server errors and non-HttpErrors.""" - if isinstance(exception, HttpError): + if (HttpError is not None) and isinstance(exception, HttpError): if exception.status_code >= 500: return True else: @@ -92,7 +99,7 @@ def retry_on_server_errors_filter(exception): def retry_on_server_errors_and_timeout_filter(exception): - if isinstance(exception, HttpError): + if HttpError is not None and isinstance(exception, HttpError): if exception.status_code == 408: # 408 Request Timeout return True return retry_on_server_errors_filter(exception) http://git-wip-us.apache.org/repos/asf/beam/blob/e6b3883a/sdks/python/apache_beam/utils/retry_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/utils/retry_test.py b/sdks/python/apache_beam/utils/retry_test.py index dd1741c..1b03c83 100644 --- a/sdks/python/apache_beam/utils/retry_test.py +++ b/sdks/python/apache_beam/utils/retry_test.py @@ -19,7 +19,15 @@ import unittest -from apitools.base.py.exceptions import HttpError +# Protect against environments where apitools library is not available. +# pylint: disable=wrong-import-order, wrong-import-position +# TODO(sourabhbajaj): Remove the GCP specific error code to a submodule +try: + from apitools.base.py.exceptions import HttpError +except ImportError: + HttpError = None +# pylint: enable=wrong-import-order, wrong-import-position + from apache_beam.utils import retry @@ -80,6 +88,8 @@ class RetryTest(unittest.TestCase): raise NotImplementedError def http_error(self, code): + if HttpError is None: + raise RuntimeError("This is not a valid test as GCP is not enabled") raise HttpError({'status': str(code)}, '', '') def test_with_explicit_decorator(self): @@ -109,6 +119,7 @@ class RetryTest(unittest.TestCase): 10, b=20) self.assertEqual(len(self.clock.calls), 10) + @unittest.skipIf(HttpError is None, 'google-apitools is not installed') def test_with_http_error_that_should_not_be_retried(self): self.assertRaises(HttpError, retry.with_exponential_backoff( @@ -118,6 +129,7 @@ class RetryTest(unittest.TestCase): # Make sure just one call was made. self.assertEqual(len(self.clock.calls), 0) + @unittest.skipIf(HttpError is None, 'google-apitools is not installed') def test_with_http_error_that_should_be_retried(self): self.assertRaises(HttpError, retry.with_exponential_backoff(
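The decorator these tests exercise composes a retry filter with fuzzed exponential backoff. Typical usage might look like the sketch below; the keyword names num_retries, initial_delay_secs, and retry_filter are assumed from retry.py and FuzzedExponentialIntervals and may differ in detail:

from apache_beam.utils import retry

attempts = []


@retry.with_exponential_backoff(
    num_retries=4,
    initial_delay_secs=0.1,  # kept small for the sketch; defaults are larger
    retry_filter=retry.retry_on_server_errors_filter)
def flaky():
    attempts.append(1)
    if len(attempts) < 3:
        raise IOError('transient')  # non-HTTP, so the filter retries it
    return 'ok'


assert flaky() == 'ok'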