beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From al...@apache.org
Subject [1/2] beam git commit: Comply with byte limit for Datastore Commit.
Date Tue, 23 May 2017 15:39:42 GMT
Repository: beam
Updated Branches:
  refs/heads/master 9da46fd05 -> 597b07e0d


Comply with byte limit for Datastore Commit.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/3c0f599d
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/3c0f599d
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/3c0f599d

Branch: refs/heads/master
Commit: 3c0f599d64a7f57608f1c18b05f2ab036a8b02fc
Parents: 9da46fd
Author: Colin Phipps <fipsy@google.com>
Authored: Wed May 10 09:50:56 2017 +0000
Committer: Ahmet Altay <altay@google.com>
Committed: Tue May 23 08:39:09 2017 -0700

----------------------------------------------------------------------
 .../apache_beam/io/gcp/datastore/v1/datastoreio.py | 15 ++++++++++++++-
 .../io/gcp/datastore/v1/datastoreio_test.py        | 17 +++++++++++++++++
 2 files changed, 31 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/3c0f599d/sdks/python/apache_beam/io/gcp/datastore/v1/datastoreio.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/gcp/datastore/v1/datastoreio.py b/sdks/python/apache_beam/io/gcp/datastore/v1/datastoreio.py
index c606133..89c2a93 100644
--- a/sdks/python/apache_beam/io/gcp/datastore/v1/datastoreio.py
+++ b/sdks/python/apache_beam/io/gcp/datastore/v1/datastoreio.py
@@ -313,8 +313,12 @@ class _Mutate(PTransform):
   supported, as the commits are retried when failures occur.
   """
 
-  # Max allowed Datastore write batch size.
+  # Max allowed Datastore writes per batch, and max bytes per batch.
+  # Note that the max bytes per batch set here is lower than the 10MB limit
+  # actually enforced by the API, to leave space for the CommitRequest wrapper
+  # around the mutations.
   _WRITE_BATCH_SIZE = 500
+  _WRITE_BATCH_BYTES_SIZE = 9000000
 
   def __init__(self, project, mutation_fn):
     """Initializes a Mutate transform.
@@ -353,13 +357,20 @@ class _Mutate(PTransform):
       self._project = project
       self._datastore = None
       self._mutations = []
+      self._mutations_size = 0  # Total size of entries in _mutations.
 
     def start_bundle(self):
       self._mutations = []
+      self._mutations_size = 0
       self._datastore = helper.get_datastore(self._project)
 
     def process(self, element):
+      size = element.ByteSize()
+      if (self._mutations and
+          size + self._mutations_size > _Mutate._WRITE_BATCH_BYTES_SIZE):
+        self._flush_batch()
       self._mutations.append(element)
+      self._mutations_size += size
       if len(self._mutations) >= _Mutate._WRITE_BATCH_SIZE:
         self._flush_batch()
 
@@ -367,12 +378,14 @@ class _Mutate(PTransform):
       if self._mutations:
         self._flush_batch()
       self._mutations = []
+      self._mutations_size = 0
 
     def _flush_batch(self):
       # Flush the current batch of mutations to Cloud Datastore.
       helper.write_mutations(self._datastore, self._project, self._mutations)
       logging.debug("Successfully wrote %d mutations.", len(self._mutations))
       self._mutations = []
+      self._mutations_size = 0
 
 
 class WriteToDatastore(_Mutate):

http://git-wip-us.apache.org/repos/asf/beam/blob/3c0f599d/sdks/python/apache_beam/io/gcp/datastore/v1/datastoreio_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/gcp/datastore/v1/datastoreio_test.py b/sdks/python/apache_beam/io/gcp/datastore/v1/datastoreio_test.py
index 6adc08a..424e714 100644
--- a/sdks/python/apache_beam/io/gcp/datastore/v1/datastoreio_test.py
+++ b/sdks/python/apache_beam/io/gcp/datastore/v1/datastoreio_test.py
@@ -15,6 +15,7 @@
 # limitations under the License.
 #
 
+import math
 import unittest
 
 from mock import MagicMock, call, patch
@@ -191,6 +192,22 @@ class DatastoreioTest(unittest.TestCase):
       self.assertEqual((num_entities - 1) / _Mutate._WRITE_BATCH_SIZE + 1,
                        self._mock_datastore.commit.call_count)
 
+  def test_DatastoreWriteLargeEntities(self):
+    """100*100kB entities get split over two Commit RPCs."""
+    with patch.object(helper, 'get_datastore',
+                      return_value=self._mock_datastore):
+      entities = [e.entity for e in fake_datastore.create_entities(100)]
+
+      datastore_write_fn = _Mutate.DatastoreWriteFn(self._PROJECT)
+      datastore_write_fn.start_bundle()
+      for entity in entities:
+        datastore_helper.add_properties(
+            entity, {'large': u'A' * 100000}, exclude_from_indexes=True)
+        datastore_write_fn.process(WriteToDatastore.to_upsert_mutation(entity))
+      datastore_write_fn.finish_bundle()
+
+      self.assertEqual(2, self._mock_datastore.commit.call_count)
+
   def verify_unique_keys(self, queries):
     """A helper function that verifies if all the queries have unique keys."""
     keys, _ = zip(*queries)


Mime
View raw message