beam-commits mailing list archives

From al...@apache.org
Subject [2/3] beam git commit: Remove dependency on apitools
Date Wed, 22 Feb 2017 00:04:59 GMT
Remove dependency on apitools


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/e6b3883a
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/e6b3883a
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/e6b3883a

Branch: refs/heads/master
Commit: e6b3883a2340e3a2fe28b9a5c2d31bbb46c526b8
Parents: 2982238
Author: Sourabh Bajaj <sourabhbajaj@google.com>
Authored: Mon Feb 20 16:58:55 2017 -0800
Committer: Ahmet Altay <altay@google.com>
Committed: Tue Feb 21 16:04:44 2017 -0800

----------------------------------------------------------------------
 .../examples/snippets/snippets_test.py          |   9 ++
 .../internal/google_cloud_platform/__init__.py  |  16 ++
 .../google_cloud_platform/json_value.py         | 147 +++++++++++++++++++
 .../google_cloud_platform/json_value_test.py    |  93 ++++++++++++
 sdks/python/apache_beam/internal/json_value.py  | 141 ------------------
 .../apache_beam/internal/json_value_test.py     |  84 -----------
 sdks/python/apache_beam/io/__init__.py          |  11 +-
 sdks/python/apache_beam/io/fileio.py            |  21 ++-
 .../io/google_cloud_platform/bigquery.py        |   9 +-
 .../io/google_cloud_platform/bigquery_test.py   |  19 ++-
 .../io/google_cloud_platform/gcsio_test.py      |  12 +-
 .../internal/clients/bigquery/__init__.py       |  12 +-
 .../internal/clients/storage/__init__.py        |  12 +-
 sdks/python/apache_beam/runners/__init__.py     |   3 +-
 .../google_cloud_dataflow/dataflow_runner.py    |   2 +-
 .../dataflow_runner_test.py                     |   9 ++
 .../google_cloud_dataflow/internal/apiclient.py |   2 +-
 .../internal/apiclient_test.py                  |  12 +-
 .../internal/clients/dataflow/__init__.py       |  12 +-
 .../clients/dataflow/message_matchers_test.py   |  14 +-
 .../template_runner_test.py                     |  13 +-
 .../test_dataflow_runner.py                     |  40 +++++
 sdks/python/apache_beam/runners/runner_test.py  |  31 ++--
 .../python/apache_beam/runners/test/__init__.py |   8 +-
 .../runners/test/test_dataflow_runner.py        |  40 -----
 .../apache_beam/tests/pipeline_verifiers.py     |  10 +-
 .../tests/pipeline_verifiers_test.py            |   7 +-
 sdks/python/apache_beam/utils/retry.py          |  13 +-
 sdks/python/apache_beam/utils/retry_test.py     |  14 +-
 29 files changed, 500 insertions(+), 316 deletions(-)
----------------------------------------------------------------------
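Every hunk below applies one recurring pattern: imports of apitools (or of Beam modules that transitively need it) are wrapped in try/except ImportError with a None fallback, and tests that need the GCP dependencies are decorated with unittest.skipIf. A minimal sketch of that pattern, with an illustrative test name that is not part of this commit:

import unittest

# Protect against environments where apitools library is not available.
try:
  from apitools.base.py import base_api
except ImportError:
  base_api = None


class ExampleGcpTest(unittest.TestCase):

  @unittest.skipIf(base_api is None, 'GCP dependencies are not installed')
  def test_client_is_available(self):
    self.assertIsNotNone(base_api)


if __name__ == '__main__':
  unittest.main()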


http://git-wip-us.apache.org/repos/asf/beam/blob/e6b3883a/sdks/python/apache_beam/examples/snippets/snippets_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/snippets/snippets_test.py b/sdks/python/apache_beam/examples/snippets/snippets_test.py
index 95c959b..f9f1a13 100644
--- a/sdks/python/apache_beam/examples/snippets/snippets_test.py
+++ b/sdks/python/apache_beam/examples/snippets/snippets_test.py
@@ -37,6 +37,14 @@ from apache_beam.examples.snippets import snippets
 # pylint: disable=expression-not-assigned
 from apache_beam.test_pipeline import TestPipeline
 
+# Protect against environments where apitools library is not available.
+# pylint: disable=wrong-import-order, wrong-import-position
+try:
+  from apitools.base.py import base_api
+except ImportError:
+  base_api = None
+# pylint: enable=wrong-import-order, wrong-import-position
+
 
 class ParDoTest(unittest.TestCase):
   """Tests for model/par-do."""
@@ -576,6 +584,7 @@ class SnippetsTest(unittest.TestCase):
     # TODO(vikasrk): Explore using Datastore Emulator.
     snippets.model_datastoreio()
 
+  @unittest.skipIf(base_api is None, 'GCP dependencies are not installed')
   def test_model_bigqueryio(self):
     # We cannot test BigQueryIO functionality in unit tests therefore we limit
     # ourselves to making sure the pipeline containing BigQuery sources and

http://git-wip-us.apache.org/repos/asf/beam/blob/e6b3883a/sdks/python/apache_beam/internal/google_cloud_platform/__init__.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/internal/google_cloud_platform/__init__.py b/sdks/python/apache_beam/internal/google_cloud_platform/__init__.py
new file mode 100644
index 0000000..cce3aca
--- /dev/null
+++ b/sdks/python/apache_beam/internal/google_cloud_platform/__init__.py
@@ -0,0 +1,16 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#

http://git-wip-us.apache.org/repos/asf/beam/blob/e6b3883a/sdks/python/apache_beam/internal/google_cloud_platform/json_value.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/internal/google_cloud_platform/json_value.py b/sdks/python/apache_beam/internal/google_cloud_platform/json_value.py
new file mode 100644
index 0000000..c8b5393
--- /dev/null
+++ b/sdks/python/apache_beam/internal/google_cloud_platform/json_value.py
@@ -0,0 +1,147 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""JSON conversion utility functions."""
+
+# Protect against environments where apitools library is not available.
+# pylint: disable=wrong-import-order, wrong-import-position
+try:
+  from apitools.base.py import extra_types
+except ImportError:
+  extra_types = None
+# pylint: enable=wrong-import-order, wrong-import-position
+
+
+_MAXINT64 = (1 << 63) - 1
+_MININT64 = - (1 << 63)
+
+
+def get_typed_value_descriptor(obj):
+  """Converts a basic type into a @type/value dictionary.
+
+  Args:
+    obj: A basestring, bool, int, or float to be converted.
+
+  Returns:
+    A dictionary with an '@type' key mapping to a schema.org type URL and a
+    'value' key mapping to the original object.
+
+  Raises:
+    TypeError: if the Python object has a type that is not supported.
+  """
+  if isinstance(obj, basestring):
+    type_name = 'Text'
+  elif isinstance(obj, bool):
+    type_name = 'Boolean'
+  elif isinstance(obj, int):
+    type_name = 'Integer'
+  elif isinstance(obj, float):
+    type_name = 'Float'
+  else:
+    raise TypeError('Cannot get a type descriptor for %s.' % repr(obj))
+  return {'@type': 'http://schema.org/%s' % type_name, 'value': obj}
+
+
+def to_json_value(obj, with_type=False):
+  """Converts Python objects into extra_types.JsonValue objects.
+
+  Args:
+    obj: Python object to be converted. Can be 'None'.
+    with_type: If true then the basic types (string, int, float, bool) will
+      be wrapped in @type/value dictionaries. Otherwise the straight value is
+      encoded into a JsonValue.
+
+  Returns:
+    A JsonValue object using JsonValue, JsonArray and JsonObject types for the
+    corresponding values, lists, or dictionaries.
+
+  Raises:
+    TypeError: if the Python object contains a type that is not supported.
+
+  The types supported are str, bool, int, long, float, list, tuple, dict, and
+  None. The Dataflow API requires JsonValue(s) in many places, and it is quite
+  convenient to specify these hierarchical objects using Python syntax.
+  """
+  if obj is None:
+    return extra_types.JsonValue(is_null=True)
+  elif isinstance(obj, (list, tuple)):
+    return extra_types.JsonValue(
+        array_value=extra_types.JsonArray(
+            entries=[to_json_value(o, with_type=with_type) for o in obj]))
+  elif isinstance(obj, dict):
+    json_object = extra_types.JsonObject()
+    for k, v in obj.iteritems():
+      json_object.properties.append(
+          extra_types.JsonObject.Property(
+              key=k, value=to_json_value(v, with_type=with_type)))
+    return extra_types.JsonValue(object_value=json_object)
+  elif with_type:
+    return to_json_value(get_typed_value_descriptor(obj), with_type=False)
+  elif isinstance(obj, basestring):
+    return extra_types.JsonValue(string_value=obj)
+  elif isinstance(obj, bool):
+    return extra_types.JsonValue(boolean_value=obj)
+  elif isinstance(obj, int):
+    return extra_types.JsonValue(integer_value=obj)
+  elif isinstance(obj, long):
+    if _MININT64 <= obj <= _MAXINT64:
+      return extra_types.JsonValue(integer_value=obj)
+    else:
+      raise TypeError('Can not encode {} as a 64-bit integer'.format(obj))
+  elif isinstance(obj, float):
+    return extra_types.JsonValue(double_value=obj)
+  else:
+    raise TypeError('Cannot convert %s to a JSON value.' % repr(obj))
+
+
+def from_json_value(v):
+  """Converts extra_types.JsonValue objects into Python objects.
+
+  Args:
+    v: JsonValue object to be converted.
+
+  Returns:
+    A Python object structured as values, lists, and dictionaries corresponding
+    to JsonValue, JsonArray and JsonObject types.
+
+  Raises:
+    TypeError: if the JsonValue object contains a type that is not supported.
+
+  The types supported are str, bool, int, float, list, dict, and None. The
+  Dataflow API returns JsonValue(s) in many places and it is quite convenient
+  to convert these hierarchical objects to much simpler Python objects.
+  """
+  if isinstance(v, extra_types.JsonValue):
+    if v.string_value is not None:
+      return v.string_value
+    elif v.boolean_value is not None:
+      return v.boolean_value
+    elif v.integer_value is not None:
+      return v.integer_value
+    elif v.double_value is not None:
+      return v.double_value
+    elif v.array_value is not None:
+      return from_json_value(v.array_value)
+    elif v.object_value is not None:
+      return from_json_value(v.object_value)
+    elif v.is_null:
+      return None
+  elif isinstance(v, extra_types.JsonArray):
+    return [from_json_value(e) for e in v.entries]
+  elif isinstance(v, extra_types.JsonObject):
+    return {p.key: from_json_value(p.value) for p in v.properties}
+  raise TypeError('Cannot convert %s from a JSON value.' % repr(v))

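For orientation, a short round trip through the moved module (this assumes apitools is installed, since JsonValue and friends come from its extra_types):

from apache_beam.internal.google_cloud_platform.json_value import from_json_value
from apache_beam.internal.google_cloud_platform.json_value import to_json_value

# Nested Python structures become JsonValue/JsonArray/JsonObject and back.
v = to_json_value({'name': 'abc', 'count': 3})
print from_json_value(v)   # {'name': 'abc', 'count': 3} (key order may vary)

# with_type=True wraps basic values in @type/value dictionaries.
typed = to_json_value('abc', with_type=True)
print from_json_value(typed)   # {'@type': 'http://schema.org/Text', 'value': 'abc'}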
http://git-wip-us.apache.org/repos/asf/beam/blob/e6b3883a/sdks/python/apache_beam/internal/google_cloud_platform/json_value_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/internal/google_cloud_platform/json_value_test.py b/sdks/python/apache_beam/internal/google_cloud_platform/json_value_test.py
new file mode 100644
index 0000000..1a83008
--- /dev/null
+++ b/sdks/python/apache_beam/internal/google_cloud_platform/json_value_test.py
@@ -0,0 +1,93 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""Unit tests for the json_value module."""
+
+import unittest
+
+from apache_beam.internal.google_cloud_platform.json_value import from_json_value
+from apache_beam.internal.google_cloud_platform.json_value import to_json_value
+
+
+# Protect against environments where apitools library is not available.
+# pylint: disable=wrong-import-order, wrong-import-position
+try:
+  from apitools.base.py.extra_types import JsonValue
+except ImportError:
+  JsonValue = None
+# pylint: enable=wrong-import-order, wrong-import-position
+
+
+@unittest.skipIf(JsonValue is None, 'GCP dependencies are not installed')
+class JsonValueTest(unittest.TestCase):
+
+  def test_string_to(self):
+    self.assertEquals(JsonValue(string_value='abc'), to_json_value('abc'))
+
+  def test_true_to(self):
+    self.assertEquals(JsonValue(boolean_value=True), to_json_value(True))
+
+  def test_false_to(self):
+    self.assertEquals(JsonValue(boolean_value=False), to_json_value(False))
+
+  def test_int_to(self):
+    self.assertEquals(JsonValue(integer_value=14), to_json_value(14))
+
+  def test_float_to(self):
+    self.assertEquals(JsonValue(double_value=2.75), to_json_value(2.75))
+
+  def test_none_to(self):
+    self.assertEquals(JsonValue(is_null=True), to_json_value(None))
+
+  def test_string_from(self):
+    self.assertEquals('WXYZ', from_json_value(to_json_value('WXYZ')))
+
+  def test_true_from(self):
+    self.assertEquals(True, from_json_value(to_json_value(True)))
+
+  def test_false_from(self):
+    self.assertEquals(False, from_json_value(to_json_value(False)))
+
+  def test_int_from(self):
+    self.assertEquals(-27, from_json_value(to_json_value(-27)))
+
+  def test_float_from(self):
+    self.assertEquals(4.5, from_json_value(to_json_value(4.5)))
+
+  def test_with_type(self):
+    rt = from_json_value(to_json_value('abcd', with_type=True))
+    self.assertEquals('http://schema.org/Text', rt['@type'])
+    self.assertEquals('abcd', rt['value'])
+
+  def test_none_from(self):
+    self.assertIsNone(from_json_value(to_json_value(None)))
+
+  def test_large_integer(self):
+    num = 1 << 35
+    self.assertEquals(num, from_json_value(to_json_value(num)))
+    self.assertEquals(long(num), from_json_value(to_json_value(long(num))))
+
+  def test_long_value(self):
+    self.assertEquals(long(27), from_json_value(to_json_value(long(27))))
+
+  def test_too_long_value(self):
+    with self.assertRaises(TypeError):
+      to_json_value(long(1 << 64))
+
+
+if __name__ == '__main__':
+  unittest.main()

http://git-wip-us.apache.org/repos/asf/beam/blob/e6b3883a/sdks/python/apache_beam/internal/json_value.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/internal/json_value.py b/sdks/python/apache_beam/internal/json_value.py
deleted file mode 100644
index 23b5d5b..0000000
--- a/sdks/python/apache_beam/internal/json_value.py
+++ /dev/null
@@ -1,141 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-"""JSON conversion utility functions."""
-
-from apitools.base.py import extra_types
-
-
-_MAXINT64 = (1 << 63) - 1
-_MININT64 = - (1 << 63)
-
-
-def get_typed_value_descriptor(obj):
-  """Converts a basic type into a @type/value dictionary.
-
-  Args:
-    obj: A basestring, bool, int, or float to be converted.
-
-  Returns:
-    A dictionary containing the keys '@type' and 'value' with the value for
-    the @type of appropriate type.
-
-  Raises:
-    TypeError: if the Python object has a type that is not supported.
-  """
-  if isinstance(obj, basestring):
-    type_name = 'Text'
-  elif isinstance(obj, bool):
-    type_name = 'Boolean'
-  elif isinstance(obj, int):
-    type_name = 'Integer'
-  elif isinstance(obj, float):
-    type_name = 'Float'
-  else:
-    raise TypeError('Cannot get a type descriptor for %s.' % repr(obj))
-  return {'@type': 'http://schema.org/%s' % type_name, 'value': obj}
-
-
-def to_json_value(obj, with_type=False):
-  """Converts Python objects into extra_types.JsonValue objects.
-
-  Args:
-    obj: Python object to be converted. Can be 'None'.
-    with_type: If true then the basic types (string, int, float, bool) will
-      be wrapped in @type/value dictionaries. Otherwise the straight value is
-      encoded into a JsonValue.
-
-  Returns:
-    A JsonValue object using JsonValue, JsonArray and JsonObject types for the
-    corresponding values, lists, or dictionaries.
-
-  Raises:
-    TypeError: if the Python object contains a type that is not supported.
-
-  The types supported are str, bool, list, tuple, dict, and None. The Dataflow
-  API requires JsonValue(s) in many places, and it is quite convenient to be
-  able to specify these hierarchical objects using Python syntax.
-  """
-  if obj is None:
-    return extra_types.JsonValue(is_null=True)
-  elif isinstance(obj, (list, tuple)):
-    return extra_types.JsonValue(
-        array_value=extra_types.JsonArray(
-            entries=[to_json_value(o, with_type=with_type) for o in obj]))
-  elif isinstance(obj, dict):
-    json_object = extra_types.JsonObject()
-    for k, v in obj.iteritems():
-      json_object.properties.append(
-          extra_types.JsonObject.Property(
-              key=k, value=to_json_value(v, with_type=with_type)))
-    return extra_types.JsonValue(object_value=json_object)
-  elif with_type:
-    return to_json_value(get_typed_value_descriptor(obj), with_type=False)
-  elif isinstance(obj, basestring):
-    return extra_types.JsonValue(string_value=obj)
-  elif isinstance(obj, bool):
-    return extra_types.JsonValue(boolean_value=obj)
-  elif isinstance(obj, int):
-    return extra_types.JsonValue(integer_value=obj)
-  elif isinstance(obj, long):
-    if _MININT64 <= obj <= _MAXINT64:
-      return extra_types.JsonValue(integer_value=obj)
-    else:
-      raise TypeError('Can not encode {} as a 64-bit integer'.format(obj))
-  elif isinstance(obj, float):
-    return extra_types.JsonValue(double_value=obj)
-  else:
-    raise TypeError('Cannot convert %s to a JSON value.' % repr(obj))
-
-
-def from_json_value(v):
-  """Converts extra_types.JsonValue objects into Python objects.
-
-  Args:
-    v: JsonValue object to be converted.
-
-  Returns:
-    A Python object structured as values, lists, and dictionaries corresponding
-    to JsonValue, JsonArray and JsonObject types.
-
-  Raises:
-    TypeError: if the JsonValue object contains a type that is not supported.
-
-  The types supported are str, bool, list, dict, and None. The Dataflow API
-  returns JsonValue(s) in many places and it is quite convenient to be able to
-  convert these hierarchical objects to much simpler Python objects.
-  """
-  if isinstance(v, extra_types.JsonValue):
-    if v.string_value is not None:
-      return v.string_value
-    elif v.boolean_value is not None:
-      return v.boolean_value
-    elif v.integer_value is not None:
-      return v.integer_value
-    elif v.double_value is not None:
-      return v.double_value
-    elif v.array_value is not None:
-      return from_json_value(v.array_value)
-    elif v.object_value is not None:
-      return from_json_value(v.object_value)
-    elif v.is_null:
-      return None
-  elif isinstance(v, extra_types.JsonArray):
-    return [from_json_value(e) for e in v.entries]
-  elif isinstance(v, extra_types.JsonObject):
-    return {p.key: from_json_value(p.value) for p in v.properties}
-  raise TypeError('Cannot convert %s from a JSON value.' % repr(v))

http://git-wip-us.apache.org/repos/asf/beam/blob/e6b3883a/sdks/python/apache_beam/internal/json_value_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/internal/json_value_test.py b/sdks/python/apache_beam/internal/json_value_test.py
deleted file mode 100644
index a4a47b8..0000000
--- a/sdks/python/apache_beam/internal/json_value_test.py
+++ /dev/null
@@ -1,84 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-"""Unit tests for the json_value module."""
-
-import unittest
-
-from apitools.base.py.extra_types import JsonValue
-from apache_beam.internal.json_value import from_json_value
-from apache_beam.internal.json_value import to_json_value
-
-
-class JsonValueTest(unittest.TestCase):
-
-  def test_string_to(self):
-    self.assertEquals(JsonValue(string_value='abc'), to_json_value('abc'))
-
-  def test_true_to(self):
-    self.assertEquals(JsonValue(boolean_value=True), to_json_value(True))
-
-  def test_false_to(self):
-    self.assertEquals(JsonValue(boolean_value=False), to_json_value(False))
-
-  def test_int_to(self):
-    self.assertEquals(JsonValue(integer_value=14), to_json_value(14))
-
-  def test_float_to(self):
-    self.assertEquals(JsonValue(double_value=2.75), to_json_value(2.75))
-
-  def test_none_to(self):
-    self.assertEquals(JsonValue(is_null=True), to_json_value(None))
-
-  def test_string_from(self):
-    self.assertEquals('WXYZ', from_json_value(to_json_value('WXYZ')))
-
-  def test_true_from(self):
-    self.assertEquals(True, from_json_value(to_json_value(True)))
-
-  def test_false_from(self):
-    self.assertEquals(False, from_json_value(to_json_value(False)))
-
-  def test_int_from(self):
-    self.assertEquals(-27, from_json_value(to_json_value(-27)))
-
-  def test_float_from(self):
-    self.assertEquals(4.5, from_json_value(to_json_value(4.5)))
-
-  def test_with_type(self):
-    rt = from_json_value(to_json_value('abcd', with_type=True))
-    self.assertEquals('http://schema.org/Text', rt['@type'])
-    self.assertEquals('abcd', rt['value'])
-
-  def test_none_from(self):
-    self.assertIsNone(from_json_value(to_json_value(None)))
-
-  def test_large_integer(self):
-    num = 1 << 35
-    self.assertEquals(num, from_json_value(to_json_value(num)))
-    self.assertEquals(long(num), from_json_value(to_json_value(long(num))))
-
-  def test_long_value(self):
-    self.assertEquals(long(27), from_json_value(to_json_value(long(27))))
-
-  def test_too_long_value(self):
-    with self.assertRaises(TypeError):
-      to_json_value(long(1 << 64))
-
-
-if __name__ == '__main__':
-  unittest.main()

http://git-wip-us.apache.org/repos/asf/beam/blob/e6b3883a/sdks/python/apache_beam/io/__init__.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/__init__.py b/sdks/python/apache_beam/io/__init__.py
index 972ed53..af6c56e 100644
--- a/sdks/python/apache_beam/io/__init__.py
+++ b/sdks/python/apache_beam/io/__init__.py
@@ -27,5 +27,12 @@ from apache_beam.io.iobase import Writer
 from apache_beam.io.textio import *
 from apache_beam.io.tfrecordio import *
 from apache_beam.io.range_trackers import *
-from apache_beam.io.google_cloud_platform.bigquery import *
-from apache_beam.io.google_cloud_platform.pubsub import *
+
+# Protect against environments where client library is not available.
+# pylint: disable=wrong-import-order, wrong-import-position
+try:
+  from apache_beam.io.google_cloud_platform.bigquery import *
+  from apache_beam.io.google_cloud_platform.pubsub import *
+except ImportError:
+  pass
+# pylint: enable=wrong-import-order, wrong-import-position

http://git-wip-us.apache.org/repos/asf/beam/blob/e6b3883a/sdks/python/apache_beam/io/fileio.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/fileio.py b/sdks/python/apache_beam/io/fileio.py
index 2fe267f..761644f 100644
--- a/sdks/python/apache_beam/io/fileio.py
+++ b/sdks/python/apache_beam/io/fileio.py
@@ -30,9 +30,22 @@ import zlib
 
 from apache_beam.internal import util
 from apache_beam.io import iobase
-from apache_beam.io.google_cloud_platform import gcsio
 from apache_beam.transforms.display import DisplayDataItem
 
+# TODO(sourabhbajaj): Fix the constant values after the new IO factory.
+# The constants below are copied from gcsio.py until then.
+# Protect against environments where apitools library is not available.
+# pylint: disable=wrong-import-order, wrong-import-position
+try:
+  from apache_beam.io.google_cloud_platform import gcsio
+  DEFAULT_READ_BUFFER_SIZE = gcsio.DEFAULT_READ_BUFFER_SIZE
+  MAX_BATCH_OPERATION_SIZE = gcsio.MAX_BATCH_OPERATION_SIZE
+except ImportError:
+  gcsio = None
+  DEFAULT_READ_BUFFER_SIZE = 16 * 1024 * 1024
+  MAX_BATCH_OPERATION_SIZE = 100
+# pylint: enable=wrong-import-order, wrong-import-position
+
 
 DEFAULT_SHARD_NAME_TEMPLATE = '-SSSSS-of-NNNNN'
 
@@ -190,7 +203,7 @@ class ChannelFactory(object):
     gcs_current_batch = []
     for src, dest in src_dest_pairs:
       gcs_current_batch.append((src, dest))
-      if len(gcs_current_batch) == gcsio.MAX_BATCH_OPERATION_SIZE:
+      if len(gcs_current_batch) == MAX_BATCH_OPERATION_SIZE:
         gcs_batches.append(gcs_current_batch)
         gcs_current_batch = []
     if gcs_current_batch:
@@ -324,7 +337,7 @@ class _CompressedFile(object):
   def __init__(self,
                fileobj,
                compression_type=CompressionTypes.GZIP,
-               read_size=gcsio.DEFAULT_READ_BUFFER_SIZE):
+               read_size=DEFAULT_READ_BUFFER_SIZE):
     if not fileobj:
       raise ValueError('File object must be opened file but was at %s' %
                        fileobj)
@@ -633,7 +646,7 @@ class FileSink(iobase.Sink):
     current_batch = []
     for rename_op in rename_ops:
       current_batch.append(rename_op)
-      if len(current_batch) == gcsio.MAX_BATCH_OPERATION_SIZE:
+      if len(current_batch) == MAX_BATCH_OPERATION_SIZE:
         batches.append(current_batch)
         current_batch = []
     if current_batch:

http://git-wip-us.apache.org/repos/asf/beam/blob/e6b3883a/sdks/python/apache_beam/io/google_cloud_platform/bigquery.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/google_cloud_platform/bigquery.py b/sdks/python/apache_beam/io/google_cloud_platform/bigquery.py
index 3beecea..a5d5ab2 100644
--- a/sdks/python/apache_beam/io/google_cloud_platform/bigquery.py
+++ b/sdks/python/apache_beam/io/google_cloud_platform/bigquery.py
@@ -110,21 +110,20 @@ import re
 import time
 import uuid
 
-from apitools.base.py.exceptions import HttpError
-
 from apache_beam import coders
 from apache_beam.internal import auth
-from apache_beam.internal.json_value import from_json_value
-from apache_beam.internal.json_value import to_json_value
+from apache_beam.internal.google_cloud_platform.json_value import from_json_value
+from apache_beam.internal.google_cloud_platform.json_value import to_json_value
 from apache_beam.runners.google_cloud_dataflow.native_io import iobase as dataflow_io
 from apache_beam.transforms.display import DisplayDataItem
 from apache_beam.utils import retry
 from apache_beam.utils.pipeline_options import GoogleCloudOptions
+from apache_beam.io.google_cloud_platform.internal.clients import bigquery
 
 # Protect against environments where apitools library is not available.
 # pylint: disable=wrong-import-order, wrong-import-position
 try:
-  from apache_beam.io.google_cloud_platform.internal.clients import bigquery
+  from apitools.base.py.exceptions import HttpError
 except ImportError:
   pass
 # pylint: enable=wrong-import-order, wrong-import-position

http://git-wip-us.apache.org/repos/asf/beam/blob/e6b3883a/sdks/python/apache_beam/io/google_cloud_platform/bigquery_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/google_cloud_platform/bigquery_test.py b/sdks/python/apache_beam/io/google_cloud_platform/bigquery_test.py
index 9c08a20..de8e9c4 100644
--- a/sdks/python/apache_beam/io/google_cloud_platform/bigquery_test.py
+++ b/sdks/python/apache_beam/io/google_cloud_platform/bigquery_test.py
@@ -25,19 +25,27 @@ import unittest
 
 import hamcrest as hc
 import mock
-from apitools.base.py.exceptions import HttpError
 
 import apache_beam as beam
-from apache_beam.internal.json_value import to_json_value
 from apache_beam.io.google_cloud_platform.bigquery import RowAsDictJsonCoder
 from apache_beam.io.google_cloud_platform.bigquery import TableRowJsonCoder
 from apache_beam.io.google_cloud_platform.bigquery import parse_table_schema_from_json
 from apache_beam.io.google_cloud_platform.internal.clients import bigquery
+from apache_beam.internal.google_cloud_platform.json_value import to_json_value
 from apache_beam.transforms.display import DisplayData
 from apache_beam.transforms.display_test import DisplayDataItemMatcher
 from apache_beam.utils.pipeline_options import PipelineOptions
 
+# Protect against environments where apitools library is not available.
+# pylint: disable=wrong-import-order, wrong-import-position
+try:
+  from apitools.base.py.exceptions import HttpError
+except ImportError:
+  HttpError = None
+# pylint: enable=wrong-import-order, wrong-import-position
 
+
+@unittest.skipIf(HttpError is None, 'GCP dependencies are not installed')
 class TestRowAsDictJsonCoder(unittest.TestCase):
 
   def test_row_as_dict(self):
@@ -62,6 +70,7 @@ class TestRowAsDictJsonCoder(unittest.TestCase):
     self.json_compliance_exception(float('-inf'))
 
 
+@unittest.skipIf(HttpError is None, 'GCP dependencies are not installed')
 class TestTableRowJsonCoder(unittest.TestCase):
 
   def test_row_as_table_row(self):
@@ -123,6 +132,7 @@ class TestTableRowJsonCoder(unittest.TestCase):
     self.json_compliance_exception(float('-inf'))
 
 
+@unittest.skipIf(HttpError is None, 'GCP dependencies are not installed')
 class TestTableSchemaParser(unittest.TestCase):
   def test_parse_table_schema_from_json(self):
     string_field = bigquery.TableFieldSchema(
@@ -144,6 +154,7 @@ class TestTableSchemaParser(unittest.TestCase):
                      expected_schema)
 
 
+@unittest.skipIf(HttpError is None, 'GCP dependencies are not installed')
 class TestBigQuerySource(unittest.TestCase):
 
   def test_display_data_item_on_validate_true(self):
@@ -234,6 +245,7 @@ class TestBigQuerySource(unittest.TestCase):
     hc.assert_that(dd.items, hc.contains_inanyorder(*expected_items))
 
 
+@unittest.skipIf(HttpError is None, 'GCP dependencies are not installed')
 class TestBigQuerySink(unittest.TestCase):
 
   def test_table_spec_display_data(self):
@@ -291,6 +303,7 @@ class TestBigQuerySink(unittest.TestCase):
         json.loads(sink.schema_as_json()))
 
 
+@unittest.skipIf(HttpError is None, 'GCP dependencies are not installed')
 class TestBigQueryReader(unittest.TestCase):
 
   def get_test_rows(self):
@@ -543,6 +556,7 @@ class TestBigQueryReader(unittest.TestCase):
                       reader.query)
 
 
+@unittest.skipIf(HttpError is None, 'GCP dependencies are not installed')
 class TestBigQueryWriter(unittest.TestCase):
 
   @mock.patch('time.sleep', return_value=None)
@@ -713,6 +727,7 @@ class TestBigQueryWriter(unittest.TestCase):
     self.assertEquals('myproject', writer.project_id)
 
 
+@unittest.skipIf(HttpError is None, 'GCP dependencies are not installed')
 class TestBigQueryWrapper(unittest.TestCase):
 
   def test_delete_non_existing_dataset(self):

http://git-wip-us.apache.org/repos/asf/beam/blob/e6b3883a/sdks/python/apache_beam/io/google_cloud_platform/gcsio_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/google_cloud_platform/gcsio_test.py b/sdks/python/apache_beam/io/google_cloud_platform/gcsio_test.py
index b126626..3a66e9a 100644
--- a/sdks/python/apache_beam/io/google_cloud_platform/gcsio_test.py
+++ b/sdks/python/apache_beam/io/google_cloud_platform/gcsio_test.py
@@ -27,11 +27,18 @@ import unittest
 
 import httplib2
 import mock
-from apitools.base.py.exceptions import HttpError
 
 from apache_beam.io import gcsio
 from apache_beam.io.google_cloud_platform.internal.clients import storage
 
+# Protect against environments where apitools library is not available.
+# pylint: disable=wrong-import-order, wrong-import-position
+try:
+  from apitools.base.py.exceptions import HttpError
+except ImportError:
+  HttpError = None
+# pylint: enable=wrong-import-order, wrong-import-position
+
 
 class FakeGcsClient(object):
   # Fake storage client.  Usage in gcsio.py is client.objects.Get(...) and
@@ -196,6 +203,7 @@ class FakeBatchApiRequest(object):
     return api_calls
 
 
+@unittest.skipIf(HttpError is None, 'GCP dependencies are not installed')
 class TestGCSPathParser(unittest.TestCase):
 
   def test_gcs_path(self):
@@ -213,6 +221,7 @@ class TestGCSPathParser(unittest.TestCase):
     self.assertRaises(ValueError, gcsio.parse_gcs_path, 'gs:/blah/bucket/name')
 
 
+@unittest.skipIf(HttpError is None, 'GCP dependencies are not installed')
 class TestGCSIO(unittest.TestCase):
 
   def _insert_random_file(self, client, path, size, generation=1):
@@ -740,6 +749,7 @@ class TestGCSIO(unittest.TestCase):
           self.gcs.size_of_files_in_glob(file_pattern), expected_file_sizes)
 
 
+@unittest.skipIf(HttpError is None, 'GCP dependencies are not installed')
 class TestPipeStream(unittest.TestCase):
 
   def _read_and_verify(self, stream, expected, buffer_size):

http://git-wip-us.apache.org/repos/asf/beam/blob/e6b3883a/sdks/python/apache_beam/io/google_cloud_platform/internal/clients/bigquery/__init__.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/google_cloud_platform/internal/clients/bigquery/__init__.py b/sdks/python/apache_beam/io/google_cloud_platform/internal/clients/bigquery/__init__.py
index 673e4d2..2b78ef1 100644
--- a/sdks/python/apache_beam/io/google_cloud_platform/internal/clients/bigquery/__init__.py
+++ b/sdks/python/apache_beam/io/google_cloud_platform/internal/clients/bigquery/__init__.py
@@ -20,8 +20,14 @@
 
 import pkgutil
 
-from apitools.base.py import *
-from apache_beam.io.google_cloud_platform.internal.clients.bigquery.bigquery_v2_client import *
-from apache_beam.io.google_cloud_platform.internal.clients.bigquery.bigquery_v2_messages import *
+# Protect against environments where apitools library is not available.
+# pylint: disable=wrong-import-order, wrong-import-position
+try:
+  from apitools.base.py import *
+  from apache_beam.io.google_cloud_platform.internal.clients.bigquery.bigquery_v2_client import *
+  from apache_beam.io.google_cloud_platform.internal.clients.bigquery.bigquery_v2_messages import *
+except ImportError:
+  pass
+# pylint: enable=wrong-import-order, wrong-import-position
 
 __path__ = pkgutil.extend_path(__path__, __name__)

http://git-wip-us.apache.org/repos/asf/beam/blob/e6b3883a/sdks/python/apache_beam/io/google_cloud_platform/internal/clients/storage/__init__.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/google_cloud_platform/internal/clients/storage/__init__.py b/sdks/python/apache_beam/io/google_cloud_platform/internal/clients/storage/__init__.py
index 81eee3e..fd1976b 100644
--- a/sdks/python/apache_beam/io/google_cloud_platform/internal/clients/storage/__init__.py
+++ b/sdks/python/apache_beam/io/google_cloud_platform/internal/clients/storage/__init__.py
@@ -20,8 +20,14 @@
 
 import pkgutil
 
-from apitools.base.py import *
-from apache_beam.io.google_cloud_platform.internal.clients.storage.storage_v1_client import *
-from apache_beam.io.google_cloud_platform.internal.clients.storage.storage_v1_messages import *
+# Protect against environments where apitools library is not available.
+# pylint: disable=wrong-import-order, wrong-import-position
+try:
+  from apitools.base.py import *
+  from apache_beam.io.google_cloud_platform.internal.clients.storage.storage_v1_client import *
+  from apache_beam.io.google_cloud_platform.internal.clients.storage.storage_v1_messages import *
+except ImportError:
+  pass
+# pylint: enable=wrong-import-order, wrong-import-position
 
 __path__ = pkgutil.extend_path(__path__, __name__)

http://git-wip-us.apache.org/repos/asf/beam/blob/e6b3883a/sdks/python/apache_beam/runners/__init__.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/__init__.py b/sdks/python/apache_beam/runners/__init__.py
index f56b2ec..8464992 100644
--- a/sdks/python/apache_beam/runners/__init__.py
+++ b/sdks/python/apache_beam/runners/__init__.py
@@ -22,8 +22,9 @@ This package defines runners, which are used to execute a pipeline.
 
 from apache_beam.runners.direct.direct_runner import DirectRunner
 from apache_beam.runners.direct.direct_runner import EagerRunner
-from apache_beam.runners.google_cloud_dataflow.dataflow_runner import DataflowRunner
 from apache_beam.runners.runner import PipelineRunner
 from apache_beam.runners.runner import PipelineState
 from apache_beam.runners.runner import create_runner
+
+from apache_beam.runners.google_cloud_dataflow.dataflow_runner import DataflowRunner
 from apache_beam.runners.test.test_dataflow_runner import TestDataflowRunner

http://git-wip-us.apache.org/repos/asf/beam/blob/e6b3883a/sdks/python/apache_beam/runners/google_cloud_dataflow/dataflow_runner.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/google_cloud_dataflow/dataflow_runner.py b/sdks/python/apache_beam/runners/google_cloud_dataflow/dataflow_runner.py
index 760935d..b52bd8b 100644
--- a/sdks/python/apache_beam/runners/google_cloud_dataflow/dataflow_runner.py
+++ b/sdks/python/apache_beam/runners/google_cloud_dataflow/dataflow_runner.py
@@ -29,8 +29,8 @@ import traceback
 
 from apache_beam import coders
 from apache_beam import pvalue
-from apache_beam.internal import json_value
 from apache_beam.internal import pickler
+from apache_beam.internal.google_cloud_platform import json_value
 from apache_beam.pvalue import PCollectionView
 from apache_beam.runners.google_cloud_dataflow.dataflow_metrics import DataflowMetrics
 from apache_beam.runners.google_cloud_dataflow.internal.clients import dataflow as dataflow_api

http://git-wip-us.apache.org/repos/asf/beam/blob/e6b3883a/sdks/python/apache_beam/runners/google_cloud_dataflow/dataflow_runner_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/google_cloud_dataflow/dataflow_runner_test.py b/sdks/python/apache_beam/runners/google_cloud_dataflow/dataflow_runner_test.py
index 59bd2fc..ee4ec8f 100644
--- a/sdks/python/apache_beam/runners/google_cloud_dataflow/dataflow_runner_test.py
+++ b/sdks/python/apache_beam/runners/google_cloud_dataflow/dataflow_runner_test.py
@@ -25,6 +25,14 @@ from apache_beam.runners.google_cloud_dataflow.dataflow_runner import DataflowPi
 from apache_beam.runners.google_cloud_dataflow.dataflow_runner import DataflowRuntimeException
 from apache_beam.runners.google_cloud_dataflow.internal.clients import dataflow as dataflow_api
 
+# Protect against environments where apitools library is not available.
+# pylint: disable=wrong-import-order, wrong-import-position
+try:
+  from apitools.base.py import base_api
+except ImportError:
+  base_api = None
+# pylint: enable=wrong-import-order, wrong-import-position
+
 
 class DataflowRunnerTest(unittest.TestCase):
 
@@ -33,6 +41,7 @@ class DataflowRunnerTest(unittest.TestCase):
     self.assertTrue(df_result.metrics())
     self.assertTrue(df_result.metrics().query())
 
+  @unittest.skipIf(base_api is None, 'GCP dependencies are not installed')
   @mock.patch('time.sleep', return_value=None)
   def test_wait_until_finish(self, patched_time_sleep):
     values_enum = dataflow_api.Job.CurrentStateValueValuesEnum

http://git-wip-us.apache.org/repos/asf/beam/blob/e6b3883a/sdks/python/apache_beam/runners/google_cloud_dataflow/internal/apiclient.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/google_cloud_dataflow/internal/apiclient.py b/sdks/python/apache_beam/runners/google_cloud_dataflow/internal/apiclient.py
index 77a708a..98473ca 100644
--- a/sdks/python/apache_beam/runners/google_cloud_dataflow/internal/apiclient.py
+++ b/sdks/python/apache_beam/runners/google_cloud_dataflow/internal/apiclient.py
@@ -32,7 +32,7 @@ from apitools.base.py import exceptions
 
 from apache_beam import utils
 from apache_beam.internal.auth import get_service_credentials
-from apache_beam.internal.json_value import to_json_value
+from apache_beam.internal.google_cloud_platform.json_value import to_json_value
 from apache_beam.io.google_cloud_platform.internal.clients import storage
 from apache_beam.runners.google_cloud_dataflow.internal.clients import dataflow
 from apache_beam.transforms import cy_combiners

http://git-wip-us.apache.org/repos/asf/beam/blob/e6b3883a/sdks/python/apache_beam/runners/google_cloud_dataflow/internal/apiclient_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/google_cloud_dataflow/internal/apiclient_test.py b/sdks/python/apache_beam/runners/google_cloud_dataflow/internal/apiclient_test.py
index bc57211..7adcf8b 100644
--- a/sdks/python/apache_beam/runners/google_cloud_dataflow/internal/apiclient_test.py
+++ b/sdks/python/apache_beam/runners/google_cloud_dataflow/internal/apiclient_test.py
@@ -21,12 +21,20 @@ import unittest
 from mock import Mock
 
 from apache_beam.metrics.cells import DistributionData
+from apache_beam.utils.pipeline_options import PipelineOptions
+
 from apache_beam.runners.google_cloud_dataflow.dataflow_runner import DataflowRunner
-from apache_beam.runners.google_cloud_dataflow.internal import apiclient
 from apache_beam.runners.google_cloud_dataflow.internal.clients import dataflow
-from apache_beam.utils.pipeline_options import PipelineOptions
 
+# Protect against environments where apitools library is not available.
+# pylint: disable=wrong-import-order, wrong-import-position
+try:
+  from apache_beam.runners.google_cloud_dataflow.internal import apiclient
+except ImportError:
+  apiclient = None
+# pylint: enable=wrong-import-order, wrong-import-position
 
+@unittest.skipIf(apiclient is None, 'GCP dependencies are not installed')
 class UtilTest(unittest.TestCase):
 
   @unittest.skip("Enable once BEAM-1080 is fixed.")

http://git-wip-us.apache.org/repos/asf/beam/blob/e6b3883a/sdks/python/apache_beam/runners/google_cloud_dataflow/internal/clients/dataflow/__init__.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/google_cloud_dataflow/internal/clients/dataflow/__init__.py b/sdks/python/apache_beam/runners/google_cloud_dataflow/internal/clients/dataflow/__init__.py
index eedf141..1399f21 100644
--- a/sdks/python/apache_beam/runners/google_cloud_dataflow/internal/clients/dataflow/__init__.py
+++ b/sdks/python/apache_beam/runners/google_cloud_dataflow/internal/clients/dataflow/__init__.py
@@ -20,8 +20,14 @@
 
 import pkgutil
 
-from apitools.base.py import *
-from apache_beam.runners.google_cloud_dataflow.internal.clients.dataflow.dataflow_v1b3_messages import *
-from apache_beam.runners.google_cloud_dataflow.internal.clients.dataflow.dataflow_v1b3_client import *
+# Protect against environments where apitools library is not available.
+# pylint: disable=wrong-import-order, wrong-import-position
+try:
+  from apitools.base.py import *
+  from apache_beam.runners.google_cloud_dataflow.internal.clients.dataflow.dataflow_v1b3_messages import *
+  from apache_beam.runners.google_cloud_dataflow.internal.clients.dataflow.dataflow_v1b3_client import *
+except ImportError:
+  pass
+# pylint: enable=wrong-import-order, wrong-import-position
 
 __path__ = pkgutil.extend_path(__path__, __name__)

http://git-wip-us.apache.org/repos/asf/beam/blob/e6b3883a/sdks/python/apache_beam/runners/google_cloud_dataflow/internal/clients/dataflow/message_matchers_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/google_cloud_dataflow/internal/clients/dataflow/message_matchers_test.py b/sdks/python/apache_beam/runners/google_cloud_dataflow/internal/clients/dataflow/message_matchers_test.py
index 2b56ae1..889a66d 100644
--- a/sdks/python/apache_beam/runners/google_cloud_dataflow/internal/clients/dataflow/message_matchers_test.py
+++ b/sdks/python/apache_beam/runners/google_cloud_dataflow/internal/clients/dataflow/message_matchers_test.py
@@ -15,14 +15,22 @@
 # limitations under the License.
 #
 import unittest
-
 import hamcrest as hc
-
 import apache_beam.runners.google_cloud_dataflow.internal.clients.dataflow as dataflow
-from apache_beam.internal.json_value import to_json_value
+
+from apache_beam.internal.google_cloud_platform.json_value import to_json_value
 from apache_beam.runners.google_cloud_dataflow.internal.clients.dataflow import message_matchers
 
+# Protect against environments where apitools library is not available.
+# pylint: disable=wrong-import-order, wrong-import-position
+try:
+  from apitools.base.py import base_api
+except ImportError:
+  base_api = None
+# pylint: enable=wrong-import-order, wrong-import-position
+
 
+@unittest.skipIf(base_api is None, 'GCP dependencies are not installed')
 class TestMatchers(unittest.TestCase):
 
   def test_structured_name_matcher_basic(self):

http://git-wip-us.apache.org/repos/asf/beam/blob/e6b3883a/sdks/python/apache_beam/runners/google_cloud_dataflow/template_runner_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/google_cloud_dataflow/template_runner_test.py b/sdks/python/apache_beam/runners/google_cloud_dataflow/template_runner_test.py
index bbcf340..6f6bfe7 100644
--- a/sdks/python/apache_beam/runners/google_cloud_dataflow/template_runner_test.py
+++ b/sdks/python/apache_beam/runners/google_cloud_dataflow/template_runner_test.py
@@ -25,13 +25,22 @@ import unittest
 
 import apache_beam as beam
 from apache_beam.pipeline import Pipeline
-from apache_beam.runners.google_cloud_dataflow.dataflow_runner import DataflowRunner
-from apache_beam.runners.google_cloud_dataflow.internal import apiclient
 from apache_beam.utils.pipeline_options import PipelineOptions
+from apache_beam.runners.google_cloud_dataflow.dataflow_runner import DataflowRunner
 
+# Protect against environments where apitools library is not available.
+# pylint: disable=wrong-import-order, wrong-import-position
+try:
+  from apache_beam.runners.google_cloud_dataflow.internal import apiclient
+except ImportError:
+  apiclient = None
+# pylint: enable=wrong-import-order, wrong-import-position
 
+
+@unittest.skipIf(apiclient is None, 'GCP dependencies are not installed')
 class TemplatingDataflowRunnerTest(unittest.TestCase):
   """TemplatingDataflow tests."""
+
   def test_full_completion(self):
     # Create dummy file and close it.  Note that we need to do this because
     # Windows does not allow NamedTemporaryFiles to be reopened elsewhere

http://git-wip-us.apache.org/repos/asf/beam/blob/e6b3883a/sdks/python/apache_beam/runners/google_cloud_dataflow/test_dataflow_runner.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/google_cloud_dataflow/test_dataflow_runner.py b/sdks/python/apache_beam/runners/google_cloud_dataflow/test_dataflow_runner.py
new file mode 100644
index 0000000..323d49b
--- /dev/null
+++ b/sdks/python/apache_beam/runners/google_cloud_dataflow/test_dataflow_runner.py
@@ -0,0 +1,40 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""Wrapper of Beam runners that's built for running and verifying e2e tests."""
+
+from apache_beam.internal import pickler
+from apache_beam.utils.pipeline_options import TestOptions
+from apache_beam.runners.google_cloud_dataflow.dataflow_runner import DataflowRunner
+
+
+class TestDataflowRunner(DataflowRunner):
+
+  def __init__(self):
+    super(TestDataflowRunner, self).__init__()
+
+  def run(self, pipeline):
+    """Execute test pipeline and verify test matcher"""
+    self.result = super(TestDataflowRunner, self).run(pipeline)
+    self.result.wait_until_finish()
+
+    options = pipeline.options.view_as(TestOptions)
+    if options.on_success_matcher:
+      from hamcrest import assert_that as hc_assert_that
+      hc_assert_that(self.result, pickler.loads(options.on_success_matcher))
+
+    return self.result

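The on_success_matcher consumed in run() above comes from TestOptions; a sketch of how an end-to-end test might supply one (project and bucket names are placeholders):

from apache_beam.internal import pickler
from apache_beam.tests.pipeline_verifiers import PipelineStateMatcher

pipeline_args = [
    '--runner=TestDataflowRunner',
    '--project=my-project',                       # placeholder
    '--staging_location=gs://my-bucket/staging',  # placeholder
    '--temp_location=gs://my-bucket/temp',        # placeholder
    '--on_success_matcher=' + pickler.dumps(PipelineStateMatcher()),
]

After wait_until_finish() returns, run() unpickles the matcher and asserts it against the PipelineResult.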
http://git-wip-us.apache.org/repos/asf/beam/blob/e6b3883a/sdks/python/apache_beam/runners/runner_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/runner_test.py b/sdks/python/apache_beam/runners/runner_test.py
index 23f8b22..a715374 100644
--- a/sdks/python/apache_beam/runners/runner_test.py
+++ b/sdks/python/apache_beam/runners/runner_test.py
@@ -36,16 +36,24 @@ from apache_beam.metrics.execution import MetricKey
 from apache_beam.metrics.execution import MetricResult
 from apache_beam.metrics.metricbase import MetricName
 from apache_beam.pipeline import Pipeline
-from apache_beam.runners import DataflowRunner
 from apache_beam.runners import DirectRunner
-from apache_beam.runners import TestDataflowRunner
 from apache_beam.runners import create_runner
-from apache_beam.runners.google_cloud_dataflow.internal import apiclient
 from apache_beam.transforms.display import DisplayDataItem
 from apache_beam.transforms.util import assert_that
 from apache_beam.transforms.util import equal_to
 from apache_beam.utils.pipeline_options import PipelineOptions
 
+from apache_beam.runners import DataflowRunner
+from apache_beam.runners import TestDataflowRunner
+
+# Protect against environments where api client is not available.
+# pylint: disable=wrong-import-order, wrong-import-position
+try:
+  from apache_beam.runners.google_cloud_dataflow.internal import apiclient
+except ImportError:
+  apiclient = None
+# pylint: enable=wrong-import-order, wrong-import-position
+
 
 class RunnerTest(unittest.TestCase):
   default_properties = [
@@ -59,12 +67,14 @@ class RunnerTest(unittest.TestCase):
   def test_create_runner(self):
     self.assertTrue(
         isinstance(create_runner('DirectRunner'), DirectRunner))
-    self.assertTrue(
-        isinstance(create_runner('DataflowRunner'),
-                   DataflowRunner))
-    self.assertTrue(
-        isinstance(create_runner('TestDataflowRunner'),
-                   TestDataflowRunner))
+    if apiclient is not None:
+      self.assertTrue(
+          isinstance(create_runner('DataflowRunner'),
+                     DataflowRunner))
+    if apiclient is not None:
+      self.assertTrue(
+          isinstance(create_runner('TestDataflowRunner'),
+                     TestDataflowRunner))
     self.assertRaises(ValueError, create_runner, 'xyz')
 
   def test_create_runner_shorthand(self):
@@ -79,6 +89,7 @@ class RunnerTest(unittest.TestCase):
     self.assertTrue(
         isinstance(create_runner('Direct'), DirectRunner))
 
+  @unittest.skipIf(apiclient is None, 'GCP dependencies are not installed')
   def test_remote_runner_translation(self):
     remote_runner = DataflowRunner()
     p = Pipeline(remote_runner,
@@ -90,6 +101,7 @@ class RunnerTest(unittest.TestCase):
     remote_runner.job = apiclient.Job(p.options)
     super(DataflowRunner, remote_runner).run(p)
 
+  @unittest.skipIf(apiclient is None, 'GCP dependencies are not installed')
   def test_remote_runner_display_data(self):
     remote_runner = DataflowRunner()
     p = Pipeline(remote_runner,
@@ -194,6 +206,7 @@ class RunnerTest(unittest.TestCase):
                 DistributionResult(DistributionData(15, 5, 1, 5)),
                 DistributionResult(DistributionData(15, 5, 1, 5)))))
 
+  @unittest.skipIf(apiclient is None, 'GCP dependencies are not installed')
   def test_no_group_by_key_directly_after_bigquery(self):
     remote_runner = DataflowRunner()
     p = Pipeline(remote_runner,

http://git-wip-us.apache.org/repos/asf/beam/blob/e6b3883a/sdks/python/apache_beam/runners/test/__init__.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/test/__init__.py b/sdks/python/apache_beam/runners/test/__init__.py
index 6fa483b..bdc152f 100644
--- a/sdks/python/apache_beam/runners/test/__init__.py
+++ b/sdks/python/apache_beam/runners/test/__init__.py
@@ -21,4 +21,10 @@ This package defines runners, which are used to execute test pipeline and
 verify results.
 """
 
-from apache_beam.runners.test.test_dataflow_runner import TestDataflowRunner
+# Protect against environments where dataflow runner is not available.
+# pylint: disable=wrong-import-order, wrong-import-position
+try:
+  from apache_beam.runners.google_cloud_dataflow.test_dataflow_runner import TestDataflowRunner
+except ImportError:
+  pass
+# pylint: enable=wrong-import-order, wrong-import-position

http://git-wip-us.apache.org/repos/asf/beam/blob/e6b3883a/sdks/python/apache_beam/runners/test/test_dataflow_runner.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/test/test_dataflow_runner.py b/sdks/python/apache_beam/runners/test/test_dataflow_runner.py
deleted file mode 100644
index ef1a8b6..0000000
--- a/sdks/python/apache_beam/runners/test/test_dataflow_runner.py
+++ /dev/null
@@ -1,40 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-"""Wrapper of Beam runners that's built for running and verifying e2e tests."""
-
-from apache_beam.internal import pickler
-from apache_beam.runners.google_cloud_dataflow.dataflow_runner import DataflowRunner
-from apache_beam.utils.pipeline_options import TestOptions
-
-
-class TestDataflowRunner(DataflowRunner):
-
-  def __init__(self):
-    super(TestDataflowRunner, self).__init__()
-
-  def run(self, pipeline):
-    """Execute test pipeline and verify test matcher"""
-    self.result = super(TestDataflowRunner, self).run(pipeline)
-    self.result.wait_until_finish()
-
-    options = pipeline.options.view_as(TestOptions)
-    if options.on_success_matcher:
-      from hamcrest import assert_that as hc_assert_that
-      hc_assert_that(self.result, pickler.loads(options.on_success_matcher))
-
-    return self.result
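
The deleted runner (recreated under google_cloud_dataflow above) illustrates the on_success_matcher protocol: the harness serializes a hamcrest matcher into the pipeline options, and the runner deserializes it and applies it to the finished result. A standalone sketch of that round trip, using the standard pickle module in place of apache_beam.internal.pickler:

import pickle

from hamcrest import assert_that as hc_assert_that
from hamcrest import equal_to

# The test harness serializes a matcher and passes it through options.
serialized_matcher = pickle.dumps(equal_to('DONE'))

# The runner deserializes the matcher and applies it to the result.
pipeline_result = 'DONE'  # stand-in for the real PipelineResult
hc_assert_that(pipeline_result, pickle.loads(serialized_matcher))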

http://git-wip-us.apache.org/repos/asf/beam/blob/e6b3883a/sdks/python/apache_beam/tests/pipeline_verifiers.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/tests/pipeline_verifiers.py b/sdks/python/apache_beam/tests/pipeline_verifiers.py
index 9b286a2..41dfc07 100644
--- a/sdks/python/apache_beam/tests/pipeline_verifiers.py
+++ b/sdks/python/apache_beam/tests/pipeline_verifiers.py
@@ -25,13 +25,17 @@ of test pipeline job. Customized verifier should extend
 import hashlib
 import logging
 
-from apitools.base.py.exceptions import HttpError
 from hamcrest.core.base_matcher import BaseMatcher
 
 from apache_beam.io.fileio import ChannelFactory
 from apache_beam.runners.runner import PipelineState
 from apache_beam.utils import retry
 
+try:
+  from apitools.base.py.exceptions import HttpError
+except ImportError:
+  HttpError = None
+
 MAX_RETRIES = 4
 
 
@@ -61,8 +65,8 @@ class PipelineStateMatcher(BaseMatcher):
 
 def retry_on_io_error_and_server_error(exception):
   """Filter allowing retries on file I/O errors and service error."""
-  if isinstance(exception, HttpError) or \
-          isinstance(exception, IOError):
+  if isinstance(exception, IOError) or \
+          (HttpError is not None and isinstance(exception, HttpError)):
     return True
   else:
     return False
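
With HttpError possibly None, the filter checks IOError first and only consults HttpError when the apitools import resolved. Downstream, the filter plugs into the retry decorator; a sketch of that wiring, assuming with_exponential_backoff accepts num_retries and retry_filter keywords as the tests elsewhere in this commit suggest (the helper name is hypothetical):

from apache_beam.tests import pipeline_verifiers as verifiers
from apache_beam.utils import retry

@retry.with_exponential_backoff(
    num_retries=verifiers.MAX_RETRIES,
    retry_filter=verifiers.retry_on_io_error_and_server_error)
def _read_with_retry(path):  # hypothetical helper
  # Transient I/O and 5xx failures are retried; everything else surfaces.
  with open(path) as f:
    return f.read()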

http://git-wip-us.apache.org/repos/asf/beam/blob/e6b3883a/sdks/python/apache_beam/tests/pipeline_verifiers_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/tests/pipeline_verifiers_test.py b/sdks/python/apache_beam/tests/pipeline_verifiers_test.py
index 84a4dbe..1801624 100644
--- a/sdks/python/apache_beam/tests/pipeline_verifiers_test.py
+++ b/sdks/python/apache_beam/tests/pipeline_verifiers_test.py
@@ -21,7 +21,6 @@ import logging
 import tempfile
 import unittest
 
-from apitools.base.py.exceptions import HttpError
 from hamcrest import assert_that as hc_assert_that
 from mock import Mock, patch
 
@@ -31,6 +30,11 @@ from apache_beam.runners.runner import PipelineResult
 from apache_beam.tests import pipeline_verifiers as verifiers
 from apache_beam.tests.test_utils import patch_retry
 
+try:
+  from apitools.base.py.exceptions import HttpError
+except ImportError:
+  HttpError = None
+
 
 class PipelineVerifiersTest(unittest.TestCase):
 
@@ -102,6 +106,7 @@ class PipelineVerifiersTest(unittest.TestCase):
     self.assertEqual(verifiers.MAX_RETRIES + 1, mock_glob.call_count)
 
+  @unittest.skipIf(HttpError is None, 'google-apitools is not installed')
   @patch.object(ChannelFactory, 'glob')
   def test_file_checksum_matcher_service_error(self, mock_glob):
     mock_glob.side_effect = HttpError(
         response={'status': '404'}, url='', content='Not Found',
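
apitools reads the HTTP status out of the response dict, which is what makes the lightweight fixture above possible. A self-contained version of the same fixture, guarded the same way:

import unittest

try:
  from apitools.base.py.exceptions import HttpError
except ImportError:
  HttpError = None


class HttpErrorFixtureTest(unittest.TestCase):

  @unittest.skipIf(HttpError is None, 'google-apitools is not installed')
  def test_build_http_error(self):
    error = HttpError(response={'status': '404'}, url='', content='Not Found')
    # status_code parses the string status out of the response dict.
    self.assertEqual(404, error.status_code)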

http://git-wip-us.apache.org/repos/asf/beam/blob/e6b3883a/sdks/python/apache_beam/utils/retry.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/utils/retry.py b/sdks/python/apache_beam/utils/retry.py
index 648d726..05973c5 100644
--- a/sdks/python/apache_beam/utils/retry.py
+++ b/sdks/python/apache_beam/utils/retry.py
@@ -29,7 +29,14 @@ import sys
 import time
 import traceback
 
-from apitools.base.py.exceptions import HttpError
+# Protect against environments where apitools library is not available.
+# pylint: disable=wrong-import-order, wrong-import-position
+# TODO(sourabhbajaj): Move the GCP-specific error handling into a submodule
+try:
+  from apitools.base.py.exceptions import HttpError
+except ImportError:
+  HttpError = None
+# pylint: enable=wrong-import-order, wrong-import-position
 
 
 class PermanentException(Exception):
@@ -78,7 +85,7 @@ class FuzzedExponentialIntervals(object):
 
 def retry_on_server_errors_filter(exception):
   """Filter allowing retries on server errors and non-HttpErrors."""
-  if isinstance(exception, HttpError):
+  if (HttpError is not None) and isinstance(exception, HttpError):
     if exception.status_code >= 500:
       return True
     else:
@@ -92,7 +99,7 @@ def retry_on_server_errors_filter(exception):
 
 
 def retry_on_server_errors_and_timeout_filter(exception):
-  if isinstance(exception, HttpError):
+  if HttpError is not None and isinstance(exception, HttpError):
     if exception.status_code == 408:  # 408 Request Timeout
       return True
   return retry_on_server_errors_filter(exception)
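
A consequence of the None guard: when apitools is absent, every exception falls through to the non-HttpError branch, preserving the docstring's promise that non-HttpErrors are retried. A quick check (these asserts hold whether or not apitools is installed, since neither exception is an HttpError):

from apache_beam.utils import retry

# Non-HttpError failures such as socket timeouts remain retriable.
assert retry.retry_on_server_errors_filter(IOError('socket timeout'))
assert retry.retry_on_server_errors_filter(ValueError('transient glitch'))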

http://git-wip-us.apache.org/repos/asf/beam/blob/e6b3883a/sdks/python/apache_beam/utils/retry_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/utils/retry_test.py b/sdks/python/apache_beam/utils/retry_test.py
index dd1741c..1b03c83 100644
--- a/sdks/python/apache_beam/utils/retry_test.py
+++ b/sdks/python/apache_beam/utils/retry_test.py
@@ -19,7 +19,15 @@
 
 import unittest
 
-from apitools.base.py.exceptions import HttpError
+# Protect against environments where apitools library is not available.
+# pylint: disable=wrong-import-order, wrong-import-position
+# TODO(sourabhbajaj): Move the GCP-specific error handling into a submodule
+try:
+  from apitools.base.py.exceptions import HttpError
+except ImportError:
+  HttpError = None
+# pylint: enable=wrong-import-order, wrong-import-position
+
 
 from apache_beam.utils import retry
 
@@ -80,6 +88,8 @@ class RetryTest(unittest.TestCase):
     raise NotImplementedError
 
   def http_error(self, code):
+    if HttpError is None:
+      raise RuntimeError('HttpError tests require google-apitools to be installed')
     raise HttpError({'status': str(code)}, '', '')
 
   def test_with_explicit_decorator(self):
@@ -109,6 +119,7 @@ class RetryTest(unittest.TestCase):
                       10, b=20)
     self.assertEqual(len(self.clock.calls), 10)
 
+  @unittest.skipIf(HttpError is None, 'google-apitools is not installed')
   def test_with_http_error_that_should_not_be_retried(self):
     self.assertRaises(HttpError,
                       retry.with_exponential_backoff(
@@ -118,6 +129,7 @@ class RetryTest(unittest.TestCase):
     # Make sure just one call was made.
     self.assertEqual(len(self.clock.calls), 0)
 
+  @unittest.skipIf(HttpError is None, 'google-apitools is not installed')
   def test_with_http_error_that_should_be_retried(self):
     self.assertRaises(HttpError,
                       retry.with_exponential_backoff(
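
The self.clock.calls assertions above depend on injecting a fake clock so backoff sleeps are recorded rather than actually slept. A sketch of that setup, assuming with_exponential_backoff accepts num_retries and clock keyword arguments as these tests imply (FakeClock here is illustrative, not Beam's own test fixture):

import unittest

from apache_beam.utils import retry


class FakeClock(object):
  """Records requested sleep durations instead of sleeping."""

  def __init__(self):
    self.calls = []

  def sleep(self, value):
    self.calls.append(value)


class FakeClockRetryTest(unittest.TestCase):

  def test_sleeps_are_recorded(self):
    clock = FakeClock()
    attempts = []

    @retry.with_exponential_backoff(num_retries=2, clock=clock)
    def flaky():
      attempts.append(1)
      if len(attempts) < 3:
        raise IOError('transient')  # retried by the default filter
      return 'ok'

    self.assertEqual('ok', flaky())
    self.assertEqual(2, len(clock.calls))  # one recorded sleep per retry


if __name__ == '__main__':
  unittest.main()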

