beam-commits mailing list archives

From al...@apache.org
Subject [21/22] beam git commit: Rename google_cloud_dataflow and google_cloud_platform
Date Thu, 23 Feb 2017 01:23:16 GMT
Rename google_cloud_dataflow and google_cloud_platform


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/59ad58ac
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/59ad58ac
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/59ad58ac

Branch: refs/heads/master
Commit: 59ad58ac530aac9f0d4b81ac08c8ca2e3be4f1dd
Parents: cad84c8
Author: Sourabh Bajaj <sourabhbajaj@google.com>
Authored: Wed Feb 22 15:28:38 2017 -0800
Committer: Ahmet Altay <altay@google.com>
Committed: Wed Feb 22 16:30:19 2017 -0800

----------------------------------------------------------------------
 .../examples/cookbook/bigquery_schema.py        |    2 +-
 .../examples/cookbook/datastore_wordcount.py    |    4 +-
 .../apache_beam/examples/snippets/snippets.py   |    4 +-
 .../python/apache_beam/internal/gcp/__init__.py |   16 +
 .../apache_beam/internal/gcp/json_value.py      |  147 +
 .../apache_beam/internal/gcp/json_value_test.py |   93 +
 .../internal/google_cloud_platform/__init__.py  |   16 -
 .../google_cloud_platform/json_value.py         |  147 -
 .../google_cloud_platform/json_value_test.py    |   93 -
 sdks/python/apache_beam/io/__init__.py          |    4 +-
 sdks/python/apache_beam/io/fileio.py            |    2 +-
 sdks/python/apache_beam/io/gcp/__init__.py      |   16 +
 sdks/python/apache_beam/io/gcp/bigquery.py      | 1081 +++++
 sdks/python/apache_beam/io/gcp/bigquery_test.py |  828 ++++
 .../apache_beam/io/gcp/datastore/__init__.py    |   16 +
 .../apache_beam/io/gcp/datastore/v1/__init__.py |   16 +
 .../io/gcp/datastore/v1/datastoreio.py          |  397 ++
 .../io/gcp/datastore/v1/datastoreio_test.py     |  245 +
 .../io/gcp/datastore/v1/fake_datastore.py       |   98 +
 .../apache_beam/io/gcp/datastore/v1/helper.py   |  274 ++
 .../io/gcp/datastore/v1/helper_test.py          |  265 ++
 .../io/gcp/datastore/v1/query_splitter.py       |  275 ++
 .../io/gcp/datastore/v1/query_splitter_test.py  |  208 +
 sdks/python/apache_beam/io/gcp/gcsio.py         |  871 ++++
 sdks/python/apache_beam/io/gcp/gcsio_test.py    |  796 ++++
 .../apache_beam/io/gcp/internal/__init__.py     |   16 +
 .../io/gcp/internal/clients/__init__.py         |   16 +
 .../gcp/internal/clients/bigquery/__init__.py   |   33 +
 .../clients/bigquery/bigquery_v2_client.py      |  660 +++
 .../clients/bigquery/bigquery_v2_messages.py    | 1910 ++++++++
 .../io/gcp/internal/clients/storage/__init__.py |   33 +
 .../clients/storage/storage_v1_client.py        | 1039 +++++
 .../clients/storage/storage_v1_messages.py      | 1920 ++++++++
 sdks/python/apache_beam/io/gcp/pubsub.py        |   91 +
 sdks/python/apache_beam/io/gcp/pubsub_test.py   |   63 +
 .../io/google_cloud_platform/__init__.py        |   16 -
 .../io/google_cloud_platform/bigquery.py        | 1081 -----
 .../io/google_cloud_platform/bigquery_test.py   |  828 ----
 .../google_cloud_platform/datastore/__init__.py |   16 -
 .../datastore/v1/__init__.py                    |   16 -
 .../datastore/v1/datastoreio.py                 |  397 --
 .../datastore/v1/datastoreio_test.py            |  245 -
 .../datastore/v1/fake_datastore.py              |   98 -
 .../datastore/v1/helper.py                      |  274 --
 .../datastore/v1/helper_test.py                 |  265 --
 .../datastore/v1/query_splitter.py              |  275 --
 .../datastore/v1/query_splitter_test.py         |  208 -
 .../io/google_cloud_platform/gcsio.py           |  871 ----
 .../io/google_cloud_platform/gcsio_test.py      |  796 ----
 .../google_cloud_platform/internal/__init__.py  |   16 -
 .../internal/clients/__init__.py                |   16 -
 .../internal/clients/bigquery/__init__.py       |   33 -
 .../clients/bigquery/bigquery_v2_client.py      |  660 ---
 .../clients/bigquery/bigquery_v2_messages.py    | 1910 --------
 .../internal/clients/storage/__init__.py        |   33 -
 .../clients/storage/storage_v1_client.py        | 1039 -----
 .../clients/storage/storage_v1_messages.py      | 1920 --------
 .../io/google_cloud_platform/pubsub.py          |   91 -
 .../io/google_cloud_platform/pubsub_test.py     |   63 -
 sdks/python/apache_beam/io/iobase.py            |    4 +-
 sdks/python/apache_beam/pipeline_test.py        |    2 +-
 sdks/python/apache_beam/runners/__init__.py     |    4 +-
 .../apache_beam/runners/dataflow/__init__.py    |   16 +
 .../runners/dataflow/dataflow_metrics.py        |   33 +
 .../runners/dataflow/dataflow_metrics_test.py   |   20 +
 .../runners/dataflow/dataflow_runner.py         |  724 +++
 .../runners/dataflow/dataflow_runner_test.py    |   78 +
 .../runners/dataflow/internal/__init__.py       |   16 +
 .../runners/dataflow/internal/apiclient.py      |  726 +++
 .../runners/dataflow/internal/apiclient_test.py |   96 +
 .../dataflow/internal/clients/__init__.py       |   16 +
 .../internal/clients/dataflow/__init__.py       |   33 +
 .../clients/dataflow/dataflow_v1b3_client.py    |  684 +++
 .../clients/dataflow/dataflow_v1b3_messages.py  | 4173 ++++++++++++++++++
 .../clients/dataflow/message_matchers.py        |  124 +
 .../clients/dataflow/message_matchers_test.py   |   77 +
 .../runners/dataflow/native_io/__init__.py      |   16 +
 .../runners/dataflow/native_io/iobase.py        |  307 ++
 .../runners/dataflow/template_runner_test.py    |   97 +
 .../runners/dataflow/test_dataflow_runner.py    |   40 +
 .../runners/direct/transform_evaluator.py       |    2 +-
 .../runners/google_cloud_dataflow/__init__.py   |   16 -
 .../google_cloud_dataflow/dataflow_metrics.py   |   33 -
 .../dataflow_metrics_test.py                    |   20 -
 .../google_cloud_dataflow/dataflow_runner.py    |  724 ---
 .../dataflow_runner_test.py                     |   78 -
 .../google_cloud_dataflow/internal/__init__.py  |   16 -
 .../google_cloud_dataflow/internal/apiclient.py |  726 ---
 .../internal/apiclient_test.py                  |   96 -
 .../internal/clients/__init__.py                |   16 -
 .../internal/clients/dataflow/__init__.py       |   33 -
 .../clients/dataflow/dataflow_v1b3_client.py    |  684 ---
 .../clients/dataflow/dataflow_v1b3_messages.py  | 4173 ------------------
 .../clients/dataflow/message_matchers.py        |  124 -
 .../clients/dataflow/message_matchers_test.py   |   77 -
 .../google_cloud_dataflow/native_io/__init__.py |   16 -
 .../google_cloud_dataflow/native_io/iobase.py   |  307 --
 .../template_runner_test.py                     |   97 -
 .../test_dataflow_runner.py                     |   40 -
 sdks/python/apache_beam/runners/runner.py       |    2 +-
 sdks/python/apache_beam/runners/runner_test.py  |    2 +-
 .../python/apache_beam/runners/test/__init__.py |    2 +-
 sdks/python/generate_pydoc.sh                   |    5 +-
 sdks/python/run_pylint.sh                       |   12 +-
 104 files changed, 18725 insertions(+), 18724 deletions(-)
----------------------------------------------------------------------

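For pipelines that import from the old package paths, the rename maps one-to-one onto the new names: apache_beam.internal.gcp, apache_beam.io.gcp, and apache_beam.runners.dataflow. A minimal sketch of the corresponding import updates in user code, using only paths that appear in the diff below (illustrative only; it assumes the GCP extra dependencies are installed):

  # Before this commit:
  #   from apache_beam.io.google_cloud_platform.internal.clients import bigquery
  #   from apache_beam.io.google_cloud_platform.datastore.v1.datastoreio import ReadFromDatastore
  # After this commit:
  from apache_beam.io.gcp.internal.clients import bigquery
  from apache_beam.io.gcp.datastore.v1.datastoreio import ReadFromDatastore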

http://git-wip-us.apache.org/repos/asf/beam/blob/59ad58ac/sdks/python/apache_beam/examples/cookbook/bigquery_schema.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/cookbook/bigquery_schema.py b/sdks/python/apache_beam/examples/cookbook/bigquery_schema.py
index b0f37ed..400189e 100644
--- a/sdks/python/apache_beam/examples/cookbook/bigquery_schema.py
+++ b/sdks/python/apache_beam/examples/cookbook/bigquery_schema.py
@@ -44,7 +44,7 @@ def run(argv=None):
 
   p = beam.Pipeline(argv=pipeline_args)
 
-  from apache_beam.io.google_cloud_platform.internal.clients import bigquery  # pylint: disable=wrong-import-order, wrong-import-position
+  from apache_beam.io.gcp.internal.clients import bigquery  # pylint: disable=wrong-import-order, wrong-import-position
 
   table_schema = bigquery.TableSchema()
 

http://git-wip-us.apache.org/repos/asf/beam/blob/59ad58ac/sdks/python/apache_beam/examples/cookbook/datastore_wordcount.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/cookbook/datastore_wordcount.py b/sdks/python/apache_beam/examples/cookbook/datastore_wordcount.py
index 8d8bf16..bb5d5c0 100644
--- a/sdks/python/apache_beam/examples/cookbook/datastore_wordcount.py
+++ b/sdks/python/apache_beam/examples/cookbook/datastore_wordcount.py
@@ -72,8 +72,8 @@ from googledatastore import helper as datastore_helper, PropertyFilter
 
 import apache_beam as beam
 from apache_beam.io import ReadFromText
-from apache_beam.io.google_cloud_platform.datastore.v1.datastoreio import ReadFromDatastore
-from apache_beam.io.google_cloud_platform.datastore.v1.datastoreio import WriteToDatastore
+from apache_beam.io.gcp.datastore.v1.datastoreio import ReadFromDatastore
+from apache_beam.io.gcp.datastore.v1.datastoreio import WriteToDatastore
 from apache_beam.metrics import Metrics
 from apache_beam.utils.pipeline_options import GoogleCloudOptions
 from apache_beam.utils.pipeline_options import PipelineOptions

http://git-wip-us.apache.org/repos/asf/beam/blob/59ad58ac/sdks/python/apache_beam/examples/snippets/snippets.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/snippets/snippets.py b/sdks/python/apache_beam/examples/snippets/snippets.py
index b5dfe8f..18f1506 100644
--- a/sdks/python/apache_beam/examples/snippets/snippets.py
+++ b/sdks/python/apache_beam/examples/snippets/snippets.py
@@ -873,8 +873,8 @@ def model_datastoreio():
   import googledatastore
   import apache_beam as beam
   from apache_beam.utils.pipeline_options import PipelineOptions
-  from apache_beam.io.google_cloud_platform.datastore.v1.datastoreio import ReadFromDatastore
-  from apache_beam.io.google_cloud_platform.datastore.v1.datastoreio import WriteToDatastore
+  from apache_beam.io.gcp.datastore.v1.datastoreio import ReadFromDatastore
+  from apache_beam.io.gcp.datastore.v1.datastoreio import WriteToDatastore
 
   project = 'my_project'
   kind = 'my_kind'

http://git-wip-us.apache.org/repos/asf/beam/blob/59ad58ac/sdks/python/apache_beam/internal/gcp/__init__.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/internal/gcp/__init__.py b/sdks/python/apache_beam/internal/gcp/__init__.py
new file mode 100644
index 0000000..cce3aca
--- /dev/null
+++ b/sdks/python/apache_beam/internal/gcp/__init__.py
@@ -0,0 +1,16 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#

http://git-wip-us.apache.org/repos/asf/beam/blob/59ad58ac/sdks/python/apache_beam/internal/gcp/json_value.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/internal/gcp/json_value.py b/sdks/python/apache_beam/internal/gcp/json_value.py
new file mode 100644
index 0000000..c8b5393
--- /dev/null
+++ b/sdks/python/apache_beam/internal/gcp/json_value.py
@@ -0,0 +1,147 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""JSON conversion utility functions."""
+
+# Protect against environments where apitools library is not available.
+# pylint: disable=wrong-import-order, wrong-import-position
+try:
+  from apitools.base.py import extra_types
+except ImportError:
+  extra_types = None
+# pylint: enable=wrong-import-order, wrong-import-position
+
+
+_MAXINT64 = (1 << 63) - 1
+_MININT64 = - (1 << 63)
+
+
+def get_typed_value_descriptor(obj):
+  """Converts a basic type into a @type/value dictionary.
+
+  Args:
+    obj: A basestring, bool, int, or float to be converted.
+
+  Returns:
+    A dictionary containing the keys '@type' and 'value' with the value for
+    the @type of appropriate type.
+
+  Raises:
+    TypeError: if the Python object has a type that is not supported.
+  """
+  if isinstance(obj, basestring):
+    type_name = 'Text'
+  elif isinstance(obj, bool):
+    type_name = 'Boolean'
+  elif isinstance(obj, int):
+    type_name = 'Integer'
+  elif isinstance(obj, float):
+    type_name = 'Float'
+  else:
+    raise TypeError('Cannot get a type descriptor for %s.' % repr(obj))
+  return {'@type': 'http://schema.org/%s' % type_name, 'value': obj}
+
+
+def to_json_value(obj, with_type=False):
+  """Converts Python objects into extra_types.JsonValue objects.
+
+  Args:
+    obj: Python object to be converted. Can be 'None'.
+    with_type: If true then the basic types (string, int, float, bool) will
+      be wrapped in @type/value dictionaries. Otherwise the straight value is
+      encoded into a JsonValue.
+
+  Returns:
+    A JsonValue object using JsonValue, JsonArray and JsonObject types for the
+    corresponding values, lists, or dictionaries.
+
+  Raises:
+    TypeError: if the Python object contains a type that is not supported.
+
+  The types supported are str, bool, list, tuple, dict, and None. The Dataflow
+  API requires JsonValue(s) in many places, and it is quite convenient to be
+  able to specify these hierarchical objects using Python syntax.
+  """
+  if obj is None:
+    return extra_types.JsonValue(is_null=True)
+  elif isinstance(obj, (list, tuple)):
+    return extra_types.JsonValue(
+        array_value=extra_types.JsonArray(
+            entries=[to_json_value(o, with_type=with_type) for o in obj]))
+  elif isinstance(obj, dict):
+    json_object = extra_types.JsonObject()
+    for k, v in obj.iteritems():
+      json_object.properties.append(
+          extra_types.JsonObject.Property(
+              key=k, value=to_json_value(v, with_type=with_type)))
+    return extra_types.JsonValue(object_value=json_object)
+  elif with_type:
+    return to_json_value(get_typed_value_descriptor(obj), with_type=False)
+  elif isinstance(obj, basestring):
+    return extra_types.JsonValue(string_value=obj)
+  elif isinstance(obj, bool):
+    return extra_types.JsonValue(boolean_value=obj)
+  elif isinstance(obj, int):
+    return extra_types.JsonValue(integer_value=obj)
+  elif isinstance(obj, long):
+    if _MININT64 <= obj <= _MAXINT64:
+      return extra_types.JsonValue(integer_value=obj)
+    else:
+      raise TypeError('Can not encode {} as a 64-bit integer'.format(obj))
+  elif isinstance(obj, float):
+    return extra_types.JsonValue(double_value=obj)
+  else:
+    raise TypeError('Cannot convert %s to a JSON value.' % repr(obj))
+
+
+def from_json_value(v):
+  """Converts extra_types.JsonValue objects into Python objects.
+
+  Args:
+    v: JsonValue object to be converted.
+
+  Returns:
+    A Python object structured as values, lists, and dictionaries corresponding
+    to JsonValue, JsonArray and JsonObject types.
+
+  Raises:
+    TypeError: if the JsonValue object contains a type that is not supported.
+
+  The types supported are str, bool, list, dict, and None. The Dataflow API
+  returns JsonValue(s) in many places and it is quite convenient to be able to
+  convert these hierarchical objects to much simpler Python objects.
+  """
+  if isinstance(v, extra_types.JsonValue):
+    if v.string_value is not None:
+      return v.string_value
+    elif v.boolean_value is not None:
+      return v.boolean_value
+    elif v.integer_value is not None:
+      return v.integer_value
+    elif v.double_value is not None:
+      return v.double_value
+    elif v.array_value is not None:
+      return from_json_value(v.array_value)
+    elif v.object_value is not None:
+      return from_json_value(v.object_value)
+    elif v.is_null:
+      return None
+  elif isinstance(v, extra_types.JsonArray):
+    return [from_json_value(e) for e in v.entries]
+  elif isinstance(v, extra_types.JsonObject):
+    return {p.key: from_json_value(p.value) for p in v.properties}
+  raise TypeError('Cannot convert %s from a JSON value.' % repr(v))

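The json_value module added above converts between plain Python values and the apitools extra_types.JsonValue messages used by the Dataflow and BigQuery APIs. A short usage sketch, assuming the apitools package is installed (the module sets extra_types to None when it is not):

  from apache_beam.internal.gcp.json_value import from_json_value
  from apache_beam.internal.gcp.json_value import to_json_value

  # Dicts and lists round-trip through JsonObject/JsonArray wrappers.
  jv = to_json_value({'name': 'beam', 'count': 3, 'tags': ['sdk', 'python']})
  assert from_json_value(jv) == {'name': 'beam', 'count': 3, 'tags': ['sdk', 'python']}

  # with_type=True wraps basic values in a @type/value descriptor.
  typed = from_json_value(to_json_value('abcd', with_type=True))
  assert typed == {'@type': 'http://schema.org/Text', 'value': 'abcd'}
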
http://git-wip-us.apache.org/repos/asf/beam/blob/59ad58ac/sdks/python/apache_beam/internal/gcp/json_value_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/internal/gcp/json_value_test.py b/sdks/python/apache_beam/internal/gcp/json_value_test.py
new file mode 100644
index 0000000..647ae66
--- /dev/null
+++ b/sdks/python/apache_beam/internal/gcp/json_value_test.py
@@ -0,0 +1,93 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""Unit tests for the json_value module."""
+
+import unittest
+
+from apache_beam.internal.gcp.json_value import from_json_value
+from apache_beam.internal.gcp.json_value import to_json_value
+
+
+# Protect against environments where apitools library is not available.
+# pylint: disable=wrong-import-order, wrong-import-position
+try:
+  from apitools.base.py.extra_types import JsonValue
+except ImportError:
+  JsonValue = None
+# pylint: enable=wrong-import-order, wrong-import-position
+
+
+@unittest.skipIf(JsonValue is None, 'GCP dependencies are not installed')
+class JsonValueTest(unittest.TestCase):
+
+  def test_string_to(self):
+    self.assertEquals(JsonValue(string_value='abc'), to_json_value('abc'))
+
+  def test_true_to(self):
+    self.assertEquals(JsonValue(boolean_value=True), to_json_value(True))
+
+  def test_false_to(self):
+    self.assertEquals(JsonValue(boolean_value=False), to_json_value(False))
+
+  def test_int_to(self):
+    self.assertEquals(JsonValue(integer_value=14), to_json_value(14))
+
+  def test_float_to(self):
+    self.assertEquals(JsonValue(double_value=2.75), to_json_value(2.75))
+
+  def test_none_to(self):
+    self.assertEquals(JsonValue(is_null=True), to_json_value(None))
+
+  def test_string_from(self):
+    self.assertEquals('WXYZ', from_json_value(to_json_value('WXYZ')))
+
+  def test_true_from(self):
+    self.assertEquals(True, from_json_value(to_json_value(True)))
+
+  def test_false_from(self):
+    self.assertEquals(False, from_json_value(to_json_value(False)))
+
+  def test_int_from(self):
+    self.assertEquals(-27, from_json_value(to_json_value(-27)))
+
+  def test_float_from(self):
+    self.assertEquals(4.5, from_json_value(to_json_value(4.5)))
+
+  def test_with_type(self):
+    rt = from_json_value(to_json_value('abcd', with_type=True))
+    self.assertEquals('http://schema.org/Text', rt['@type'])
+    self.assertEquals('abcd', rt['value'])
+
+  def test_none_from(self):
+    self.assertIsNone(from_json_value(to_json_value(None)))
+
+  def test_large_integer(self):
+    num = 1 << 35
+    self.assertEquals(num, from_json_value(to_json_value(num)))
+    self.assertEquals(long(num), from_json_value(to_json_value(long(num))))
+
+  def test_long_value(self):
+    self.assertEquals(long(27), from_json_value(to_json_value(long(27))))
+
+  def test_too_long_value(self):
+    with self.assertRaises(TypeError):
+      to_json_value(long(1 << 64))
+
+
+if __name__ == '__main__':
+  unittest.main()

http://git-wip-us.apache.org/repos/asf/beam/blob/59ad58ac/sdks/python/apache_beam/internal/google_cloud_platform/__init__.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/internal/google_cloud_platform/__init__.py b/sdks/python/apache_beam/internal/google_cloud_platform/__init__.py
deleted file mode 100644
index cce3aca..0000000
--- a/sdks/python/apache_beam/internal/google_cloud_platform/__init__.py
+++ /dev/null
@@ -1,16 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#

http://git-wip-us.apache.org/repos/asf/beam/blob/59ad58ac/sdks/python/apache_beam/internal/google_cloud_platform/json_value.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/internal/google_cloud_platform/json_value.py b/sdks/python/apache_beam/internal/google_cloud_platform/json_value.py
deleted file mode 100644
index c8b5393..0000000
--- a/sdks/python/apache_beam/internal/google_cloud_platform/json_value.py
+++ /dev/null
@@ -1,147 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-"""JSON conversion utility functions."""
-
-# Protect against environments where apitools library is not available.
-# pylint: disable=wrong-import-order, wrong-import-position
-try:
-  from apitools.base.py import extra_types
-except ImportError:
-  extra_types = None
-# pylint: enable=wrong-import-order, wrong-import-position
-
-
-_MAXINT64 = (1 << 63) - 1
-_MININT64 = - (1 << 63)
-
-
-def get_typed_value_descriptor(obj):
-  """Converts a basic type into a @type/value dictionary.
-
-  Args:
-    obj: A basestring, bool, int, or float to be converted.
-
-  Returns:
-    A dictionary containing the keys '@type' and 'value' with the value for
-    the @type of appropriate type.
-
-  Raises:
-    TypeError: if the Python object has a type that is not supported.
-  """
-  if isinstance(obj, basestring):
-    type_name = 'Text'
-  elif isinstance(obj, bool):
-    type_name = 'Boolean'
-  elif isinstance(obj, int):
-    type_name = 'Integer'
-  elif isinstance(obj, float):
-    type_name = 'Float'
-  else:
-    raise TypeError('Cannot get a type descriptor for %s.' % repr(obj))
-  return {'@type': 'http://schema.org/%s' % type_name, 'value': obj}
-
-
-def to_json_value(obj, with_type=False):
-  """Converts Python objects into extra_types.JsonValue objects.
-
-  Args:
-    obj: Python object to be converted. Can be 'None'.
-    with_type: If true then the basic types (string, int, float, bool) will
-      be wrapped in @type/value dictionaries. Otherwise the straight value is
-      encoded into a JsonValue.
-
-  Returns:
-    A JsonValue object using JsonValue, JsonArray and JsonObject types for the
-    corresponding values, lists, or dictionaries.
-
-  Raises:
-    TypeError: if the Python object contains a type that is not supported.
-
-  The types supported are str, bool, list, tuple, dict, and None. The Dataflow
-  API requires JsonValue(s) in many places, and it is quite convenient to be
-  able to specify these hierarchical objects using Python syntax.
-  """
-  if obj is None:
-    return extra_types.JsonValue(is_null=True)
-  elif isinstance(obj, (list, tuple)):
-    return extra_types.JsonValue(
-        array_value=extra_types.JsonArray(
-            entries=[to_json_value(o, with_type=with_type) for o in obj]))
-  elif isinstance(obj, dict):
-    json_object = extra_types.JsonObject()
-    for k, v in obj.iteritems():
-      json_object.properties.append(
-          extra_types.JsonObject.Property(
-              key=k, value=to_json_value(v, with_type=with_type)))
-    return extra_types.JsonValue(object_value=json_object)
-  elif with_type:
-    return to_json_value(get_typed_value_descriptor(obj), with_type=False)
-  elif isinstance(obj, basestring):
-    return extra_types.JsonValue(string_value=obj)
-  elif isinstance(obj, bool):
-    return extra_types.JsonValue(boolean_value=obj)
-  elif isinstance(obj, int):
-    return extra_types.JsonValue(integer_value=obj)
-  elif isinstance(obj, long):
-    if _MININT64 <= obj <= _MAXINT64:
-      return extra_types.JsonValue(integer_value=obj)
-    else:
-      raise TypeError('Can not encode {} as a 64-bit integer'.format(obj))
-  elif isinstance(obj, float):
-    return extra_types.JsonValue(double_value=obj)
-  else:
-    raise TypeError('Cannot convert %s to a JSON value.' % repr(obj))
-
-
-def from_json_value(v):
-  """Converts extra_types.JsonValue objects into Python objects.
-
-  Args:
-    v: JsonValue object to be converted.
-
-  Returns:
-    A Python object structured as values, lists, and dictionaries corresponding
-    to JsonValue, JsonArray and JsonObject types.
-
-  Raises:
-    TypeError: if the JsonValue object contains a type that is not supported.
-
-  The types supported are str, bool, list, dict, and None. The Dataflow API
-  returns JsonValue(s) in many places and it is quite convenient to be able to
-  convert these hierarchical objects to much simpler Python objects.
-  """
-  if isinstance(v, extra_types.JsonValue):
-    if v.string_value is not None:
-      return v.string_value
-    elif v.boolean_value is not None:
-      return v.boolean_value
-    elif v.integer_value is not None:
-      return v.integer_value
-    elif v.double_value is not None:
-      return v.double_value
-    elif v.array_value is not None:
-      return from_json_value(v.array_value)
-    elif v.object_value is not None:
-      return from_json_value(v.object_value)
-    elif v.is_null:
-      return None
-  elif isinstance(v, extra_types.JsonArray):
-    return [from_json_value(e) for e in v.entries]
-  elif isinstance(v, extra_types.JsonObject):
-    return {p.key: from_json_value(p.value) for p in v.properties}
-  raise TypeError('Cannot convert %s from a JSON value.' % repr(v))

http://git-wip-us.apache.org/repos/asf/beam/blob/59ad58ac/sdks/python/apache_beam/internal/google_cloud_platform/json_value_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/internal/google_cloud_platform/json_value_test.py b/sdks/python/apache_beam/internal/google_cloud_platform/json_value_test.py
deleted file mode 100644
index 72509a0..0000000
--- a/sdks/python/apache_beam/internal/google_cloud_platform/json_value_test.py
+++ /dev/null
@@ -1,93 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-"""Unit tests for the json_value module."""
-
-import unittest
-
-from apache_beam.internal.google_cloud_platform.json_value import from_json_value
-from apache_beam.internal.google_cloud_platform.json_value import to_json_value
-
-
-# Protect against environments where apitools library is not available.
-# pylint: disable=wrong-import-order, wrong-import-position
-try:
-  from apitools.base.py.extra_types import JsonValue
-except ImportError:
-  JsonValue = None
-# pylint: enable=wrong-import-order, wrong-import-position
-
-
-@unittest.skipIf(JsonValue is None, 'GCP dependencies are not installed')
-class JsonValueTest(unittest.TestCase):
-
-  def test_string_to(self):
-    self.assertEquals(JsonValue(string_value='abc'), to_json_value('abc'))
-
-  def test_true_to(self):
-    self.assertEquals(JsonValue(boolean_value=True), to_json_value(True))
-
-  def test_false_to(self):
-    self.assertEquals(JsonValue(boolean_value=False), to_json_value(False))
-
-  def test_int_to(self):
-    self.assertEquals(JsonValue(integer_value=14), to_json_value(14))
-
-  def test_float_to(self):
-    self.assertEquals(JsonValue(double_value=2.75), to_json_value(2.75))
-
-  def test_none_to(self):
-    self.assertEquals(JsonValue(is_null=True), to_json_value(None))
-
-  def test_string_from(self):
-    self.assertEquals('WXYZ', from_json_value(to_json_value('WXYZ')))
-
-  def test_true_from(self):
-    self.assertEquals(True, from_json_value(to_json_value(True)))
-
-  def test_false_from(self):
-    self.assertEquals(False, from_json_value(to_json_value(False)))
-
-  def test_int_from(self):
-    self.assertEquals(-27, from_json_value(to_json_value(-27)))
-
-  def test_float_from(self):
-    self.assertEquals(4.5, from_json_value(to_json_value(4.5)))
-
-  def test_with_type(self):
-    rt = from_json_value(to_json_value('abcd', with_type=True))
-    self.assertEquals('http://schema.org/Text', rt['@type'])
-    self.assertEquals('abcd', rt['value'])
-
-  def test_none_from(self):
-    self.assertIsNone(from_json_value(to_json_value(None)))
-
-  def test_large_integer(self):
-    num = 1 << 35
-    self.assertEquals(num, from_json_value(to_json_value(num)))
-    self.assertEquals(long(num), from_json_value(to_json_value(long(num))))
-
-  def test_long_value(self):
-    self.assertEquals(long(27), from_json_value(to_json_value(long(27))))
-
-  def test_too_long_value(self):
-    with self.assertRaises(TypeError):
-      to_json_value(long(1 << 64))
-
-
-if __name__ == '__main__':
-  unittest.main()

http://git-wip-us.apache.org/repos/asf/beam/blob/59ad58ac/sdks/python/apache_beam/io/__init__.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/__init__.py b/sdks/python/apache_beam/io/__init__.py
index af6c56e..4b434be 100644
--- a/sdks/python/apache_beam/io/__init__.py
+++ b/sdks/python/apache_beam/io/__init__.py
@@ -31,8 +31,8 @@ from apache_beam.io.range_trackers import *
 # Protect against environments where the clients library is not available.
 # pylint: disable=wrong-import-order, wrong-import-position
 try:
-  from apache_beam.io.google_cloud_platform.bigquery import *
-  from apache_beam.io.google_cloud_platform.pubsub import *
+  from apache_beam.io.gcp.bigquery import *
+  from apache_beam.io.gcp.pubsub import *
 except ImportError:
   pass
 # pylint: enable=wrong-import-order, wrong-import-position

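The try/except ImportError guard shown here is what keeps the core io package importable when the optional GCP client libraries are absent; downstream code can probe for the same condition. A minimal sketch (the HAVE_GCP_IO flag is illustrative, not part of the SDK):

  # Feature-detect the optional GCP IO connectors the same way the SDK does.
  try:
    from apache_beam.io.gcp import bigquery as gcp_bigquery  # pylint: disable=unused-import
    HAVE_GCP_IO = True
  except ImportError:
    HAVE_GCP_IO = False

  if not HAVE_GCP_IO:
    print('GCP extras not installed; BigQuery/Pub/Sub IO is unavailable.')
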
http://git-wip-us.apache.org/repos/asf/beam/blob/59ad58ac/sdks/python/apache_beam/io/fileio.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/fileio.py b/sdks/python/apache_beam/io/fileio.py
index 761644f..8b39493 100644
--- a/sdks/python/apache_beam/io/fileio.py
+++ b/sdks/python/apache_beam/io/fileio.py
@@ -37,7 +37,7 @@ from apache_beam.transforms.display import DisplayDataItem
 # Protect against environments where apitools library is not available.
 # pylint: disable=wrong-import-order, wrong-import-position
 try:
-  from apache_beam.io.google_cloud_platform import gcsio
+  from apache_beam.io.gcp import gcsio
   DEFAULT_READ_BUFFER_SIZE = gcsio.DEFAULT_READ_BUFFER_SIZE
   MAX_BATCH_OPERATION_SIZE = gcsio.MAX_BATCH_OPERATION_SIZE
 except ImportError:

http://git-wip-us.apache.org/repos/asf/beam/blob/59ad58ac/sdks/python/apache_beam/io/gcp/__init__.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/gcp/__init__.py b/sdks/python/apache_beam/io/gcp/__init__.py
new file mode 100644
index 0000000..cce3aca
--- /dev/null
+++ b/sdks/python/apache_beam/io/gcp/__init__.py
@@ -0,0 +1,16 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#

http://git-wip-us.apache.org/repos/asf/beam/blob/59ad58ac/sdks/python/apache_beam/io/gcp/bigquery.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/gcp/bigquery.py b/sdks/python/apache_beam/io/gcp/bigquery.py
new file mode 100644
index 0000000..7822cc8
--- /dev/null
+++ b/sdks/python/apache_beam/io/gcp/bigquery.py
@@ -0,0 +1,1081 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""BigQuery sources and sinks.
+
+This module implements reading from and writing to BigQuery tables. It relies
+on several classes exposed by the BigQuery API: TableSchema, TableFieldSchema,
+TableRow, and TableCell. The default mode is to return table rows read from a
+BigQuery source as dictionaries. Similarly a Write transform to a BigQuerySink
+accepts PCollections of dictionaries. This is done for more convenient
+programming.  If desired, the native TableRow objects can be used throughout to
+represent rows (use an instance of TableRowJsonCoder as a coder argument when
+creating the sources or sinks respectively).
+
+Also, for programming convenience, instances of TableReference and TableSchema
+have a string representation that can be used for the corresponding arguments:
+
+  - TableReference can be a PROJECT:DATASET.TABLE or DATASET.TABLE string.
+  - TableSchema can be a NAME:TYPE{,NAME:TYPE}* string
+    (e.g. 'month:STRING,event_count:INTEGER').
+
+The syntax supported is described here:
+https://cloud.google.com/bigquery/bq-command-line-tool-quickstart
+
+BigQuery sources can be used as main inputs or side inputs. A main input
+(common case) is expected to be massive and will be split into manageable chunks
+and processed in parallel. Side inputs are expected to be small and will be read
+completely every time a ParDo DoFn gets executed. In the example below the
+lambda function implementing the DoFn for the Map transform will get on each
+call *one* row of the main table and *all* rows of the side table. The runner
+may use some caching techniques to share the side inputs between calls in order
+to avoid excessive reading:::
+
+  main_table = pipeline | 'very_big' >> beam.io.Read(beam.io.BigQuerySource())
+  side_table = pipeline | 'not_big' >> beam.io.Read(beam.io.BigQuerySource())
+  results = (
+      main_table
+      | 'process data' >> beam.Map(
+          lambda element, side_input: ..., AsList(side_table)))
+
+There is no difference in how main and side inputs are read. What makes the
+side_table a 'side input' is the AsList wrapper used when passing the table
+as a parameter to the Map transform. AsList signals to the execution framework
+that its input should be made available whole.
+
+The main and side inputs are implemented differently. Reading a BigQuery table
+as main input entails exporting the table to a set of GCS files (currently in
+JSON format) and then processing those files. Reading the same table as a side
+input entails querying the table for all its rows. The coder argument on
+BigQuerySource controls the reading of the lines in the export files (i.e.,
+transform a JSON object into a PCollection element). The coder is not involved
+when the same table is read as a side input since there is no intermediate
+format involved. We get the table rows directly from the BigQuery service with
+a query.
+
+Users may provide a query to read from rather than reading all of a BigQuery
+table. If specified, the result obtained by executing the specified query will
+be used as the data of the input transform.::
+
+  query_results = pipeline | beam.io.Read(beam.io.BigQuerySource(
+      query='SELECT year, mean_temp FROM samples.weather_stations'))
+
+When creating a BigQuery input transform, users should provide either a query
+or a table. Pipeline construction will fail with a validation error if neither
+or both are specified.
+
+*** Short introduction to BigQuery concepts ***
+Tables have rows (TableRow) and each row has cells (TableCell).
+A table has a schema (TableSchema), which in turn describes the schema of each
+cell (TableFieldSchema). The terms field and cell are used interchangeably.
+
+TableSchema: Describes the schema (types and order) for values in each row.
+  Has one attribute, 'fields', which is a list of TableFieldSchema objects.
+
+TableFieldSchema: Describes the schema (type, name) for one field.
+  Has several attributes, including 'name' and 'type'. Common values for
+  the type attribute are: 'STRING', 'INTEGER', 'FLOAT', 'BOOLEAN'. All possible
+  values are described at:
+  https://cloud.google.com/bigquery/preparing-data-for-bigquery#datatypes
+
+TableRow: Holds all values in a table row. Has one attribute, 'f', which is a
+  list of TableCell instances.
+
+TableCell: Holds the value for one cell (or field).  Has one attribute,
+  'v', which is a JsonValue instance. This class is defined in
+  apitools.base.py.extra_types.py module.
+"""
+
+from __future__ import absolute_import
+
+import collections
+import datetime
+import json
+import logging
+import re
+import time
+import uuid
+
+from apache_beam import coders
+from apache_beam.internal import auth
+from apache_beam.internal.gcp.json_value import from_json_value
+from apache_beam.internal.gcp.json_value import to_json_value
+from apache_beam.runners.dataflow.native_io import iobase as dataflow_io
+from apache_beam.transforms.display import DisplayDataItem
+from apache_beam.utils import retry
+from apache_beam.utils.pipeline_options import GoogleCloudOptions
+from apache_beam.io.gcp.internal.clients import bigquery
+
+# Protect against environments where bigquery library is not available.
+# pylint: disable=wrong-import-order, wrong-import-position
+try:
+  from apitools.base.py.exceptions import HttpError
+except ImportError:
+  pass
+# pylint: enable=wrong-import-order, wrong-import-position
+
+
+__all__ = [
+    'TableRowJsonCoder',
+    'BigQueryDisposition',
+    'BigQuerySource',
+    'BigQuerySink',
+    ]
+
+JSON_COMPLIANCE_ERROR = 'NAN, INF and -INF values are not JSON compliant.'
+MAX_RETRIES = 3
+
+
+class RowAsDictJsonCoder(coders.Coder):
+  """A coder for a table row (represented as a dict) to/from a JSON string.
+
+  This is the default coder for sources and sinks if the coder argument is not
+  specified.
+  """
+
+  def encode(self, table_row):
+    # The normal error when dumping NAN/INF values is:
+    # ValueError: Out of range float values are not JSON compliant
+    # This code will catch this error to emit an error that explains
+    # to the programmer that they have used NAN/INF values.
+    try:
+      return json.dumps(table_row, allow_nan=False)
+    except ValueError as e:
+      raise ValueError('%s. %s' % (e, JSON_COMPLIANCE_ERROR))
+
+  def decode(self, encoded_table_row):
+    return json.loads(encoded_table_row)
+
+
+class TableRowJsonCoder(coders.Coder):
+  """A coder for a TableRow instance to/from a JSON string.
+
+  Note that the encoding operation (used when writing to sinks) requires the
+  table schema in order to obtain the ordered list of field names. Reading from
+  sources on the other hand does not need the table schema.
+  """
+
+  def __init__(self, table_schema=None):
+    # The table schema is needed for encoding TableRows as JSON (writing to
+    # sinks) because the ordered list of field names is used in the JSON
+    # representation.
+    self.table_schema = table_schema
+    # Precompute field names since we need them for row encoding.
+    if self.table_schema:
+      self.field_names = tuple(fs.name for fs in self.table_schema.fields)
+
+  def encode(self, table_row):
+    if self.table_schema is None:
+      raise AttributeError(
+          'The TableRowJsonCoder requires a table schema for '
+          'encoding operations. Please specify a table_schema argument.')
+    try:
+      return json.dumps(
+          collections.OrderedDict(
+              zip(self.field_names,
+                  [from_json_value(f.v) for f in table_row.f])),
+          allow_nan=False)
+    except ValueError as e:
+      raise ValueError('%s. %s' % (e, JSON_COMPLIANCE_ERROR))
+
+  def decode(self, encoded_table_row):
+    od = json.loads(
+        encoded_table_row, object_pairs_hook=collections.OrderedDict)
+    return bigquery.TableRow(
+        f=[bigquery.TableCell(v=to_json_value(e)) for e in od.itervalues()])
+
+
+def parse_table_schema_from_json(schema_string):
+  """Parse the Table Schema provided as string.
+
+  Args:
+    schema_string: String serialized table schema, should be a valid JSON.
+
+  Returns:
+    A TableSchema of the BigQuery export from either the Query or the Table.
+  """
+  json_schema = json.loads(schema_string)
+
+  def _parse_schema_field(field):
+    """Parse a single schema field from dictionary.
+
+    Args:
+      field: Dictionary object containing serialized schema.
+
+    Returns:
+      A TableFieldSchema for a single column in BigQuery.
+    """
+    schema = bigquery.TableFieldSchema()
+    schema.name = field['name']
+    schema.type = field['type']
+    if 'mode' in field:
+      schema.mode = field['mode']
+    else:
+      schema.mode = 'NULLABLE'
+    if 'description' in field:
+      schema.description = field['description']
+    if 'fields' in field:
+      schema.fields = [_parse_schema_field(x) for x in field['fields']]
+    return schema
+
+  fields = [_parse_schema_field(f) for f in json_schema['fields']]
+  return bigquery.TableSchema(fields=fields)
+
+
+class BigQueryDisposition(object):
+  """Class holding standard strings used for create and write dispositions."""
+
+  CREATE_NEVER = 'CREATE_NEVER'
+  CREATE_IF_NEEDED = 'CREATE_IF_NEEDED'
+  WRITE_TRUNCATE = 'WRITE_TRUNCATE'
+  WRITE_APPEND = 'WRITE_APPEND'
+  WRITE_EMPTY = 'WRITE_EMPTY'
+
+  @staticmethod
+  def validate_create(disposition):
+    values = (BigQueryDisposition.CREATE_NEVER,
+              BigQueryDisposition.CREATE_IF_NEEDED)
+    if disposition not in values:
+      raise ValueError(
+          'Invalid create disposition %s. Expecting %s' % (disposition, values))
+    return disposition
+
+  @staticmethod
+  def validate_write(disposition):
+    values = (BigQueryDisposition.WRITE_TRUNCATE,
+              BigQueryDisposition.WRITE_APPEND,
+              BigQueryDisposition.WRITE_EMPTY)
+    if disposition not in values:
+      raise ValueError(
+          'Invalid write disposition %s. Expecting %s' % (disposition, values))
+    return disposition
+
+
+def _parse_table_reference(table, dataset=None, project=None):
+  """Parses a table reference into a (project, dataset, table) tuple.
+
+  Args:
+    table: The ID of the table. The ID must contain only letters
+      (a-z, A-Z), numbers (0-9), or underscores (_). If dataset argument is None
+      then the table argument must contain the entire table reference:
+      'DATASET.TABLE' or 'PROJECT:DATASET.TABLE'. This argument can be a
+      bigquery.TableReference instance in which case dataset and project are
+      ignored and the reference is returned as a result.  Additionally, for date
+      partitioned tables, appending '$YYYYmmdd' to the table name is supported,
+      e.g. 'DATASET.TABLE$YYYYmmdd'.
+    dataset: The ID of the dataset containing this table or null if the table
+      reference is specified entirely by the table argument.
+    project: The ID of the project containing this table or null if the table
+      reference is specified entirely by the table (and possibly dataset)
+      argument.
+
+  Returns:
+    A bigquery.TableReference object. The object has the following attributes:
+    projectId, datasetId, and tableId.
+
+  Raises:
+    ValueError: if the table reference as a string does not match the expected
+      format.
+  """
+
+  if isinstance(table, bigquery.TableReference):
+    return table
+
+  table_reference = bigquery.TableReference()
+  # If dataset argument is not specified, the expectation is that the
+  # table argument will contain a full table reference instead of just a
+  # table name.
+  if dataset is None:
+    match = re.match(
+        r'^((?P<project>.+):)?(?P<dataset>\w+)\.(?P<table>[\w\$]+)$', table)
+    if not match:
+      raise ValueError(
+          'Expected a table reference (PROJECT:DATASET.TABLE or '
+          'DATASET.TABLE) instead of %s.' % table)
+    table_reference.projectId = match.group('project')
+    table_reference.datasetId = match.group('dataset')
+    table_reference.tableId = match.group('table')
+  else:
+    table_reference.projectId = project
+    table_reference.datasetId = dataset
+    table_reference.tableId = table
+  return table_reference
+
+
+# -----------------------------------------------------------------------------
+# BigQuerySource, BigQuerySink.
+
+
+class BigQuerySource(dataflow_io.NativeSource):
+  """A source based on a BigQuery table."""
+
+  def __init__(self, table=None, dataset=None, project=None, query=None,
+               validate=False, coder=None, use_standard_sql=False,
+               flatten_results=True):
+    """Initialize a BigQuerySource.
+
+    Args:
+      table: The ID of a BigQuery table. If specified all data of the table
+        will be used as input of the current source. The ID must contain only
+        letters (a-z, A-Z), numbers (0-9), or underscores (_). If dataset
+        and query arguments are None then the table argument must contain the
+        entire table reference specified as: 'DATASET.TABLE' or
+        'PROJECT:DATASET.TABLE'.
+      dataset: The ID of the dataset containing this table or null if the table
+        reference is specified entirely by the table argument or a query is
+        specified.
+      project: The ID of the project containing this table or null if the table
+        reference is specified entirely by the table argument or a query is
+        specified.
+      query: A query to be used instead of arguments table, dataset, and
+        project.
+      validate: If true, various checks will be done when source gets
+        initialized (e.g., is table present?). This should be True for most
+        scenarios in order to catch errors as early as possible (pipeline
+        construction instead of pipeline execution). It should be False if the
+        table is created during pipeline execution by a previous step.
+      coder: The coder for the table rows if serialized to disk. If None, then
+        the default coder is RowAsDictJsonCoder, which will interpret every line
+        in a file as a JSON serialized dictionary. This argument needs a value
+        only in special cases when returning table rows as dictionaries is not
+        desirable.
+      use_standard_sql: Specifies whether to use BigQuery's standard
+        SQL dialect for this query. The default value is False. If set to True,
+        the query will use BigQuery's updated SQL dialect with improved
+        standards compliance. This parameter is ignored for table inputs.
+      flatten_results: Flattens all nested and repeated fields in the
+        query results. The default value is true.
+
+    Raises:
+      ValueError: if any of the following is true
+      (1) the table reference as a string does not match the expected format
+      (2) neither a table nor a query is specified
+      (3) both a table and a query are specified.
+    """
+
+    if table is not None and query is not None:
+      raise ValueError('Both a BigQuery table and a query were specified.'
+                       ' Please specify only one of these.')
+    elif table is None and query is None:
+      raise ValueError('A BigQuery table or a query must be specified')
+    elif table is not None:
+      self.table_reference = _parse_table_reference(table, dataset, project)
+      self.query = None
+      self.use_legacy_sql = True
+    else:
+      self.query = query
+      # TODO(BEAM-1082): Change the internal flag to be standard_sql
+      self.use_legacy_sql = not use_standard_sql
+      self.table_reference = None
+
+    self.validate = validate
+    self.flatten_results = flatten_results
+    self.coder = coder or RowAsDictJsonCoder()
+
+  def display_data(self):
+    if self.query is not None:
+      res = {'query': DisplayDataItem(self.query, label='Query')}
+    else:
+      if self.table_reference.projectId is not None:
+        tableSpec = '{}:{}.{}'.format(self.table_reference.projectId,
+                                      self.table_reference.datasetId,
+                                      self.table_reference.tableId)
+      else:
+        tableSpec = '{}.{}'.format(self.table_reference.datasetId,
+                                   self.table_reference.tableId)
+      res = {'table': DisplayDataItem(tableSpec, label='Table')}
+
+    res['validation'] = DisplayDataItem(self.validate,
+                                        label='Validation Enabled')
+    return res
+
+  @property
+  def format(self):
+    """Source format name required for remote execution."""
+    return 'bigquery'
+
+  def reader(self, test_bigquery_client=None):
+    return BigQueryReader(
+        source=self,
+        test_bigquery_client=test_bigquery_client,
+        use_legacy_sql=self.use_legacy_sql,
+        flatten_results=self.flatten_results)
+
+
+class BigQuerySink(dataflow_io.NativeSink):
+  """A sink based on a BigQuery table."""
+
+  def __init__(self, table, dataset=None, project=None, schema=None,
+               create_disposition=BigQueryDisposition.CREATE_IF_NEEDED,
+               write_disposition=BigQueryDisposition.WRITE_EMPTY,
+               validate=False, coder=None):
+    """Initialize a BigQuerySink.
+
+    Args:
+      table: The ID of the table. The ID must contain only letters
+        (a-z, A-Z), numbers (0-9), or underscores (_). If dataset argument is
+        None then the table argument must contain the entire table reference
+        specified as: 'DATASET.TABLE' or 'PROJECT:DATASET.TABLE'.
+      dataset: The ID of the dataset containing this table or null if the table
+        reference is specified entirely by the table argument.
+      project: The ID of the project containing this table or null if the table
+        reference is specified entirely by the table argument.
+      schema: The schema to be used if the BigQuery table to write has to be
+        created. This can be either specified as a 'bigquery.TableSchema' object
+        or a single string  of the form 'field1:type1,field2:type2,field3:type3'
+        that defines a comma separated list of fields. Here 'type' should
+        specify the BigQuery type of the field. Single string based schemas do
+        not support nested fields, repeated fields, or specifying a BigQuery
+        mode for fields (mode will always be set to 'NULLABLE').
+      create_disposition: A string describing what happens if the table does not
+        exist. Possible values are:
+        - BigQueryDisposition.CREATE_IF_NEEDED: create if does not exist.
+        - BigQueryDisposition.CREATE_NEVER: fail the write if does not exist.
+      write_disposition: A string describing what happens if the table
+        already has some data. Possible values are:
+        -  BigQueryDisposition.WRITE_TRUNCATE: delete existing rows.
+        -  BigQueryDisposition.WRITE_APPEND: add to existing rows.
+        -  BigQueryDisposition.WRITE_EMPTY: fail the write if table not empty.
+      validate: If true, various checks will be done when sink gets
+        initialized (e.g., is table present given the disposition arguments?).
+        This should be True for most scenarios in order to catch errors as early
+        as possible (pipeline construction instead of pipeline execution). It
+        should be False if the table is created during pipeline execution by a
+        previous step.
+      coder: The coder for the table rows if serialized to disk. If None, then
+        the default coder is RowAsDictJsonCoder, which will interpret every
+        element written to the sink as a dictionary that will be JSON serialized
+        as a line in a file. This argument needs a value only in special cases
+        when writing table rows as dictionaries is not desirable.
+
+    Raises:
+      TypeError: if the schema argument is not a string or a TableSchema object.
+      ValueError: if the table reference as a string does not match the expected
+      format.
+    """
+    self.table_reference = _parse_table_reference(table, dataset, project)
+    # Transform the table schema into a bigquery.TableSchema instance.
+    if isinstance(schema, basestring):
+      # TODO(silviuc): Should add a regex-based validation of the format.
+      table_schema = bigquery.TableSchema()
+      schema_list = [s.strip(' ') for s in schema.split(',')]
+      for field_and_type in schema_list:
+        field_name, field_type = field_and_type.split(':')
+        field_schema = bigquery.TableFieldSchema()
+        field_schema.name = field_name
+        field_schema.type = field_type
+        field_schema.mode = 'NULLABLE'
+        table_schema.fields.append(field_schema)
+      self.table_schema = table_schema
+    elif schema is None:
+      # TODO(silviuc): Should check that table exists if no schema specified.
+      self.table_schema = schema
+    elif isinstance(schema, bigquery.TableSchema):
+      self.table_schema = schema
+    else:
+      raise TypeError('Unexpected schema argument: %s.' % schema)
+
+    self.create_disposition = BigQueryDisposition.validate_create(
+        create_disposition)
+    self.write_disposition = BigQueryDisposition.validate_write(
+        write_disposition)
+    self.validate = validate
+    self.coder = coder or RowAsDictJsonCoder()
+
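An illustrative sketch, not part of the commit: constructing the sink above
with the comma-separated string schema described in the docstring (the
project, dataset, table, and field names are hypothetical):

  sink = BigQuerySink(
      'my_project:my_dataset.my_table',
      schema='name:STRING,score:INTEGER,created_at:TIMESTAMP',
      create_disposition=BigQueryDisposition.CREATE_IF_NEEDED,
      write_disposition=BigQueryDisposition.WRITE_APPEND)
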
+  def display_data(self):
+    res = {}
+    if self.table_reference is not None:
+      tableSpec = '{}.{}'.format(self.table_reference.datasetId,
+                                 self.table_reference.tableId)
+      if self.table_reference.projectId is not None:
+        tableSpec = '{}:{}'.format(self.table_reference.projectId,
+                                   tableSpec)
+      res['table'] = DisplayDataItem(tableSpec, label='Table')
+
+    res['validation'] = DisplayDataItem(self.validate,
+                                        label="Validation Enabled")
+    return res
+
+  def schema_as_json(self):
+    """Returns the TableSchema associated with the sink as a JSON string."""
+
+    def schema_list_as_object(schema_list):
+      """Returns a list of TableFieldSchema objects as a list of dicts."""
+      fields = []
+      for f in schema_list:
+        fs = {'name': f.name, 'type': f.type}
+        if f.description is not None:
+          fs['description'] = f.description
+        if f.mode is not None:
+          fs['mode'] = f.mode
+        if f.type.lower() == 'record':
+          fs['fields'] = schema_list_as_object(f.fields)
+        fields.append(fs)
+      return fields
+    return json.dumps(
+        {'fields': schema_list_as_object(self.table_schema.fields)})
+
+  @property
+  def format(self):
+    """Sink format name required for remote execution."""
+    return 'bigquery'
+
+  def writer(self, test_bigquery_client=None, buffer_size=None):
+    return BigQueryWriter(
+        sink=self, test_bigquery_client=test_bigquery_client,
+        buffer_size=buffer_size)
+
+
+# -----------------------------------------------------------------------------
+# BigQueryReader, BigQueryWriter.
+
+
+class BigQueryReader(dataflow_io.NativeSourceReader):
+  """A reader for a BigQuery source."""
+
+  def __init__(self, source, test_bigquery_client=None, use_legacy_sql=True,
+               flatten_results=True):
+    self.source = source
+    self.test_bigquery_client = test_bigquery_client
+    if auth.is_running_in_gce:
+      self.executing_project = auth.executing_project
+    elif hasattr(source, 'pipeline_options'):
+      self.executing_project = (
+          source.pipeline_options.view_as(GoogleCloudOptions).project)
+    else:
+      self.executing_project = None
+
+    # TODO(silviuc): Try to automatically get it from gcloud config info.
+    if not self.executing_project and test_bigquery_client is None:
+      raise RuntimeError(
+          'Missing executing project information. Please use the --project '
+          'command line option to specify it.')
+    self.row_as_dict = isinstance(self.source.coder, RowAsDictJsonCoder)
+    # Schema for the rows being read by the reader. It is initialized the
+    # first time something gets read from the table. It is not required
+    # for reading the field values in each row but could be useful for
+    # getting additional details.
+    self.schema = None
+    self.use_legacy_sql = use_legacy_sql
+    self.flatten_results = flatten_results
+
+    if self.source.query is None:
+      # If the table reference did not define a project, default to the
+      # executing project.
+      project_id = self.source.table_reference.projectId
+      if not project_id:
+        project_id = self.executing_project
+      self.query = 'SELECT * FROM [%s:%s.%s];' % (
+          project_id,
+          self.source.table_reference.datasetId,
+          self.source.table_reference.tableId)
+    else:
+      self.query = self.source.query
+
+  def __enter__(self):
+    self.client = BigQueryWrapper(client=self.test_bigquery_client)
+    self.client.create_temporary_dataset(self.executing_project)
+    return self
+
+  def __exit__(self, exception_type, exception_value, traceback):
+    self.client.clean_up_temporary_dataset(self.executing_project)
+
+  def __iter__(self):
+    for rows, schema in self.client.run_query(
+        project_id=self.executing_project, query=self.query,
+        use_legacy_sql=self.use_legacy_sql,
+        flatten_results=self.flatten_results):
+      if self.schema is None:
+        self.schema = schema
+      for row in rows:
+        if self.row_as_dict:
+          yield self.client.convert_row_to_dict(row, schema)
+        else:
+          yield row
+
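A sketch of driving the reader above, assuming `source` is the BigQuery
source object defined earlier in this module and `client` is a test
BigQuery client (both hypothetical here); the context manager creates and
later cleans up the temporary dataset that holds the query results:

  reader = BigQueryReader(source, test_bigquery_client=client)
  with reader:
    for row in reader:
      # Rows arrive as dicts when the source coder is RowAsDictJsonCoder.
      process(row)  # process() is a placeholder for user code.
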
+
+class BigQueryWriter(dataflow_io.NativeSinkWriter):
+  """The sink writer for a BigQuerySink."""
+
+  def __init__(self, sink, test_bigquery_client=None, buffer_size=None):
+    self.sink = sink
+    self.test_bigquery_client = test_bigquery_client
+    self.row_as_dict = isinstance(self.sink.coder, RowAsDictJsonCoder)
+    # Buffer used to batch written rows so we reduce communication with the
+    # BigQuery service.
+    self.rows_buffer = []
+    self.rows_buffer_flush_threshold = buffer_size or 1000
+    # Figure out the project, dataset, and table used for the sink.
+    self.project_id = self.sink.table_reference.projectId
+
+    # If the table reference did not define a project, default to the
+    # executing project.
+    if self.project_id is None and hasattr(sink, 'pipeline_options'):
+      self.project_id = (
+          sink.pipeline_options.view_as(GoogleCloudOptions).project)
+
+    assert self.project_id is not None
+
+    self.dataset_id = self.sink.table_reference.datasetId
+    self.table_id = self.sink.table_reference.tableId
+
+  def _flush_rows_buffer(self):
+    if self.rows_buffer:
+      logging.info('Writing %d rows to %s:%s.%s table.', len(self.rows_buffer),
+                   self.project_id, self.dataset_id, self.table_id)
+      passed, errors = self.client.insert_rows(
+          project_id=self.project_id, dataset_id=self.dataset_id,
+          table_id=self.table_id, rows=self.rows_buffer)
+      self.rows_buffer = []
+      if not passed:
+        raise RuntimeError('Could not successfully insert rows into the '
+                           'BigQuery table [%s:%s.%s]. Errors: %s' %
+                           (self.project_id, self.dataset_id,
+                            self.table_id, errors))
+
+  def __enter__(self):
+    self.client = BigQueryWrapper(client=self.test_bigquery_client)
+    self.client.get_or_create_table(
+        self.project_id, self.dataset_id, self.table_id, self.sink.table_schema,
+        self.sink.create_disposition, self.sink.write_disposition)
+    return self
+
+  def __exit__(self, exception_type, exception_value, traceback):
+    self._flush_rows_buffer()
+
+  def Write(self, row):
+    self.rows_buffer.append(row)
+    if len(self.rows_buffer) > self.rows_buffer_flush_threshold:
+      self._flush_rows_buffer()
+
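A sketch of using the writer above through the sink, assuming `sink` is a
BigQuerySink as constructed earlier and `client` is a test BigQuery client
(hypothetical); on entering the context the table is created or checked per
the dispositions, and any buffered rows are flushed on exit:

  with sink.writer(test_bigquery_client=client, buffer_size=500) as writer:
    writer.Write({'name': 'abc', 'score': 3})
    writer.Write({'name': 'def', 'score': 5})
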
+
+# -----------------------------------------------------------------------------
+# BigQueryWrapper.
+
+
+class BigQueryWrapper(object):
+  """BigQuery client wrapper with utilities for querying.
+
+  The wrapper is used to organize all the BigQuery integration points and
+  offer a common place where retry logic for failures can be controlled.
+  In addition it offers various functions used both in sources and sinks
+  (e.g., find and create tables, query a table, etc.).
+  """
+
+  TEMP_TABLE = 'temp_table_'
+  TEMP_DATASET = 'temp_dataset_'
+
+  def __init__(self, client=None):
+    self.client = client or bigquery.BigqueryV2(
+        credentials=auth.get_service_credentials())
+    self._unique_row_id = 0
+    # For testing scenarios where we pass in a client we do not want a
+    # randomized prefix for row IDs.
+    self._row_id_prefix = '' if client else uuid.uuid4()
+    self._temporary_table_suffix = uuid.uuid4().hex
+
+  @property
+  def unique_row_id(self):
+    """Returns a unique row ID (str) used to avoid multiple insertions.
+
+    If a row ID is provided, BigQuery will make a best effort not to insert
+    the same row multiple times in fail-and-retry scenarios where the insert
+    request may be issued several times. This comes into play, for example,
+    for sinks executed by a local runner.
+
+    Returns:
+      a unique row ID string
+    """
+    self._unique_row_id += 1
+    return '%s_%d' % (self._row_id_prefix, self._unique_row_id)
+
+  def _get_temp_table(self, project_id):
+    return _parse_table_reference(
+        table=BigQueryWrapper.TEMP_TABLE + self._temporary_table_suffix,
+        dataset=BigQueryWrapper.TEMP_DATASET + self._temporary_table_suffix,
+        project=project_id)
+
+  @retry.with_exponential_backoff(
+      num_retries=MAX_RETRIES,
+      retry_filter=retry.retry_on_server_errors_and_timeout_filter)
+  def _start_query_job(self, project_id, query, use_legacy_sql, flatten_results,
+                       job_id, dry_run=False):
+    reference = bigquery.JobReference(jobId=job_id, projectId=project_id)
+    request = bigquery.BigqueryJobsInsertRequest(
+        projectId=project_id,
+        job=bigquery.Job(
+            configuration=bigquery.JobConfiguration(
+                dryRun=dry_run,
+                query=bigquery.JobConfigurationQuery(
+                    query=query,
+                    useLegacySql=use_legacy_sql,
+                    allowLargeResults=True,
+                    destinationTable=self._get_temp_table(project_id),
+                    flattenResults=flatten_results)),
+            jobReference=reference))
+
+    response = self.client.jobs.Insert(request)
+    return response.jobReference.jobId
+
+  @retry.with_exponential_backoff(
+      num_retries=MAX_RETRIES,
+      retry_filter=retry.retry_on_server_errors_and_timeout_filter)
+  def _get_query_results(self, project_id, job_id,
+                         page_token=None, max_results=10000):
+    request = bigquery.BigqueryJobsGetQueryResultsRequest(
+        jobId=job_id, pageToken=page_token, projectId=project_id,
+        maxResults=max_results)
+    response = self.client.jobs.GetQueryResults(request)
+    return response
+
+  @retry.with_exponential_backoff(
+      num_retries=MAX_RETRIES,
+      retry_filter=retry.retry_on_server_errors_filter)
+  def _insert_all_rows(self, project_id, dataset_id, table_id, rows):
+    # The rows argument is a list of
+    # bigquery.TableDataInsertAllRequest.RowsValueListEntry instances as
+    # required by the InsertAll() method.
+    request = bigquery.BigqueryTabledataInsertAllRequest(
+        projectId=project_id, datasetId=dataset_id, tableId=table_id,
+        tableDataInsertAllRequest=bigquery.TableDataInsertAllRequest(
+            # TODO(silviuc): Should have an option for skipInvalidRows?
+            # TODO(silviuc): Should have an option for ignoreUnknownValues?
+            rows=rows))
+    response = self.client.tabledata.InsertAll(request)
+    # response.insertErrors is not [] if errors encountered.
+    return not response.insertErrors, response.insertErrors
+
+  @retry.with_exponential_backoff(
+      num_retries=MAX_RETRIES,
+      retry_filter=retry.retry_on_server_errors_and_timeout_filter)
+  def _get_table(self, project_id, dataset_id, table_id):
+    request = bigquery.BigqueryTablesGetRequest(
+        projectId=project_id, datasetId=dataset_id, tableId=table_id)
+    response = self.client.tables.Get(request)
+    # The response is a bigquery.Table instance.
+    return response
+
+  def _create_table(self, project_id, dataset_id, table_id, schema):
+    table = bigquery.Table(
+        tableReference=bigquery.TableReference(
+            projectId=project_id, datasetId=dataset_id, tableId=table_id),
+        schema=schema)
+    request = bigquery.BigqueryTablesInsertRequest(
+        projectId=project_id, datasetId=dataset_id, table=table)
+    response = self.client.tables.Insert(request)
+    # The response is a bigquery.Table instance.
+    return response
+
+  @retry.with_exponential_backoff(
+      num_retries=MAX_RETRIES,
+      retry_filter=retry.retry_on_server_errors_and_timeout_filter)
+  def get_or_create_dataset(self, project_id, dataset_id):
+    # Check if the dataset already exists; otherwise create it.
+    try:
+      dataset = self.client.datasets.Get(bigquery.BigqueryDatasetsGetRequest(
+          projectId=project_id, datasetId=dataset_id))
+      return dataset
+    except HttpError as exn:
+      if exn.status_code == 404:
+        dataset = bigquery.Dataset(
+            datasetReference=bigquery.DatasetReference(
+                projectId=project_id, datasetId=dataset_id))
+        request = bigquery.BigqueryDatasetsInsertRequest(
+            projectId=project_id, dataset=dataset)
+        response = self.client.datasets.Insert(request)
+        # The response is a bigquery.Dataset instance.
+        return response
+      else:
+        raise
+
+  @retry.with_exponential_backoff(
+      num_retries=MAX_RETRIES,
+      retry_filter=retry.retry_on_server_errors_and_timeout_filter)
+  def _is_table_empty(self, project_id, dataset_id, table_id):
+    request = bigquery.BigqueryTabledataListRequest(
+        projectId=project_id, datasetId=dataset_id, tableId=table_id,
+        maxResults=1)
+    response = self.client.tabledata.List(request)
+    # The response is a bigquery.TableDataList instance.
+    return response.totalRows == 0
+
+  @retry.with_exponential_backoff(
+      num_retries=MAX_RETRIES,
+      retry_filter=retry.retry_on_server_errors_and_timeout_filter)
+  def _delete_table(self, project_id, dataset_id, table_id):
+    request = bigquery.BigqueryTablesDeleteRequest(
+        projectId=project_id, datasetId=dataset_id, tableId=table_id)
+    try:
+      self.client.tables.Delete(request)
+    except HttpError as exn:
+      if exn.status_code == 404:
+        logging.warning('Table %s:%s.%s does not exist', project_id,
+                        dataset_id, table_id)
+        return
+      else:
+        raise
+
+  @retry.with_exponential_backoff(
+      num_retries=MAX_RETRIES,
+      retry_filter=retry.retry_on_server_errors_and_timeout_filter)
+  def _delete_dataset(self, project_id, dataset_id, delete_contents=True):
+    request = bigquery.BigqueryDatasetsDeleteRequest(
+        projectId=project_id, datasetId=dataset_id,
+        deleteContents=delete_contents)
+    try:
+      self.client.datasets.Delete(request)
+    except HttpError as exn:
+      if exn.status_code == 404:
+        logging.warning('Dataset %s:%s does not exist', project_id,
+                        dataset_id)
+        return
+      else:
+        raise
+
+  @retry.with_exponential_backoff(
+      num_retries=MAX_RETRIES,
+      retry_filter=retry.retry_on_server_errors_and_timeout_filter)
+  def create_temporary_dataset(self, project_id):
+    dataset_id = BigQueryWrapper.TEMP_DATASET + self._temporary_table_suffix
+    # Check if dataset exists to make sure that the temporary id is unique
+    try:
+      self.client.datasets.Get(bigquery.BigqueryDatasetsGetRequest(
+          projectId=project_id, datasetId=dataset_id))
+      if project_id is not None:
+        # Unit tests don't pass a project_id, so they can run without
+        # raising here.
+        raise RuntimeError(
+            'Dataset %s:%s already exists so cannot be used as temporary.'
+            % (project_id, dataset_id))
+    except HttpError as exn:
+      if exn.status_code == 404:
+        logging.warning('Dataset does not exist so we will create it')
+        self.get_or_create_dataset(project_id, dataset_id)
+      else:
+        raise
+
+  @retry.with_exponential_backoff(
+      num_retries=MAX_RETRIES,
+      retry_filter=retry.retry_on_server_errors_and_timeout_filter)
+  def clean_up_temporary_dataset(self, project_id):
+    temp_table = self._get_temp_table(project_id)
+    try:
+      self.client.datasets.Get(bigquery.BigqueryDatasetsGetRequest(
+          projectId=project_id, datasetId=temp_table.datasetId))
+    except HttpError as exn:
+      if exn.status_code == 404:
+        logging.warning('Dataset %s:%s does not exist', project_id,
+                        temp_table.datasetId)
+        return
+      else:
+        raise
+    self._delete_dataset(temp_table.projectId, temp_table.datasetId, True)
+
+  @retry.with_exponential_backoff(
+      num_retries=MAX_RETRIES,
+      retry_filter=retry.retry_on_server_errors_and_timeout_filter)
+  def get_or_create_table(
+      self, project_id, dataset_id, table_id, schema,
+      create_disposition, write_disposition):
+    """Gets or creates a table based on create and write dispositions.
+
+    The function mimics the behavior of BigQuery import jobs when using the
+    same create and write dispositions.
+
+    Args:
+      project_id: The project id owning the table.
+      dataset_id: The dataset id owning the table.
+      table_id: The table id.
+      schema: A bigquery.TableSchema instance or None.
+      create_disposition: CREATE_NEVER or CREATE_IF_NEEDED.
+      write_disposition: WRITE_APPEND, WRITE_EMPTY or WRITE_TRUNCATE.
+
+    Returns:
+      A bigquery.Table instance if table was found or created.
+
+    Raises:
+      RuntimeError: For various mismatches between the state of the table and
+        the create/write dispositions passed in. For example if the table is not
+        empty and WRITE_EMPTY was specified then an error will be raised since
+        the table was expected to be empty.
+    """
+    found_table = None
+    try:
+      found_table = self._get_table(project_id, dataset_id, table_id)
+    except HttpError as exn:
+      if exn.status_code == 404:
+        if create_disposition == BigQueryDisposition.CREATE_NEVER:
+          raise RuntimeError(
+              'Table %s:%s.%s not found but create disposition is CREATE_NEVER.'
+              % (project_id, dataset_id, table_id))
+      else:
+        raise
+
+    # If table exists already then handle the semantics for WRITE_EMPTY and
+    # WRITE_TRUNCATE write dispositions.
+    if found_table:
+      table_empty = self._is_table_empty(project_id, dataset_id, table_id)
+      if (not table_empty and
+          write_disposition == BigQueryDisposition.WRITE_EMPTY):
+        raise RuntimeError(
+            'Table %s:%s.%s is not empty but write disposition is WRITE_EMPTY.'
+            % (project_id, dataset_id, table_id))
+      # Delete the table and recreate it (later) if WRITE_TRUNCATE was
+      # specified.
+      if write_disposition == BigQueryDisposition.WRITE_TRUNCATE:
+        self._delete_table(project_id, dataset_id, table_id)
+
+    # Create a new table potentially reusing the schema from a previously
+    # found table in case the schema was not specified.
+    if schema is None and found_table is None:
+      raise RuntimeError(
+          'Table %s:%s.%s requires a schema. None can be inferred because the '
+          'table does not exist.'
+          % (project_id, dataset_id, table_id))
+    if found_table and write_disposition != BigQueryDisposition.WRITE_TRUNCATE:
+      return found_table
+    else:
+      # If write_disposition == BigQueryDisposition.WRITE_TRUNCATE, the table
+      # was already deleted above, so create a fresh one here.
+      return self._create_table(project_id=project_id,
+                                dataset_id=dataset_id,
+                                table_id=table_id,
+                                schema=schema or found_table.schema)
+
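A sketch of calling the helper above directly, with hypothetical project,
dataset, and table names; with schema=None and CREATE_NEVER the existing
table's schema is reused, and WRITE_APPEND returns the table untouched:

  wrapper = BigQueryWrapper()  # Assumes service credentials are available.
  table = wrapper.get_or_create_table(
      'my_project', 'my_dataset', 'my_table', schema=None,
      create_disposition=BigQueryDisposition.CREATE_NEVER,
      write_disposition=BigQueryDisposition.WRITE_APPEND)
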
+  def run_query(self, project_id, query, use_legacy_sql, flatten_results,
+                dry_run=False):
+    job_id = self._start_query_job(project_id, query, use_legacy_sql,
+                                   flatten_results, job_id=uuid.uuid4().hex,
+                                   dry_run=dry_run)
+    if dry_run:
+      # If this was a dry run then the fact that we get here means the query
+      # has no errors; _start_query_job would have raised an error otherwise.
+      return
+    page_token = None
+    while True:
+      response = self._get_query_results(project_id, job_id, page_token)
+      if not response.jobComplete:
+        # The jobComplete field can be False if the query request times out
+        # (default is 10 seconds). Note that this is a timeout for the query
+        # request not for the actual execution of the query in the service.  If
+        # the request times out we keep trying. This situation is quite possible
+        # if the query will return a large number of rows.
+        logging.info('Waiting on response from query: %s ...', query)
+        time.sleep(1.0)
+        continue
+      # We got some results. The last page is signalled by a missing pageToken.
+      yield response.rows, response.schema
+      if not response.pageToken:
+        break
+      page_token = response.pageToken
+
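The generator above keeps polling until jobComplete and then pages with
pageToken; a sketch of consuming it (the project and query are hypothetical,
and `wrapper` is the one from the sketch above):

  for rows, schema in wrapper.run_query(
      project_id='my_project',
      query='SELECT word, word_count FROM [my_dataset.my_table]',
      use_legacy_sql=True, flatten_results=True):
    for row in rows:
      print wrapper.convert_row_to_dict(row, schema)
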
+  def insert_rows(self, project_id, dataset_id, table_id, rows):
+    """Inserts rows into the specified table.
+
+    Args:
+      project_id: The project id owning the table.
+      dataset_id: The dataset id owning the table.
+      table_id: The table id.
+      rows: A list of plain Python dictionaries. Each dictionary is a row and
+        each key in it is the name of a field.
+
+    Returns:
+      A tuple (bool, errors). If the first element is False then the second
+      element will be a bigquery.InsertErrorsValueListEntry instance
+      containing the specific errors.
+    """
+
+    # Prepare rows for insertion. Of special note is the row ID that we add to
+    # each row in order to help BigQuery avoid inserting the same row multiple
+    # times. BigQuery makes a best effort to deduplicate when unique insert IDs
+    # are provided, which matters when requests are retried after failures.
+    # TODO(silviuc): Must add support for writing TableRow's instead of dicts.
+    final_rows = []
+    for row in rows:
+      json_object = bigquery.JsonObject()
+      for k, v in row.iteritems():
+        json_object.additionalProperties.append(
+            bigquery.JsonObject.AdditionalProperty(
+                key=k, value=to_json_value(v)))
+      final_rows.append(
+          bigquery.TableDataInsertAllRequest.RowsValueListEntry(
+              insertId=str(self.unique_row_id),
+              json=json_object))
+    result, errors = self._insert_all_rows(
+        project_id, dataset_id, table_id, final_rows)
+    return result, errors
+
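A sketch of the call described in the docstring above, passing plain
dictionaries as rows (project, dataset, table, and field names are
hypothetical):

  passed, errors = wrapper.insert_rows(
      'my_project', 'my_dataset', 'my_table',
      rows=[{'name': 'abc', 'score': 3}, {'name': 'def', 'score': 5}])
  if not passed:
    logging.error('Insert failed: %s', errors)
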
+  def _convert_cell_value_to_dict(self, value, field):
+    if field.type == 'STRING':
+      # Input: "XYZ" --> Output: "XYZ"
+      return value
+    elif field.type == 'BOOLEAN':
+      # Input: "true" --> Output: True
+      return value == 'true'
+    elif field.type == 'INTEGER':
+      # Input: "123" --> Output: 123
+      return int(value)
+    elif field.type == 'FLOAT':
+      # Input: "1.23" --> Output: 1.23
+      return float(value)
+    elif field.type == 'TIMESTAMP':
+      # The 'UTC' suffix should come from a timezone-aware datetime, but
+      # timezone handling is awkward in Python 2.7, so we hardcode it since
+      # utcfromtimestamp already returns the time in UTC.
+      # Input: 1478134176.985864 --> Output: "2016-11-03 00:49:36.985864 UTC"
+      dt = datetime.datetime.utcfromtimestamp(float(value))
+      return dt.strftime('%Y-%m-%d %H:%M:%S.%f UTC')
+    elif field.type == 'BYTES':
+      # Input: "YmJi" --> Output: "YmJi"
+      return value
+    elif field.type == 'DATE':
+      # Input: "2016-11-03" --> Output: "2016-11-03"
+      return value
+    elif field.type == 'DATETIME':
+      # Input: "2016-11-03T00:49:36" --> Output: "2016-11-03T00:49:36"
+      return value
+    elif field.type == 'TIME':
+      # Input: "00:49:36" --> Output: "00:49:36"
+      return value
+    elif field.type == 'RECORD':
+      # Note that a schema field object also supports a RECORD type. However,
+      # when querying, repeated and/or record fields are flattened unless the
+      # flatten_results flag is passed as False to the source.
+      return self.convert_row_to_dict(value, field)
+    else:
+      raise RuntimeError('Unexpected field type: %s' % field.type)
+
+  def convert_row_to_dict(self, row, schema):
+    """Converts a TableRow instance using the schema to a Python dict."""
+    result = {}
+    for index, field in enumerate(schema.fields):
+      value = None
+      if isinstance(schema, bigquery.TableSchema):
+        cell = row.f[index]
+        value = from_json_value(cell.v) if cell.v is not None else None
+      elif isinstance(schema, bigquery.TableFieldSchema):
+        cell = row['f'][index]
+        value = cell['v'] if 'v' in cell else None
+      if field.mode == 'REPEATED':
+        result[field.name] = [self._convert_cell_value_to_dict(x['v'], field)
+                              for x in value]
+      elif value is None:
+        if field.mode != 'NULLABLE':
+          raise ValueError('Received \'None\' as the value for the field %s '
+                           'but the field is not NULLABLE.' % field.name)
+        result[field.name] = None
+      else:
+        result[field.name] = self._convert_cell_value_to_dict(value, field)
+    return result

