From: robertwb@apache.org
To: commits@beam.apache.org
Date: Fri, 23 Jun 2017 23:40:14 -0000
Subject: [1/2] beam git commit: Avoid pickling the entire pipeline per-transform.

Repository: beam
Updated Branches:
  refs/heads/master 9acce7150 -> a90e40ae9


Avoid pickling the entire pipeline per-transform.

Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/903da41a
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/903da41a
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/903da41a

Branch: refs/heads/master
Commit: 903da41ac5395e76c44ef8ae1c8a695569e23abb
Parents: 9acce71
Author: Robert Bradshaw
Authored: Fri Jun 23 15:01:42 2017 -0700
Committer: Robert Bradshaw
Committed: Fri Jun 23 16:39:51 2017 -0700

----------------------------------------------------------------------
 sdks/python/apache_beam/pipeline.py      |  7 +++++++
 sdks/python/apache_beam/pipeline_test.py | 18 ++++++++++++++++++
 2 files changed, 25 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/903da41a/sdks/python/apache_beam/pipeline.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/pipeline.py b/sdks/python/apache_beam/pipeline.py
index d84a2b7..724c87d 100644
--- a/sdks/python/apache_beam/pipeline.py
+++ b/sdks/python/apache_beam/pipeline.py
@@ -466,6 +466,13 @@ class Pipeline(object):
     self.transforms_stack.pop()
     return pvalueish_result
 
+  def __reduce__(self):
+    # Some transforms contain a reference to their enclosing pipeline,
+    # which in turn reference all other transforms (resulting in quadratic
+    # time/space to pickle each transform individually). As we don't
+    # require pickled pipelines to be executable, break the chain here.
+    return str, ('Pickled pipeline stub.',)
+
   def _verify_runner_api_compatible(self):
     class Visitor(PipelineVisitor):  # pylint: disable=used-before-assignment
       ok = True  # Really a nonlocal.

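The effect described in the code comment above can be reproduced with the standard pickle module alone. The following is a minimal sketch, not Beam code: Child, Parent, StubbedParent, build and total_pickled_bytes are hypothetical names chosen for illustration, with Parent standing in for a pipeline and Child for a transform that keeps a reference to it.

```python
import pickle


class Child(object):
  """Hypothetical stand-in for a transform that remembers its parent."""

  def __init__(self, name):
    self.name = name
    self.parent = None


class Parent(object):
  """Hypothetical stand-in for a pipeline that references every child."""

  def __init__(self):
    self.children = []

  def add(self, child):
    child.parent = self
    self.children.append(child)


class StubbedParent(Parent):
  def __reduce__(self):
    # Same shape as the change in the diff: the pickled form is just a
    # call to str(...), so unpickling yields a plain string.
    return str, ('Pickled parent stub.',)


def build(parent_cls, n):
  parent = parent_cls()
  for k in range(n):
    parent.add(Child('Iter%s' % k))
  return parent


def total_pickled_bytes(parent):
  # Pickle each child on its own, as happens once per transform.
  return sum(len(pickle.dumps(child)) for child in parent.children)


plain_total = total_pickled_bytes(build(Parent, 50))
stubbed_total = total_pickled_bytes(build(StubbedParent, 50))

# A child pickled under a stubbed parent comes back with a string in
# place of the parent object.
restored = pickle.loads(pickle.dumps(build(StubbedParent, 3).children[0]))
```

Without the stub, every individual child pickle drags in the parent and, through it, all siblings, so plain_total grows roughly quadratically with the number of children. With the stub, each pickle contains only the child and a short string. The trade-off is the one named in the commit: the unpickled parent is no longer a usable object.
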
http://git-wip-us.apache.org/repos/asf/beam/blob/903da41a/sdks/python/apache_beam/pipeline_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/pipeline_test.py b/sdks/python/apache_beam/pipeline_test.py
index f9b894f..aad0143 100644
--- a/sdks/python/apache_beam/pipeline_test.py
+++ b/sdks/python/apache_beam/pipeline_test.py
@@ -480,6 +480,24 @@ class RunnerApiTest(unittest.TestCase):
     p2 = Pipeline.from_runner_api(proto, p.runner, p._options)
     p2.run()
 
+  def test_pickling(self):
+    class MyPTransform(beam.PTransform):
+      pickle_count = [0]
+
+      def expand(self, p):
+        self.p = p
+        return p | beam.Create([None])
+
+      def __reduce__(self):
+        self.pickle_count[0] += 1
+        return str, ()
+
+    p = beam.Pipeline()
+    for k in range(20):
+      p | 'Iter%s' % k >> MyPTransform()  # pylint: disable=expression-not-assigned
+    p.to_runner_api()
+    self.assertEqual(MyPTransform.pickle_count[0], 20)
+
 
 if __name__ == '__main__':
   logging.getLogger().setLevel(logging.DEBUG)
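
The counting trick used by test_pickling (a class-level list incremented inside __reduce__) can be tried outside Beam as well. This is a rough sketch under stated assumptions: CountingNode, Owner, StubbedOwner and count_reduce_calls are hypothetical names, and plain pickle.dumps stands in for whatever to_runner_api does internally.

```python
import pickle


class CountingNode(object):
  """Hypothetical node that records every __reduce__ call."""
  reduce_calls = [0]  # Class-level list, shared by all instances.

  def __init__(self, owner):
    self.owner = owner

  def __reduce__(self):
    self.reduce_calls[0] += 1
    # Pass the owner as a constructor argument so pickle walks into it.
    return CountingNode, (self.owner,)


class Owner(object):
  """Hypothetical container that references all of its nodes."""

  def __init__(self):
    self.nodes = []


class StubbedOwner(Owner):
  def __reduce__(self):
    # Break the chain: the owner pickles as a short string.
    return str, ('Pickled owner stub.',)


def count_reduce_calls(owner_cls, n):
  CountingNode.reduce_calls[0] = 0
  owner = owner_cls()
  owner.nodes = [CountingNode(owner) for _ in range(n)]
  for node in owner.nodes:
    pickle.dumps(node)  # One pickle per node, as for one per transform.
  return CountingNode.reduce_calls[0]


stubbed_calls = count_reduce_calls(StubbedOwner, 20)
plain_calls = count_reduce_calls(Owner, 20)
```

With the stubbed owner, each of the 20 dumps reduces exactly one node, which corresponds to the count of 20 asserted in the test. With the plain owner, every dump reaches all sibling nodes through the owner, so the counter ends up well above 20.
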