beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rober...@apache.org
Subject [1/4] incubator-beam git commit: Decreasing the number of copies of things in scope for reduced peak memory utilization in Avro and Pickler.
Date Fri, 23 Dec 2016 04:21:19 GMT
Repository: incubator-beam
Updated Branches:
  refs/heads/python-sdk a46091a79 -> 82d7f0f77


Decrease the number of in-memory copies of data kept in scope, to reduce
peak memory usage in Avro and Pickler.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/bbcdb12f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/bbcdb12f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/bbcdb12f

Branch: refs/heads/python-sdk
Commit: bbcdb12fbfeac54158ff065ffb0b95fee044d935
Parents: a46091a
Author: Gus Katsiapis <katsiapis@katsiapis-linux.mtv.corp.google.com>
Authored: Tue Dec 20 19:26:11 2016 -0800
Committer: Robert Bradshaw <robertwb@gmail.com>
Committed: Thu Dec 22 20:20:59 2016 -0800

----------------------------------------------------------------------
 sdks/python/apache_beam/internal/pickler.py | 15 +++++++++++++--
 sdks/python/apache_beam/io/avroio.py        | 20 +++++++++++---------
 2 files changed, 24 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bbcdb12f/sdks/python/apache_beam/internal/pickler.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/internal/pickler.py b/sdks/python/apache_beam/internal/pickler.py
index d39a497..e995caa 100644
--- a/sdks/python/apache_beam/internal/pickler.py
+++ b/sdks/python/apache_beam/internal/pickler.py
@@ -189,11 +189,22 @@ def dumps(o):
     s = dill.dumps(o)
   finally:
     dill.dill._trace(False)  # pylint: disable=protected-access
-  return base64.b64encode(zlib.compress(s))
+
+  # Compress as compactly as possible to decrease peak memory usage (of multiple
+  # in-memory copies). Also use a separate statement in order to decrease peak
+  # memory usage (by allowing unneeded data to go out of scope).
+  s = zlib.compress(s, 9)
+
+  return base64.b64encode(s)
 
 
 def loads(encoded):
-  s = zlib.decompress(base64.b64decode(encoded))
+  s = base64.b64decode(encoded)
+
+  # Separate statement in order to decrease peak memory usage (by allowing
+  # unneeded data to go out of scope).
+  s = zlib.decompress(s)
+
   try:
     return dill.loads(s)
   except Exception:          # pylint: disable=broad-except

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bbcdb12f/sdks/python/apache_beam/io/avroio.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/avroio.py b/sdks/python/apache_beam/io/avroio.py
index 3663bdb..b08bc83 100644
--- a/sdks/python/apache_beam/io/avroio.py
+++ b/sdks/python/apache_beam/io/avroio.py
@@ -173,20 +173,23 @@ class _AvroBlock(object):
   """Represents a block of an Avro file."""
 
   def __init__(self, block_bytes, num_records, codec, schema_string):
-    self._block_bytes = block_bytes
+    # Decompress data early on (if needed) and thus decrease the number of
+    # parallel copies of the data in memory at any given time during
+    # block iteration.
+    self._block_bytes = self._decompress_bytes(block_bytes, codec)
     self._num_records = num_records
-    self._codec = codec
     self._schema = schema.parse(schema_string)
 
-  def _decompress_bytes(self, data):
-    if self._codec == 'null':
+  @staticmethod
+  def _decompress_bytes(data, codec):
+    if codec == 'null':
       return data
-    elif self._codec == 'deflate':
+    elif codec == 'deflate':
       # zlib.MAX_WBITS is the window size. '-' sign indicates that this is
       # raw data (without headers). See zlib and Avro documentations for more
       # details.
       return zlib.decompress(data, -zlib.MAX_WBITS)
-    elif self._codec == 'snappy':
+    elif codec == 'snappy':
       # Snappy is an optional avro codec.
       # See Snappy and Avro documentation for more details.
       try:
@@ -199,14 +202,13 @@ class _AvroBlock(object):
       avroio.BinaryDecoder(StringIO.StringIO(data[-4:])).check_crc32(result)
       return result
     else:
-      raise ValueError('Unknown codec: %r', self._codec)
+      raise ValueError('Unknown codec: %r', codec)
 
   def num_records(self):
     return self._num_records
 
   def records(self):
-    decompressed_bytes = self._decompress_bytes(self._block_bytes)
-    decoder = avroio.BinaryDecoder(StringIO.StringIO(decompressed_bytes))
+    decoder = avroio.BinaryDecoder(StringIO.StringIO(self._block_bytes))
     reader = avroio.DatumReader(
         writers_schema=self._schema, readers_schema=self._schema)
 


Mime
View raw message