beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rober...@apache.org
Subject [1/2] beam git commit: Avoid pickling the entire pipeline per-transform.
Date Fri, 23 Jun 2017 23:40:14 GMT
Repository: beam
Updated Branches:
  refs/heads/master 9acce7150 -> a90e40ae9


Avoid pickling the entire pipeline per-transform.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/903da41a
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/903da41a
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/903da41a

Branch: refs/heads/master
Commit: 903da41ac5395e76c44ef8ae1c8a695569e23abb
Parents: 9acce71
Author: Robert Bradshaw <robertwb@gmail.com>
Authored: Fri Jun 23 15:01:42 2017 -0700
Committer: Robert Bradshaw <robertwb@gmail.com>
Committed: Fri Jun 23 16:39:51 2017 -0700

----------------------------------------------------------------------
 sdks/python/apache_beam/pipeline.py      |  7 +++++++
 sdks/python/apache_beam/pipeline_test.py | 18 ++++++++++++++++++
 2 files changed, 25 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/903da41a/sdks/python/apache_beam/pipeline.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/pipeline.py b/sdks/python/apache_beam/pipeline.py
index d84a2b7..724c87d 100644
--- a/sdks/python/apache_beam/pipeline.py
+++ b/sdks/python/apache_beam/pipeline.py
@@ -466,6 +466,13 @@ class Pipeline(object):
     self.transforms_stack.pop()
     return pvalueish_result
 
+  def __reduce__(self):
+    # Some transforms contain a reference to their enclosing pipeline,
+    # which in turn references all other transforms (resulting in quadratic
+    # time/space to pickle each transform individually).  As we don't
+    # require pickled pipelines to be executable, break the chain here.
+    return str, ('Pickled pipeline stub.',)
+
   def _verify_runner_api_compatible(self):
     class Visitor(PipelineVisitor):  # pylint: disable=used-before-assignment
       ok = True  # Really a nonlocal.

http://git-wip-us.apache.org/repos/asf/beam/blob/903da41a/sdks/python/apache_beam/pipeline_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/pipeline_test.py b/sdks/python/apache_beam/pipeline_test.py
index f9b894f..aad0143 100644
--- a/sdks/python/apache_beam/pipeline_test.py
+++ b/sdks/python/apache_beam/pipeline_test.py
@@ -480,6 +480,24 @@ class RunnerApiTest(unittest.TestCase):
     p2 = Pipeline.from_runner_api(proto, p.runner, p._options)
     p2.run()
 
+  def test_pickling(self):
+    class MyPTransform(beam.PTransform):
+      pickle_count = [0]
+
+      def expand(self, p):
+        self.p = p
+        return p | beam.Create([None])
+
+      def __reduce__(self):
+        self.pickle_count[0] += 1
+        return str, ()
+
+    p = beam.Pipeline()
+    for k in range(20):
+      p | 'Iter%s' % k >> MyPTransform()  # pylint: disable=expression-not-assigned
+    p.to_runner_api()
+    self.assertEqual(MyPTransform.pickle_count[0], 20)
+
 
 if __name__ == '__main__':
   logging.getLogger().setLevel(logging.DEBUG)


Mime
View raw message