From: robertwb@apache.org
To: commits@beam.incubator.apache.org
Date: Sat, 23 Jul 2016 23:47:09 -0000
Subject: [03/12] incubator-beam git commit: Move names out of transform constructors.

Move names out of transform constructors.

sed -i -r 's/[|] (\S+)[(](["'"'"'][^"'"'"']+.)(, +|([)]))/| \2 >> \1(\4/g'

A small number of tests will need to be fixed by hand.
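For readers skimming the diff, the rewrite is purely mechanical: each application of the form | Transform('Label', args...) becomes | 'Label' >> Transform(args...). The sed script captures the transform name (\1) and the quoted label (\2) and swaps them around the >> operator; for example, | Map('AddCount', lambda x: (x, 1)) becomes | 'AddCount' >> Map(lambda x: (x, 1)), as in the first hunk of dataflow_test.py below. A minimal sketch of the new style against the Python SDK as of this commit (the labels and data are illustrative, echoing dataflow_test.py, not taken from any single file):

    import apache_beam as beam

    p = beam.Pipeline('DirectPipelineRunner')

    # Before this commit the label was the first constructor argument:
    #   words = p | beam.Create('SomeWords', ['aa', 'bb', 'cc'])
    # After it, the label is attached with >> and the constructor takes
    # only the transform's real arguments:
    words = p | 'SomeWords' >> beam.Create(['aa', 'bb', 'cc'])
    counts = words | 'AddCount' >> beam.Map(lambda x: (x, 1))

    p.run()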
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/031c4cce
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/031c4cce
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/031c4cce

Branch: refs/heads/python-sdk
Commit: 031c4cce0b9eae0d50a49f43ffeced1edbfd2f8f
Parents: 937cf69
Author: Robert Bradshaw
Authored: Fri Jul 22 14:34:58 2016 -0700
Committer: Robert Bradshaw
Committed: Sat Jul 23 16:43:45 2016 -0700

----------------------------------------------------------------------
 sdks/python/apache_beam/dataflow_test.py        |  96 ++--
 .../examples/complete/autocomplete.py           |   8 +-
 .../examples/complete/autocomplete_test.py      |   4 +-
 .../examples/complete/estimate_pi.py            |   8 +-
 .../examples/complete/estimate_pi_test.py       |   2 +-
 .../complete/juliaset/juliaset/juliaset.py      |   8 +-
 .../apache_beam/examples/complete/tfidf.py      |  32 +-
 .../apache_beam/examples/complete/tfidf_test.py |   2 +-
 .../examples/complete/top_wikipedia_sessions.py |   8 +-
 .../complete/top_wikipedia_sessions_test.py     |   2 +-
 .../examples/cookbook/bigquery_schema.py        |   4 +-
 .../examples/cookbook/bigquery_side_input.py    |   6 +-
 .../cookbook/bigquery_side_input_test.py        |   8 +-
 .../examples/cookbook/bigquery_tornadoes.py     |   6 +-
 .../cookbook/bigquery_tornadoes_test.py         |   2 +-
 .../apache_beam/examples/cookbook/bigshuffle.py |  18 +-
 .../apache_beam/examples/cookbook/coders.py     |   2 +-
 .../examples/cookbook/coders_test.py            |   4 +-
 .../examples/cookbook/custom_ptransform.py      |   6 +-
 .../examples/cookbook/custom_ptransform_test.py |   2 +-
 .../apache_beam/examples/cookbook/filters.py    |  10 +-
 .../examples/cookbook/filters_test.py           |   2 +-
 .../examples/cookbook/group_with_coder.py       |   6 +-
 .../examples/cookbook/mergecontacts.py          |   8 +-
 .../examples/cookbook/multiple_output_pardo.py  |  18 +-
 .../apache_beam/examples/snippets/snippets.py   |  62 +--
 .../examples/snippets/snippets_test.py          |  10 +-
 .../apache_beam/examples/streaming_wordcap.py   |   2 +-
 .../apache_beam/examples/streaming_wordcount.py |   8 +-
 sdks/python/apache_beam/examples/wordcount.py   |  14 +-
 .../apache_beam/examples/wordcount_debugging.py |  16 +-
 .../apache_beam/examples/wordcount_minimal.py   |  14 +-
 sdks/python/apache_beam/io/avroio.py            |   2 +-
 sdks/python/apache_beam/io/bigquery.py          |   4 +-
 .../apache_beam/io/filebasedsource_test.py      |   4 +-
 sdks/python/apache_beam/io/iobase.py            |   4 +-
 sdks/python/apache_beam/pipeline_test.py        |  42 +-
 sdks/python/apache_beam/pvalue_test.py          |   6 +-
 .../consumer_tracking_pipeline_visitor_test.py  |   4 +-
 sdks/python/apache_beam/runners/runner_test.py  |   6 +-
 .../apache_beam/transforms/combiners_test.py    |  48 +-
 sdks/python/apache_beam/transforms/core.py      |  16 +-
 .../python/apache_beam/transforms/ptransform.py |   2 +-
 .../apache_beam/transforms/ptransform_test.py   | 498 +++++++++----------
 sdks/python/apache_beam/transforms/util.py      |   8 +-
 .../apache_beam/transforms/window_test.py       |  14 +-
 .../transforms/write_ptransform_test.py         |   2 +-
 .../typehints/typed_pipeline_test.py            |  16 +-
 48 files changed, 537 insertions(+), 537 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/031c4cce/sdks/python/apache_beam/dataflow_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/dataflow_test.py b/sdks/python/apache_beam/dataflow_test.py
index 476f8b2..bf66851 100644
--- a/sdks/python/apache_beam/dataflow_test.py
+++ 
b/sdks/python/apache_beam/dataflow_test.py @@ -54,33 +54,33 @@ class DataflowTest(unittest.TestCase): def Count(pcoll): # pylint: disable=invalid-name, no-self-argument """A Count transform: v, ... => (v, n), ...""" return (pcoll - | Map('AddCount', lambda x: (x, 1)) - | GroupByKey('GroupCounts') - | Map('AddCounts', lambda (x, ones): (x, sum(ones)))) + | 'AddCount' >> Map(lambda x: (x, 1)) + | 'GroupCounts' >> GroupByKey() + | 'AddCounts' >> Map(lambda (x, ones): (x, sum(ones)))) def test_word_count(self): pipeline = Pipeline('DirectPipelineRunner') - lines = pipeline | Create('SomeWords', DataflowTest.SAMPLE_DATA) + lines = pipeline | 'SomeWords' >> Create(DataflowTest.SAMPLE_DATA) result = ( - (lines | FlatMap('GetWords', lambda x: re.findall(r'\w+', x))) + (lines | 'GetWords' >> FlatMap(lambda x: re.findall(r'\w+', x))) .apply('CountWords', DataflowTest.Count)) assert_that(result, equal_to(DataflowTest.SAMPLE_RESULT)) pipeline.run() def test_map(self): pipeline = Pipeline('DirectPipelineRunner') - lines = pipeline | Create('input', ['a', 'b', 'c']) + lines = pipeline | 'input' >> Create(['a', 'b', 'c']) result = (lines - | Map('upper', str.upper) - | Map('prefix', lambda x, prefix: prefix + x, 'foo-')) + | 'upper' >> Map(str.upper) + | 'prefix' >> Map(lambda x, prefix: prefix + x, 'foo-')) assert_that(result, equal_to(['foo-A', 'foo-B', 'foo-C'])) pipeline.run() def test_par_do_with_side_input_as_arg(self): pipeline = Pipeline('DirectPipelineRunner') words_list = ['aa', 'bb', 'cc'] - words = pipeline | Create('SomeWords', words_list) - prefix = pipeline | Create('SomeString', ['xyz']) # side in + words = pipeline | 'SomeWords' >> Create(words_list) + prefix = pipeline | 'SomeString' >> Create(['xyz']) # side in suffix = 'zyx' result = words | FlatMap( 'DecorateWords', @@ -92,9 +92,9 @@ class DataflowTest(unittest.TestCase): def test_par_do_with_side_input_as_keyword_arg(self): pipeline = Pipeline('DirectPipelineRunner') words_list = ['aa', 'bb', 'cc'] - words = pipeline | Create('SomeWords', words_list) + words = pipeline | 'SomeWords' >> Create(words_list) prefix = 'zyx' - suffix = pipeline | Create('SomeString', ['xyz']) # side in + suffix = pipeline | 'SomeString' >> Create(['xyz']) # side in result = words | FlatMap( 'DecorateWords', lambda x, pfx, sfx: ['%s-%s-%s' % (pfx, x, sfx)], @@ -111,10 +111,10 @@ class DataflowTest(unittest.TestCase): pipeline = Pipeline('DirectPipelineRunner') words_list = ['aa', 'bb', 'cc'] - words = pipeline | Create('SomeWords', words_list) + words = pipeline | 'SomeWords' >> Create(words_list) prefix = 'zyx' - suffix = pipeline | Create('SomeString', ['xyz']) # side in - result = words | ParDo('DecorateWordsDoFn', SomeDoFn(), prefix, + suffix = pipeline | 'SomeString' >> Create(['xyz']) # side in + result = words | 'DecorateWordsDoFn' >> ParDo(SomeDoFn(), prefix, suffix=AsSingleton(suffix)) assert_that(result, equal_to(['zyx-%s-xyz' % x for x in words_list])) pipeline.run() @@ -131,7 +131,7 @@ class DataflowTest(unittest.TestCase): yield SideOutputValue('odd', context.element) pipeline = Pipeline('DirectPipelineRunner') - nums = pipeline | Create('Some Numbers', [1, 2, 3, 4]) + nums = pipeline | 'Some Numbers' >> Create([1, 2, 3, 4]) results = nums | ParDo( 'ClassifyNumbers', SomeDoFn()).with_outputs('odd', 'even', main='main') assert_that(results.main, equal_to([1, 2, 3, 4])) @@ -147,7 +147,7 @@ class DataflowTest(unittest.TestCase): return [v, SideOutputValue('odd', v)] pipeline = Pipeline('DirectPipelineRunner') - nums = pipeline | Create('Some 
Numbers', [1, 2, 3, 4]) + nums = pipeline | 'Some Numbers' >> Create([1, 2, 3, 4]) results = nums | FlatMap( 'ClassifyNumbers', some_fn).with_outputs('odd', 'even', main='main') assert_that(results.main, equal_to([1, 2, 3, 4])) @@ -157,37 +157,37 @@ class DataflowTest(unittest.TestCase): def test_empty_singleton_side_input(self): pipeline = Pipeline('DirectPipelineRunner') - pcol = pipeline | Create('start', [1, 2]) - side = pipeline | Create('side', []) # Empty side input. + pcol = pipeline | 'start' >> Create([1, 2]) + side = pipeline | 'side' >> Create([]) # Empty side input. def my_fn(k, s): v = ('empty' if isinstance(s, EmptySideInput) else 'full') return [(k, v)] - result = pcol | FlatMap('compute', my_fn, AsSingleton(side)) + result = pcol | 'compute' >> FlatMap(my_fn, AsSingleton(side)) assert_that(result, equal_to([(1, 'empty'), (2, 'empty')])) pipeline.run() def test_multi_valued_singleton_side_input(self): pipeline = Pipeline('DirectPipelineRunner') - pcol = pipeline | Create('start', [1, 2]) - side = pipeline | Create('side', [3, 4]) # 2 values in side input. - pcol | FlatMap('compute', lambda x, s: [x * s], AsSingleton(side)) # pylint: disable=expression-not-assigned + pcol = pipeline | 'start' >> Create([1, 2]) + side = pipeline | 'side' >> Create([3, 4]) # 2 values in side input. + pcol | 'compute' >> FlatMap(lambda x, s: [x * s], AsSingleton(side)) # pylint: disable=expression-not-assigned with self.assertRaises(ValueError): pipeline.run() def test_default_value_singleton_side_input(self): pipeline = Pipeline('DirectPipelineRunner') - pcol = pipeline | Create('start', [1, 2]) - side = pipeline | Create('side', []) # 0 values in side input. + pcol = pipeline | 'start' >> Create([1, 2]) + side = pipeline | 'side' >> Create([]) # 0 values in side input. result = ( - pcol | FlatMap('compute', lambda x, s: [x * s], AsSingleton(side, 10))) + pcol | 'compute' >> FlatMap(lambda x, s: [x * s], AsSingleton(side, 10))) assert_that(result, equal_to([10, 20])) pipeline.run() def test_iterable_side_input(self): pipeline = Pipeline('DirectPipelineRunner') - pcol = pipeline | Create('start', [1, 2]) - side = pipeline | Create('side', [3, 4]) # 2 values in side input. + pcol = pipeline | 'start' >> Create([1, 2]) + side = pipeline | 'side' >> Create([3, 4]) # 2 values in side input. 
result = pcol | FlatMap('compute', lambda x, s: [x * y for y in s], AllOf(side)) assert_that(result, equal_to([3, 4, 6, 8])) @@ -195,7 +195,7 @@ class DataflowTest(unittest.TestCase): def test_undeclared_side_outputs(self): pipeline = Pipeline('DirectPipelineRunner') - nums = pipeline | Create('Some Numbers', [1, 2, 3, 4]) + nums = pipeline | 'Some Numbers' >> Create([1, 2, 3, 4]) results = nums | FlatMap( 'ClassifyNumbers', lambda x: [x, SideOutputValue('even' if x % 2 == 0 else 'odd', x)] @@ -210,7 +210,7 @@ class DataflowTest(unittest.TestCase): def test_empty_side_outputs(self): pipeline = Pipeline('DirectPipelineRunner') - nums = pipeline | Create('Some Numbers', [1, 3, 5]) + nums = pipeline | 'Some Numbers' >> Create([1, 3, 5]) results = nums | FlatMap( 'ClassifyNumbers', lambda x: [x, SideOutputValue('even' if x % 2 == 0 else 'odd', x)] @@ -224,9 +224,9 @@ class DataflowTest(unittest.TestCase): a_list = [5, 1, 3, 2, 9] some_pairs = [('crouton', 17), ('supreme', None)] pipeline = Pipeline('DirectPipelineRunner') - main_input = pipeline | Create('main input', [1]) - side_list = pipeline | Create('side list', a_list) - side_pairs = pipeline | Create('side pairs', some_pairs) + main_input = pipeline | 'main input' >> Create([1]) + side_list = pipeline | 'side list' >> Create(a_list) + side_pairs = pipeline | 'side pairs' >> Create(some_pairs) results = main_input | FlatMap( 'concatenate', lambda x, the_list, the_dict: [[x, the_list, the_dict]], @@ -248,8 +248,8 @@ class DataflowTest(unittest.TestCase): # with the same defaults will return the same PCollectionView. a_list = [2] pipeline = Pipeline('DirectPipelineRunner') - main_input = pipeline | Create('main input', [1]) - side_list = pipeline | Create('side list', a_list) + main_input = pipeline | 'main input' >> Create([1]) + side_list = pipeline | 'side list' >> Create(a_list) results = main_input | FlatMap( 'test', lambda x, s1, s2: [[x, s1, s2]], @@ -271,8 +271,8 @@ class DataflowTest(unittest.TestCase): # distinct PCollectionViews with the same full_label. a_list = [2] pipeline = Pipeline('DirectPipelineRunner') - main_input = pipeline | Create('main input', [1]) - side_list = pipeline | Create('side list', a_list) + main_input = pipeline | 'main input' >> Create([1]) + side_list = pipeline | 'side list' >> Create(a_list) with self.assertRaises(RuntimeError) as e: _ = main_input | FlatMap( @@ -287,8 +287,8 @@ class DataflowTest(unittest.TestCase): def test_as_singleton_with_different_defaults_with_unique_labels(self): a_list = [] pipeline = Pipeline('DirectPipelineRunner') - main_input = pipeline | Create('main input', [1]) - side_list = pipeline | Create('side list', a_list) + main_input = pipeline | 'main input' >> Create([1]) + side_list = pipeline | 'side list' >> Create(a_list) results = main_input | FlatMap( 'test', lambda x, s1, s2: [[x, s1, s2]], @@ -311,8 +311,8 @@ class DataflowTest(unittest.TestCase): # return the same PCollectionView. 
a_list = [1, 2, 3] pipeline = Pipeline('DirectPipelineRunner') - main_input = pipeline | Create('main input', [1]) - side_list = pipeline | Create('side list', a_list) + main_input = pipeline | 'main input' >> Create([1]) + side_list = pipeline | 'side list' >> Create(a_list) results = main_input | FlatMap( 'test', lambda x, ls1, ls2: [[x, ls1, ls2]], @@ -332,8 +332,8 @@ class DataflowTest(unittest.TestCase): def test_as_list_with_unique_labels(self): a_list = [1, 2, 3] pipeline = Pipeline('DirectPipelineRunner') - main_input = pipeline | Create('main input', [1]) - side_list = pipeline | Create('side list', a_list) + main_input = pipeline | 'main input' >> Create([1]) + side_list = pipeline | 'side list' >> Create(a_list) results = main_input | FlatMap( 'test', lambda x, ls1, ls2: [[x, ls1, ls2]], @@ -353,8 +353,8 @@ class DataflowTest(unittest.TestCase): def test_as_dict_with_unique_labels(self): some_kvs = [('a', 1), ('b', 2)] pipeline = Pipeline('DirectPipelineRunner') - main_input = pipeline | Create('main input', [1]) - side_kvs = pipeline | Create('side kvs', some_kvs) + main_input = pipeline | 'main input' >> Create([1]) + side_kvs = pipeline | 'side kvs' >> Create(some_kvs) results = main_input | FlatMap( 'test', lambda x, dct1, dct2: [[x, dct1, dct2]], @@ -383,10 +383,10 @@ class DataflowTest(unittest.TestCase): return existing_windows pipeline = Pipeline('DirectPipelineRunner') - numbers = pipeline | Create('KVs', [(1, 10), (2, 20), (3, 30)]) + numbers = pipeline | 'KVs' >> Create([(1, 10), (2, 20), (3, 30)]) result = (numbers - | WindowInto('W', windowfn=TestWindowFn()) - | GroupByKey('G')) + | 'W' >> WindowInto(windowfn=TestWindowFn()) + | 'G' >> GroupByKey()) assert_that( result, equal_to([(1, [10]), (1, [10]), (2, [20]), (2, [20]), (3, [30]), (3, [30])])) http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/031c4cce/sdks/python/apache_beam/examples/complete/autocomplete.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/examples/complete/autocomplete.py b/sdks/python/apache_beam/examples/complete/autocomplete.py index 0f1e96e..b68bc56 100644 --- a/sdks/python/apache_beam/examples/complete/autocomplete.py +++ b/sdks/python/apache_beam/examples/complete/autocomplete.py @@ -45,12 +45,12 @@ def run(argv=None): p = beam.Pipeline(options=pipeline_options) (p # pylint: disable=expression-not-assigned - | beam.io.Read('read', beam.io.TextFileSource(known_args.input)) - | beam.FlatMap('split', lambda x: re.findall(r'[A-Za-z\']+', x)) - | TopPerPrefix('TopPerPrefix', 5) + | 'read' >> beam.io.Read(beam.io.TextFileSource(known_args.input)) + | 'split' >> beam.FlatMap(lambda x: re.findall(r'[A-Za-z\']+', x)) + | 'TopPerPrefix' >> TopPerPrefix(5) | beam.Map('format', lambda (prefix, candidates): '%s: %s' % (prefix, candidates)) - | beam.io.Write('write', beam.io.TextFileSink(known_args.output))) + | 'write' >> beam.io.Write(beam.io.TextFileSink(known_args.output))) p.run() http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/031c4cce/sdks/python/apache_beam/examples/complete/autocomplete_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/examples/complete/autocomplete_test.py b/sdks/python/apache_beam/examples/complete/autocomplete_test.py index 84f947b..18d0511 100644 --- a/sdks/python/apache_beam/examples/complete/autocomplete_test.py +++ b/sdks/python/apache_beam/examples/complete/autocomplete_test.py @@ -31,8 +31,8 @@ class 
AutocompleteTest(unittest.TestCase): def test_top_prefixes(self): p = beam.Pipeline('DirectPipelineRunner') - words = p | beam.Create('create', self.WORDS) - result = words | autocomplete.TopPerPrefix('test', 5) + words = p | 'create' >> beam.Create(self.WORDS) + result = words | 'test' >> autocomplete.TopPerPrefix(5) # values must be hashable for now result = result | beam.Map(lambda (k, vs): (k, tuple(vs))) assert_that(result, equal_to( http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/031c4cce/sdks/python/apache_beam/examples/complete/estimate_pi.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/examples/complete/estimate_pi.py b/sdks/python/apache_beam/examples/complete/estimate_pi.py index 09faecf..ef9f8cc 100644 --- a/sdks/python/apache_beam/examples/complete/estimate_pi.py +++ b/sdks/python/apache_beam/examples/complete/estimate_pi.py @@ -96,9 +96,9 @@ class EstimatePiTransform(beam.PTransform): def apply(self, pcoll): # A hundred work items of a hundred thousand tries each. return (pcoll - | beam.Create('Initialize', [100000] * 100).with_output_types(int) - | beam.Map('Run trials', run_trials) - | beam.CombineGlobally('Sum', combine_results).without_defaults()) + | 'Initialize' >> beam.Create([100000] * 100).with_output_types(int) + | 'Run trials' >> beam.Map(run_trials) + | 'Sum' >> beam.CombineGlobally(combine_results).without_defaults()) def run(argv=None): @@ -115,7 +115,7 @@ def run(argv=None): p = beam.Pipeline(options=pipeline_options) (p # pylint: disable=expression-not-assigned - | EstimatePiTransform('Estimate') + | 'Estimate' >> EstimatePiTransform() | beam.io.Write('Write', beam.io.TextFileSink(known_args.output, coder=JsonCoder()))) http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/031c4cce/sdks/python/apache_beam/examples/complete/estimate_pi_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/examples/complete/estimate_pi_test.py b/sdks/python/apache_beam/examples/complete/estimate_pi_test.py index c633bb1..3967ed5 100644 --- a/sdks/python/apache_beam/examples/complete/estimate_pi_test.py +++ b/sdks/python/apache_beam/examples/complete/estimate_pi_test.py @@ -39,7 +39,7 @@ class EstimatePiTest(unittest.TestCase): def test_basics(self): p = beam.Pipeline('DirectPipelineRunner') - result = p | estimate_pi.EstimatePiTransform('Estimate') + result = p | 'Estimate' >> estimate_pi.EstimatePiTransform() # Note: Probabilistically speaking this test can fail with a probability # that is very small (VERY) given that we run at least 10 million trials. 
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/031c4cce/sdks/python/apache_beam/examples/complete/juliaset/juliaset/juliaset.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/examples/complete/juliaset/juliaset/juliaset.py b/sdks/python/apache_beam/examples/complete/juliaset/juliaset/juliaset.py index 2bc37e6..56696c3 100644 --- a/sdks/python/apache_beam/examples/complete/juliaset/juliaset/juliaset.py +++ b/sdks/python/apache_beam/examples/complete/juliaset/juliaset/juliaset.py @@ -50,7 +50,7 @@ def generate_julia_set_colors(pipeline, c, n, max_iterations): yield (x, y) julia_set_colors = (pipeline - | beam.Create('add points', point_set(n)) + | 'add points' >> beam.Create(point_set(n)) | beam.Map( get_julia_set_point_color, c, n, max_iterations)) @@ -105,11 +105,11 @@ def run(argv=None): # pylint: disable=missing-docstring # Group each coordinate triplet by its x value, then write the coordinates to # the output file with an x-coordinate grouping per line. # pylint: disable=expression-not-assigned - (coordinates | beam.Map('x coord key', lambda (x, y, i): (x, (x, y, i))) - | beam.GroupByKey('x coord') | beam.Map( + (coordinates | 'x coord key' >> beam.Map(lambda (x, y, i): (x, (x, y, i))) + | 'x coord' >> beam.GroupByKey() | beam.Map( 'format', lambda (k, coords): ' '.join('(%s, %s, %s)' % coord for coord in coords)) - | beam.io.Write('write', beam.io.TextFileSink(known_args.coordinate_output))) + | 'write' >> beam.io.Write(beam.io.TextFileSink(known_args.coordinate_output))) # pylint: enable=expression-not-assigned p.run() http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/031c4cce/sdks/python/apache_beam/examples/complete/tfidf.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/examples/complete/tfidf.py b/sdks/python/apache_beam/examples/complete/tfidf.py index ef58cc0..043d5f6 100644 --- a/sdks/python/apache_beam/examples/complete/tfidf.py +++ b/sdks/python/apache_beam/examples/complete/tfidf.py @@ -42,7 +42,7 @@ def read_documents(pipeline, uris): pipeline | beam.io.Read('read: %s' % uri, beam.io.TextFileSource(uri)) | beam.Map('withkey: %s' % uri, lambda v, uri: (uri, v), uri)) - return pcolls | beam.Flatten('flatten read pcolls') + return pcolls | 'flatten read pcolls' >> beam.Flatten() class TfIdf(beam.PTransform): @@ -59,9 +59,9 @@ class TfIdf(beam.PTransform): # PCollection to use as side input. total_documents = ( uri_to_content - | beam.Keys('get uris') - | beam.RemoveDuplicates('get unique uris') - | beam.combiners.Count.Globally(' count uris')) + | 'get uris' >> beam.Keys() + | 'get unique uris' >> beam.RemoveDuplicates() + | ' count uris' >> beam.combiners.Count.Globally()) # Create a collection of pairs mapping a URI to each of the words # in the document associated with that that URI. @@ -71,28 +71,28 @@ class TfIdf(beam.PTransform): uri_to_words = ( uri_to_content - | beam.FlatMap('split words', split_into_words)) + | 'split words' >> beam.FlatMap(split_into_words)) # Compute a mapping from each word to the total number of documents # in which it appears. 
word_to_doc_count = ( uri_to_words - | beam.RemoveDuplicates('get unique words per doc') - | beam.Values('get words') - | beam.combiners.Count.PerElement('count docs per word')) + | 'get unique words per doc' >> beam.RemoveDuplicates() + | 'get words' >> beam.Values() + | 'count docs per word' >> beam.combiners.Count.PerElement()) # Compute a mapping from each URI to the total number of words in the # document associated with that URI. uri_to_word_total = ( uri_to_words - | beam.Keys(' get uris') - | beam.combiners.Count.PerElement('count words in doc')) + | ' get uris' >> beam.Keys() + | 'count words in doc' >> beam.combiners.Count.PerElement()) # Count, for each (URI, word) pair, the number of occurrences of that word # in the document associated with the URI. uri_and_word_to_count = ( uri_to_words - | beam.combiners.Count.PerElement('count word-doc pairs')) + | 'count word-doc pairs' >> beam.combiners.Count.PerElement()) # Adjust the above collection to a mapping from (URI, word) pairs to counts # into an isomorphic mapping from URI to (word, count) pairs, to prepare @@ -116,7 +116,7 @@ class TfIdf(beam.PTransform): # ... ]} uri_to_word_and_count_and_total = ( {'word totals': uri_to_word_total, 'word counts': uri_to_word_and_count} - | beam.CoGroupByKey('cogroup by uri')) + | 'cogroup by uri' >> beam.CoGroupByKey()) # Compute a mapping from each word to a (URI, term frequency) pair for each # URI. A word's term frequency for a document is simply the number of times @@ -132,7 +132,7 @@ class TfIdf(beam.PTransform): word_to_uri_and_tf = ( uri_to_word_and_count_and_total - | beam.FlatMap('compute term frequencies', compute_term_frequency)) + | 'compute term frequencies' >> beam.FlatMap(compute_term_frequency)) # Compute a mapping from each word to its document frequency. # A word's document frequency in a corpus is the number of @@ -155,7 +155,7 @@ class TfIdf(beam.PTransform): # each keyed on the word. word_to_uri_and_tf_and_df = ( {'tf': word_to_uri_and_tf, 'df': word_to_df} - | beam.CoGroupByKey('cogroup words by tf-df')) + | 'cogroup words by tf-df' >> beam.CoGroupByKey()) # Compute a mapping from each word to a (URI, TF-IDF) score for each URI. # There are a variety of definitions of TF-IDF @@ -170,7 +170,7 @@ class TfIdf(beam.PTransform): word_to_uri_and_tfidf = ( word_to_uri_and_tf_and_df - | beam.FlatMap('compute tf-idf', compute_tf_idf)) + | 'compute tf-idf' >> beam.FlatMap(compute_tf_idf)) return word_to_uri_and_tfidf @@ -197,7 +197,7 @@ def run(argv=None): output = pcoll | TfIdf() # Write the output using a "Write" transform that has side effects. 
# pylint: disable=expression-not-assigned - output | beam.io.Write('write', beam.io.TextFileSink(known_args.output)) + output | 'write' >> beam.io.Write(beam.io.TextFileSink(known_args.output)) p.run() http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/031c4cce/sdks/python/apache_beam/examples/complete/tfidf_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/examples/complete/tfidf_test.py b/sdks/python/apache_beam/examples/complete/tfidf_test.py index 8f52611..ee7e534 100644 --- a/sdks/python/apache_beam/examples/complete/tfidf_test.py +++ b/sdks/python/apache_beam/examples/complete/tfidf_test.py @@ -56,7 +56,7 @@ class TfIdfTest(unittest.TestCase): result = ( uri_to_line | tfidf.TfIdf() - | beam.Map('flatten', lambda (word, (uri, tfidf)): (word, uri, tfidf))) + | 'flatten' >> beam.Map(lambda (word, (uri, tfidf)): (word, uri, tfidf))) beam.assert_that(result, beam.equal_to(EXPECTED_RESULTS)) # Run the pipeline. Note that the assert_that above adds to the pipeline # a check that the result PCollection contains expected values. To actually http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/031c4cce/sdks/python/apache_beam/examples/complete/top_wikipedia_sessions.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/examples/complete/top_wikipedia_sessions.py b/sdks/python/apache_beam/examples/complete/top_wikipedia_sessions.py index c46bfc5..7468484 100644 --- a/sdks/python/apache_beam/examples/complete/top_wikipedia_sessions.py +++ b/sdks/python/apache_beam/examples/complete/top_wikipedia_sessions.py @@ -134,9 +134,9 @@ class ComputeTopSessions(beam.PTransform): | beam.Filter(lambda x: (abs(hash(x)) <= MAX_TIMESTAMP * self.sampling_threshold)) | ComputeSessions() - | beam.ParDo('SessionsToStrings', SessionsToStringsDoFn()) + | 'SessionsToStrings' >> beam.ParDo(SessionsToStringsDoFn()) | TopPerMonth() - | beam.ParDo('FormatOutput', FormatOutputDoFn())) + | 'FormatOutput' >> beam.ParDo(FormatOutputDoFn())) def run(argv=None): @@ -168,9 +168,9 @@ def run(argv=None): p = beam.Pipeline(options=pipeline_options) (p # pylint: disable=expression-not-assigned - | beam.Read('read', beam.io.TextFileSource(known_args.input)) + | 'read' >> beam.Read(beam.io.TextFileSource(known_args.input)) | ComputeTopSessions(known_args.sampling_threshold) - | beam.io.Write('write', beam.io.TextFileSink(known_args.output))) + | 'write' >> beam.io.Write(beam.io.TextFileSink(known_args.output))) p.run() http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/031c4cce/sdks/python/apache_beam/examples/complete/top_wikipedia_sessions_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/examples/complete/top_wikipedia_sessions_test.py b/sdks/python/apache_beam/examples/complete/top_wikipedia_sessions_test.py index fb48641..207d6c4 100644 --- a/sdks/python/apache_beam/examples/complete/top_wikipedia_sessions_test.py +++ b/sdks/python/apache_beam/examples/complete/top_wikipedia_sessions_test.py @@ -50,7 +50,7 @@ class ComputeTopSessionsTest(unittest.TestCase): def test_compute_top_sessions(self): p = beam.Pipeline('DirectPipelineRunner') - edits = p | beam.Create('create', self.EDITS) + edits = p | 'create' >> beam.Create(self.EDITS) result = edits | top_wikipedia_sessions.ComputeTopSessions(1.0) beam.assert_that(result, beam.equal_to(self.EXPECTED)) 
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/031c4cce/sdks/python/apache_beam/examples/cookbook/bigquery_schema.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/examples/cookbook/bigquery_schema.py b/sdks/python/apache_beam/examples/cookbook/bigquery_schema.py index 7c420fb..650a886 100644 --- a/sdks/python/apache_beam/examples/cookbook/bigquery_schema.py +++ b/sdks/python/apache_beam/examples/cookbook/bigquery_schema.py @@ -111,8 +111,8 @@ def run(argv=None): } # pylint: disable=expression-not-assigned - record_ids = p | beam.Create('CreateIDs', ['1', '2', '3', '4', '5']) - records = record_ids | beam.Map('CreateRecords', create_random_record) + record_ids = p | 'CreateIDs' >> beam.Create(['1', '2', '3', '4', '5']) + records = record_ids | 'CreateRecords' >> beam.Map(create_random_record) records | beam.io.Write( 'write', beam.io.BigQuerySink( http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/031c4cce/sdks/python/apache_beam/examples/cookbook/bigquery_side_input.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/examples/cookbook/bigquery_side_input.py b/sdks/python/apache_beam/examples/cookbook/bigquery_side_input.py index e1d9cf1..1db4a1e 100644 --- a/sdks/python/apache_beam/examples/cookbook/bigquery_side_input.py +++ b/sdks/python/apache_beam/examples/cookbook/bigquery_side_input.py @@ -105,9 +105,9 @@ def run(argv=None): beam.io.BigQuerySource(query=query_corpus)) pcoll_word = p | beam.Read('read words', beam.io.BigQuerySource(query=query_word)) - pcoll_ignore_corpus = p | beam.Create('create_ignore_corpus', [ignore_corpus]) - pcoll_ignore_word = p | beam.Create('create_ignore_word', [ignore_word]) - pcoll_group_ids = p | beam.Create('create groups', group_ids) + pcoll_ignore_corpus = p | 'create_ignore_corpus' >> beam.Create([ignore_corpus]) + pcoll_ignore_word = p | 'create_ignore_word' >> beam.Create([ignore_word]) + pcoll_group_ids = p | 'create groups' >> beam.Create(group_ids) pcoll_groups = create_groups(pcoll_group_ids, pcoll_corpus, pcoll_word, pcoll_ignore_corpus, pcoll_ignore_word) http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/031c4cce/sdks/python/apache_beam/examples/cookbook/bigquery_side_input_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/examples/cookbook/bigquery_side_input_test.py b/sdks/python/apache_beam/examples/cookbook/bigquery_side_input_test.py index bc75c41..215aafa 100644 --- a/sdks/python/apache_beam/examples/cookbook/bigquery_side_input_test.py +++ b/sdks/python/apache_beam/examples/cookbook/bigquery_side_input_test.py @@ -29,16 +29,16 @@ class BigQuerySideInputTest(unittest.TestCase): def test_create_groups(self): p = beam.Pipeline('DirectPipelineRunner') - group_ids_pcoll = p | beam.Create('create_group_ids', ['A', 'B', 'C']) + group_ids_pcoll = p | 'create_group_ids' >> beam.Create(['A', 'B', 'C']) corpus_pcoll = p | beam.Create('create_corpus', [{'f': 'corpus1'}, {'f': 'corpus2'}, {'f': 'corpus3'}]) - words_pcoll = p | beam.Create('create_words', [{'f': 'word1'}, + words_pcoll = p | 'create_words' >> beam.Create([{'f': 'word1'}, {'f': 'word2'}, {'f': 'word3'}]) - ignore_corpus_pcoll = p | beam.Create('create_ignore_corpus', ['corpus1']) - ignore_word_pcoll = p | beam.Create('create_ignore_word', ['word1']) + ignore_corpus_pcoll = p | 'create_ignore_corpus' >> beam.Create(['corpus1']) + ignore_word_pcoll = p | 
'create_ignore_word' >> beam.Create(['word1']) groups = bigquery_side_input.create_groups(group_ids_pcoll, corpus_pcoll, words_pcoll, ignore_corpus_pcoll, http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/031c4cce/sdks/python/apache_beam/examples/cookbook/bigquery_tornadoes.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/examples/cookbook/bigquery_tornadoes.py b/sdks/python/apache_beam/examples/cookbook/bigquery_tornadoes.py index e732309..cdaee36 100644 --- a/sdks/python/apache_beam/examples/cookbook/bigquery_tornadoes.py +++ b/sdks/python/apache_beam/examples/cookbook/bigquery_tornadoes.py @@ -56,8 +56,8 @@ def count_tornadoes(input_data): | beam.FlatMap( 'months with tornadoes', lambda row: [(int(row['month']), 1)] if row['tornado'] else []) - | beam.CombinePerKey('monthly count', sum) - | beam.Map('format', lambda (k, v): {'month': k, 'tornado_count': v})) + | 'monthly count' >> beam.CombinePerKey(sum) + | 'format' >> beam.Map(lambda (k, v): {'month': k, 'tornado_count': v})) def run(argv=None): @@ -77,7 +77,7 @@ def run(argv=None): p = beam.Pipeline(argv=pipeline_args) # Read the table rows into a PCollection. - rows = p | beam.io.Read('read', beam.io.BigQuerySource(known_args.input)) + rows = p | 'read' >> beam.io.Read(beam.io.BigQuerySource(known_args.input)) counts = count_tornadoes(rows) # Write the output using a "Write" transform that has side effects. http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/031c4cce/sdks/python/apache_beam/examples/cookbook/bigquery_tornadoes_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/examples/cookbook/bigquery_tornadoes_test.py b/sdks/python/apache_beam/examples/cookbook/bigquery_tornadoes_test.py index 2547849..87e1f44 100644 --- a/sdks/python/apache_beam/examples/cookbook/bigquery_tornadoes_test.py +++ b/sdks/python/apache_beam/examples/cookbook/bigquery_tornadoes_test.py @@ -28,7 +28,7 @@ class BigQueryTornadoesTest(unittest.TestCase): def test_basics(self): p = beam.Pipeline('DirectPipelineRunner') - rows = (p | beam.Create('create', [ + rows = (p | 'create' >> beam.Create([ {'month': 1, 'day': 1, 'tornado': False}, {'month': 1, 'day': 2, 'tornado': True}, {'month': 1, 'day': 3, 'tornado': True}, http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/031c4cce/sdks/python/apache_beam/examples/cookbook/bigshuffle.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/examples/cookbook/bigshuffle.py b/sdks/python/apache_beam/examples/cookbook/bigshuffle.py index cde00b3..c29a038 100644 --- a/sdks/python/apache_beam/examples/cookbook/bigshuffle.py +++ b/sdks/python/apache_beam/examples/cookbook/bigshuffle.py @@ -59,30 +59,30 @@ def run(argv=None): # Count the occurrences of each word. output = (lines - | beam.Map('split', lambda x: (x[:10], x[10:99]) + | 'split' >> beam.Map(lambda x: (x[:10], x[10:99]) ).with_output_types(beam.typehints.KV[str, str]) - | beam.GroupByKey('group') + | 'group' >> beam.GroupByKey() | beam.FlatMap( 'format', lambda (key, vals): ['%s%s' % (key, val) for val in vals])) # Write the output using a "Write" transform that has side effects. - output | beam.io.Write('write', beam.io.TextFileSink(known_args.output)) + output | 'write' >> beam.io.Write(beam.io.TextFileSink(known_args.output)) # Optionally write the input and output checksums. 
if known_args.checksum_output: input_csum = (lines - | beam.Map('input-csum', crc32line) - | beam.CombineGlobally('combine-input-csum', sum) - | beam.Map('hex-format', lambda x: '%x' % x)) + | 'input-csum' >> beam.Map(crc32line) + | 'combine-input-csum' >> beam.CombineGlobally(sum) + | 'hex-format' >> beam.Map(lambda x: '%x' % x)) input_csum | beam.io.Write( 'write-input-csum', beam.io.TextFileSink(known_args.checksum_output + '-input')) output_csum = (output - | beam.Map('output-csum', crc32line) - | beam.CombineGlobally('combine-output-csum', sum) - | beam.Map('hex-format-output', lambda x: '%x' % x)) + | 'output-csum' >> beam.Map(crc32line) + | 'combine-output-csum' >> beam.CombineGlobally(sum) + | 'hex-format-output' >> beam.Map(lambda x: '%x' % x)) output_csum | beam.io.Write( 'write-output-csum', beam.io.TextFileSink(known_args.checksum_output + '-output')) http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/031c4cce/sdks/python/apache_beam/examples/cookbook/coders.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/examples/cookbook/coders.py b/sdks/python/apache_beam/examples/cookbook/coders.py index 1ce1fa5..bbe02b3 100644 --- a/sdks/python/apache_beam/examples/cookbook/coders.py +++ b/sdks/python/apache_beam/examples/cookbook/coders.py @@ -89,7 +89,7 @@ def run(argv=None): (p # pylint: disable=expression-not-assigned | beam.io.Read('read', beam.io.TextFileSource(known_args.input, coder=JsonCoder())) - | beam.FlatMap('points', compute_points) + | 'points' >> beam.FlatMap(compute_points) | beam.CombinePerKey(sum) | beam.io.Write('write', beam.io.TextFileSink(known_args.output, coder=JsonCoder()))) http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/031c4cce/sdks/python/apache_beam/examples/cookbook/coders_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/examples/cookbook/coders_test.py b/sdks/python/apache_beam/examples/cookbook/coders_test.py index 5840081..75b78c8 100644 --- a/sdks/python/apache_beam/examples/cookbook/coders_test.py +++ b/sdks/python/apache_beam/examples/cookbook/coders_test.py @@ -35,9 +35,9 @@ class CodersTest(unittest.TestCase): def test_compute_points(self): p = beam.Pipeline('DirectPipelineRunner') - records = p | beam.Create('create', self.SAMPLE_RECORDS) + records = p | 'create' >> beam.Create(self.SAMPLE_RECORDS) result = (records - | beam.FlatMap('points', coders.compute_points) + | 'points' >> beam.FlatMap(coders.compute_points) | beam.CombinePerKey(sum)) assert_that(result, equal_to([('Italy', 0), ('Brasil', 6), ('Germany', 3)])) p.run() http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/031c4cce/sdks/python/apache_beam/examples/cookbook/custom_ptransform.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/examples/cookbook/custom_ptransform.py b/sdks/python/apache_beam/examples/cookbook/custom_ptransform.py index d3d8b08..021eff6 100644 --- a/sdks/python/apache_beam/examples/cookbook/custom_ptransform.py +++ b/sdks/python/apache_beam/examples/cookbook/custom_ptransform.py @@ -39,7 +39,7 @@ class Count1(beam.PTransform): def apply(self, pcoll): return ( pcoll - | beam.Map('Init', lambda v: (v, 1)) + | 'Init' >> beam.Map(lambda v: (v, 1)) | beam.CombinePerKey(sum)) @@ -57,7 +57,7 @@ def Count2(pcoll): # pylint: disable=invalid-name """Count as a decorated function.""" return ( pcoll - | beam.Map('Init', lambda v: (v, 1)) + | 'Init' >> beam.Map(lambda 
v: (v, 1)) | beam.CombinePerKey(sum)) @@ -84,7 +84,7 @@ def Count3(pcoll, factor=1): # pylint: disable=invalid-name """ return ( pcoll - | beam.Map('Init', lambda v: (v, factor)) + | 'Init' >> beam.Map(lambda v: (v, factor)) | beam.CombinePerKey(sum)) http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/031c4cce/sdks/python/apache_beam/examples/cookbook/custom_ptransform_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/examples/cookbook/custom_ptransform_test.py b/sdks/python/apache_beam/examples/cookbook/custom_ptransform_test.py index 3c0c6f3..603742f 100644 --- a/sdks/python/apache_beam/examples/cookbook/custom_ptransform_test.py +++ b/sdks/python/apache_beam/examples/cookbook/custom_ptransform_test.py @@ -40,7 +40,7 @@ class CustomCountTest(unittest.TestCase): def run_pipeline(self, count_implementation, factor=1): p = beam.Pipeline('DirectPipelineRunner') - words = p | beam.Create('create', ['CAT', 'DOG', 'CAT', 'CAT', 'DOG']) + words = p | 'create' >> beam.Create(['CAT', 'DOG', 'CAT', 'CAT', 'DOG']) result = words | count_implementation assert_that( result, equal_to([('CAT', (3 * factor)), ('DOG', (2 * factor))])) http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/031c4cce/sdks/python/apache_beam/examples/cookbook/filters.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/examples/cookbook/filters.py b/sdks/python/apache_beam/examples/cookbook/filters.py index c309941..b19b566 100644 --- a/sdks/python/apache_beam/examples/cookbook/filters.py +++ b/sdks/python/apache_beam/examples/cookbook/filters.py @@ -59,14 +59,14 @@ def filter_cold_days(input_data, month_filter): # Compute the global mean temperature. global_mean = AsSingleton( fields_of_interest - | beam.Map('extract mean', lambda row: row['mean_temp']) - | beam.combiners.Mean.Globally('global mean')) + | 'extract mean' >> beam.Map(lambda row: row['mean_temp']) + | 'global mean' >> beam.combiners.Mean.Globally()) # Filter to the rows representing days in the month of interest # in which the mean daily temperature is below the global mean. 
return ( fields_of_interest - | beam.Filter('desired month', lambda row: row['month'] == month_filter) + | 'desired month' >> beam.Filter(lambda row: row['month'] == month_filter) | beam.Filter('below mean', lambda row, mean: row['mean_temp'] < mean, global_mean)) @@ -88,11 +88,11 @@ def run(argv=None): p = beam.Pipeline(argv=pipeline_args) - input_data = p | beam.Read('input', beam.io.BigQuerySource(known_args.input)) + input_data = p | 'input' >> beam.Read(beam.io.BigQuerySource(known_args.input)) # pylint: disable=expression-not-assigned (filter_cold_days(input_data, known_args.month_filter) - | beam.io.Write('save to BQ', beam.io.BigQuerySink( + | 'save to BQ' >> beam.io.Write(beam.io.BigQuerySink( known_args.output, schema='year:INTEGER,month:INTEGER,day:INTEGER,mean_temp:FLOAT', create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED, http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/031c4cce/sdks/python/apache_beam/examples/cookbook/filters_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/examples/cookbook/filters_test.py b/sdks/python/apache_beam/examples/cookbook/filters_test.py index cf1ca7e..9e5592f 100644 --- a/sdks/python/apache_beam/examples/cookbook/filters_test.py +++ b/sdks/python/apache_beam/examples/cookbook/filters_test.py @@ -36,7 +36,7 @@ class FiltersTest(unittest.TestCase): def _get_result_for_month(self, month): p = beam.Pipeline('DirectPipelineRunner') - rows = (p | beam.Create('create', self.input_data)) + rows = (p | 'create' >> beam.Create(self.input_data)) results = filters.filter_cold_days(rows, month) return results http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/031c4cce/sdks/python/apache_beam/examples/cookbook/group_with_coder.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/examples/cookbook/group_with_coder.py b/sdks/python/apache_beam/examples/cookbook/group_with_coder.py index 140314e..6c86f61 100644 --- a/sdks/python/apache_beam/examples/cookbook/group_with_coder.py +++ b/sdks/python/apache_beam/examples/cookbook/group_with_coder.py @@ -98,19 +98,19 @@ def run(argv=sys.argv[1:]): coders.registry.register_coder(Player, PlayerCoder) (p # pylint: disable=expression-not-assigned - | beam.io.Read('read', beam.io.TextFileSource(known_args.input)) + | 'read' >> beam.io.Read(beam.io.TextFileSource(known_args.input)) # The get_players function is annotated with a type hint above, so the type # system knows the output type of the following operation is a key-value pair # of a Player and an int. Please see the documentation for details on # types that are inferred automatically as well as other ways to specify # type hints. - | beam.Map('get players', get_players) + | 'get players' >> beam.Map(get_players) # The output type hint of the previous step is used to infer that the key # type of the following operation is the Player type. Since a custom coder # is registered for the Player class above, a PlayerCoder will be used to # encode Player objects as keys for this combine operation. 
| beam.CombinePerKey(sum) | beam.Map(lambda (k, v): '%s,%d' % (k.name, v)) - | beam.io.Write('write', beam.io.TextFileSink(known_args.output))) + | 'write' >> beam.io.Write(beam.io.TextFileSink(known_args.output))) p.run() http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/031c4cce/sdks/python/apache_beam/examples/cookbook/mergecontacts.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/examples/cookbook/mergecontacts.py b/sdks/python/apache_beam/examples/cookbook/mergecontacts.py index 9e6b001..bf6d1b1 100644 --- a/sdks/python/apache_beam/examples/cookbook/mergecontacts.py +++ b/sdks/python/apache_beam/examples/cookbook/mergecontacts.py @@ -88,7 +88,7 @@ def run(argv=None, assert_results=None): known_args.input_snailmail)) # Group together all entries under the same name. - grouped = (email, phone, snailmail) | beam.CoGroupByKey('group_by_name') + grouped = (email, phone, snailmail) | 'group_by_name' >> beam.CoGroupByKey() # Prepare tab-delimited output; something like this: # "name""email_1,email_2""phone""first_snailmail_only" @@ -107,9 +107,9 @@ def run(argv=None, assert_results=None): nomads = grouped | beam.Filter( # People without addresses. lambda (name, (email, phone, snailmail)): not next(iter(snailmail), None)) - num_luddites = luddites | beam.combiners.Count.Globally('luddites') - num_writers = writers | beam.combiners.Count.Globally('writers') - num_nomads = nomads | beam.combiners.Count.Globally('nomads') + num_luddites = luddites | 'luddites' >> beam.combiners.Count.Globally() + num_writers = writers | 'writers' >> beam.combiners.Count.Globally() + num_nomads = nomads | 'nomads' >> beam.combiners.Count.Globally() # Write tab-delimited output. # pylint: disable=expression-not-assigned http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/031c4cce/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo.py b/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo.py index 5bde591..187d20b 100644 --- a/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo.py +++ b/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo.py @@ -114,10 +114,10 @@ class CountWords(beam.PTransform): def apply(self, pcoll): return (pcoll - | beam.Map('pair_with_one', lambda x: (x, 1)) - | beam.GroupByKey('group') - | beam.Map('count', lambda (word, ones): (word, sum(ones))) - | beam.Map('format', lambda (word, c): '%s: %s' % (word, c))) + | 'pair_with_one' >> beam.Map(lambda x: (x, 1)) + | 'group' >> beam.GroupByKey() + | 'count' >> beam.Map(lambda (word, ones): (word, sum(ones))) + | 'format' >> beam.Map(lambda (word, c): '%s: %s' % (word, c))) def run(argv=None): @@ -137,7 +137,7 @@ def run(argv=None): pipeline_options.view_as(SetupOptions).save_main_session = True p = beam.Pipeline(options=pipeline_options) - lines = p | beam.Read('read', beam.io.TextFileSource(known_args.input)) + lines = p | 'read' >> beam.Read(beam.io.TextFileSource(known_args.input)) # with_outputs allows accessing the side outputs of a DoFn. 
split_lines_result = (lines @@ -155,21 +155,21 @@ def run(argv=None): # pylint: disable=expression-not-assigned (character_count - | beam.Map('pair_with_key', lambda x: ('chars_temp_key', x)) + | 'pair_with_key' >> beam.Map(lambda x: ('chars_temp_key', x)) | beam.GroupByKey() - | beam.Map('count chars', lambda (_, counts): sum(counts)) + | 'count chars' >> beam.Map(lambda (_, counts): sum(counts)) | beam.Write('write chars', beam.io.TextFileSink(known_args.output + '-chars'))) # pylint: disable=expression-not-assigned (short_words - | CountWords('count short words') + | 'count short words' >> CountWords() | beam.Write('write short words', beam.io.TextFileSink(known_args.output + '-short-words'))) # pylint: disable=expression-not-assigned (words - | CountWords('count words') + | 'count words' >> CountWords() | beam.Write('write words', beam.io.TextFileSink(known_args.output + '-words'))) http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/031c4cce/sdks/python/apache_beam/examples/snippets/snippets.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/examples/snippets/snippets.py b/sdks/python/apache_beam/examples/snippets/snippets.py index 3658619..c605db8 100644 --- a/sdks/python/apache_beam/examples/snippets/snippets.py +++ b/sdks/python/apache_beam/examples/snippets/snippets.py @@ -104,7 +104,7 @@ def construct_pipeline(renames): # [END pipelines_constructing_applying] # [START pipelines_constructing_writing] - filtered_words = reversed_words | beam.Filter('FilterWords', filter_words) + filtered_words = reversed_words | 'FilterWords' >> beam.Filter(filter_words) filtered_words | beam.io.Write('WriteMyFile', beam.io.TextFileSink( 'gs://some/outputData.txt')) @@ -242,8 +242,8 @@ def pipeline_options_remote(argv): options.view_as(StandardOptions).runner = 'DirectPipelineRunner' p = Pipeline(options=options) - lines = p | beam.io.Read('ReadFromText', beam.io.TextFileSource(my_input)) - lines | beam.io.Write('WriteToText', beam.io.TextFileSink(my_output)) + lines = p | 'ReadFromText' >> beam.io.Read(beam.io.TextFileSource(my_input)) + lines | 'WriteToText' >> beam.io.Write(beam.io.TextFileSink(my_output)) p.run() @@ -283,8 +283,8 @@ def pipeline_options_local(argv): p = Pipeline(options=options) # [END pipeline_options_local] - lines = p | beam.io.Read('ReadFromText', beam.io.TextFileSource(my_input)) - lines | beam.io.Write('WriteToText', beam.io.TextFileSink(my_output)) + lines = p | 'ReadFromText' >> beam.io.Read(beam.io.TextFileSource(my_input)) + lines | 'WriteToText' >> beam.io.Write(beam.io.TextFileSink(my_output)) p.run() @@ -307,7 +307,7 @@ def pipeline_options_command_line(argv): p = beam.Pipeline(argv=pipeline_args) lines = p | beam.io.Read('ReadFromText', beam.io.TextFileSource(known_args.input)) - lines | beam.io.Write('WriteToText', beam.io.TextFileSink(known_args.output)) + lines | 'WriteToText' >> beam.io.Write(beam.io.TextFileSink(known_args.output)) # [END pipeline_options_command_line] p.run() @@ -344,8 +344,8 @@ def pipeline_logging(lines, output): p = beam.Pipeline(options=PipelineOptions()) (p | beam.Create(lines) - | beam.ParDo('ExtractWords', ExtractWordsFn()) - | beam.io.Write('WriteToText', beam.io.TextFileSink(output))) + | 'ExtractWords' >> beam.ParDo(ExtractWordsFn()) + | 'WriteToText' >> beam.io.Write(beam.io.TextFileSink(output))) p.run() @@ -391,11 +391,11 @@ def pipeline_monitoring(renames): def apply(self, pcoll): return (pcoll # Convert lines of text into individual words. 
- | beam.ParDo('ExtractWords', ExtractWordsFn()) + | 'ExtractWords' >> beam.ParDo(ExtractWordsFn()) # Count the number of times each word occurs. | beam.combiners.Count.PerElement() # Format each word and count into a printable string. - | beam.ParDo('FormatCounts', FormatCountsFn())) + | 'FormatCounts' >> beam.ParDo(FormatCountsFn())) # [END pipeline_monitoring_composite] pipeline_options = PipelineOptions() @@ -405,11 +405,11 @@ def pipeline_monitoring(renames): # [START pipeline_monitoring_execution] (p # Read the lines of the input text. - | beam.io.Read('ReadLines', beam.io.TextFileSource(options.input)) + | 'ReadLines' >> beam.io.Read(beam.io.TextFileSource(options.input)) # Count the words. | CountWords() # Write the formatted word counts to output. - | beam.io.Write('WriteCounts', beam.io.TextFileSink(options.output))) + | 'WriteCounts' >> beam.io.Write(beam.io.TextFileSink(options.output))) # [END pipeline_monitoring_execution] p.visit(SnippetUtils.RenameFiles(renames)) @@ -454,7 +454,7 @@ def examples_wordcount_minimal(renames): # [END examples_wordcount_minimal_read] # [START examples_wordcount_minimal_pardo] - | beam.FlatMap('ExtractWords', lambda x: re.findall(r'[A-Za-z\']+', x)) + | 'ExtractWords' >> beam.FlatMap(lambda x: re.findall(r'[A-Za-z\']+', x)) # [END examples_wordcount_minimal_pardo] # [START examples_wordcount_minimal_count] @@ -466,7 +466,7 @@ def examples_wordcount_minimal(renames): # [END examples_wordcount_minimal_map] # [START examples_wordcount_minimal_write] - | beam.io.Write(beam.io.TextFileSink('gs://my-bucket/counts.txt')) + | 'gs://my-bucket/counts.txt' >> beam.io.Write(beam.io.TextFileSink()) # [END examples_wordcount_minimal_write] ) @@ -531,7 +531,7 @@ def examples_wordcount_wordcount(renames): formatted = counts | beam.ParDo(FormatAsTextFn()) # [END examples_wordcount_wordcount_dofn] - formatted | beam.io.Write(beam.io.TextFileSink('gs://my-bucket/counts.txt')) + formatted | 'gs://my-bucket/counts.txt' >> beam.io.Write(beam.io.TextFileSink()) p.visit(SnippetUtils.RenameFiles(renames)) p.run() @@ -591,9 +591,9 @@ def examples_wordcount_debugging(renames): p | beam.io.Read(beam.io.TextFileSource( 'gs://dataflow-samples/shakespeare/kinglear.txt')) - | beam.FlatMap('ExtractWords', lambda x: re.findall(r'[A-Za-z\']+', x)) + | 'ExtractWords' >> beam.FlatMap(lambda x: re.findall(r'[A-Za-z\']+', x)) | beam.combiners.Count.PerElement() - | beam.ParDo('FilterText', FilterTextFn('Flourish|stomach'))) + | 'FilterText' >> beam.ParDo(FilterTextFn('Flourish|stomach'))) # [START example_wordcount_debugging_assert] beam.assert_that( @@ -601,7 +601,7 @@ def examples_wordcount_debugging(renames): # [END example_wordcount_debugging_assert] output = (filtered_words - | beam.Map('format', lambda (word, c): '%s: %s' % (word, c)) + | 'format' >> beam.Map(lambda (word, c): '%s: %s' % (word, c)) | beam.io.Write( 'write', beam.io.TextFileSink('gs://my-bucket/counts.txt'))) @@ -682,7 +682,7 @@ def model_custom_source(count): # Using the source in an example pipeline. 
# [START model_custom_source_use_new_source] p = beam.Pipeline(options=PipelineOptions()) - numbers = p | beam.io.Read('ProduceNumbers', CountingSource(count)) + numbers = p | 'ProduceNumbers' >> beam.io.Read(CountingSource(count)) # [END model_custom_source_use_new_source] lines = numbers | beam.core.Map(lambda number: 'line %d' % number) @@ -712,7 +712,7 @@ def model_custom_source(count): # [START model_custom_source_use_ptransform] p = beam.Pipeline(options=PipelineOptions()) - numbers = p | ReadFromCountingSource('ProduceNumbers', count) + numbers = p | 'ProduceNumbers' >> ReadFromCountingSource(count) # [END model_custom_source_use_ptransform] lines = numbers | beam.core.Map(lambda number: 'line %d' % number) @@ -848,7 +848,7 @@ def model_custom_sink(simplekv, KVs, final_table_name_no_ptransform, # [START model_custom_sink_use_ptransform] p = beam.Pipeline(options=PipelineOptions()) - kvs = p | beam.core.Create('CreateKVs', KVs) + kvs = p | 'CreateKVs' >> beam.core.Create(KVs) kvs | WriteToKVSink('WriteToSimpleKV', 'http://url_to_simple_kv/', final_table_name) # [END model_custom_sink_use_ptransform] @@ -880,7 +880,7 @@ def model_textio(renames): # [END model_textio_read] # [START model_textio_write] - filtered_words = lines | beam.FlatMap('FilterWords', filter_words) + filtered_words = lines | 'FilterWords' >> beam.FlatMap(filter_words) # [START model_pipelineio_write] filtered_words | beam.io.Write( 'WriteToText', beam.io.TextFileSink('gs://my_bucket/path/to/numbers', @@ -1053,7 +1053,7 @@ def model_group_by_key(contents, output_path): p | beam.Create(contents) | beam.FlatMap(lambda x: re.findall(r'\w+', x)) - | beam.Map('one word', lambda w: (w, 1))) + | 'one word' >> beam.Map(lambda w: (w, 1))) # GroupByKey accepts a PCollection of (w, 1) and # outputs a PCollection of (w, (1, 1, ...)). # (A key/value pair is just a tuple in Python.) @@ -1063,7 +1063,7 @@ def model_group_by_key(contents, output_path): grouped_words = words_and_counts | beam.GroupByKey() # [END model_group_by_key_transform] (grouped_words - | beam.Map('count words', lambda (word, counts): (word, len(counts))) + | 'count words' >> beam.Map(lambda (word, counts): (word, len(counts))) | beam.io.Write(beam.io.TextFileSink(output_path))) p.run() @@ -1083,8 +1083,8 @@ def model_co_group_by_key_tuple(email_list, phone_list, output_path): # multiple possible values for each key. # The phone_list contains values such as: ('mary': '111-222-3333') with # multiple possible values for each key. - emails = p | beam.Create('email', email_list) - phones = p | beam.Create('phone', phone_list) + emails = p | 'email' >> beam.Create(email_list) + phones = p | 'phone' >> beam.Create(phone_list) # The result PCollection contains one key-value element for each key in the # input PCollections. The key of the pair will be the key from the input and # the value will be a dictionary with two entries: 'emails' - an iterable of @@ -1119,9 +1119,9 @@ def model_join_using_side_inputs( # This code performs a join by receiving the set of names as an input and # passing PCollections that contain emails and phone numbers as side inputs # instead of using CoGroupByKey. 
- names = p | beam.Create('names', name_list) - emails = p | beam.Create('email', email_list) - phones = p | beam.Create('phone', phone_list) + names = p | 'names' >> beam.Create(name_list) + emails = p | 'email' >> beam.Create(email_list) + phones = p | 'phone' >> beam.Create(phone_list) def join_info(name, emails, phone_numbers): filtered_emails = [] @@ -1149,7 +1149,7 @@ def model_join_using_side_inputs( class Keys(beam.PTransform): def apply(self, pcoll): - return pcoll | beam.Map('Keys', lambda (k, v): k) + return pcoll | 'Keys' >> beam.Map(lambda (k, v): k) # [END model_library_transforms_keys] # pylint: enable=invalid-name @@ -1160,6 +1160,6 @@ class Count(beam.PTransform): def apply(self, pcoll): return ( pcoll - | beam.Map('Init', lambda v: (v, 1)) + | 'Init' >> beam.Map(lambda v: (v, 1)) | beam.CombinePerKey(sum)) # [END model_library_transforms_count] http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/031c4cce/sdks/python/apache_beam/examples/snippets/snippets_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/examples/snippets/snippets_test.py b/sdks/python/apache_beam/examples/snippets/snippets_test.py index 7888263..9eba46a 100644 --- a/sdks/python/apache_beam/examples/snippets/snippets_test.py +++ b/sdks/python/apache_beam/examples/snippets/snippets_test.py @@ -103,14 +103,14 @@ class ParDoTest(unittest.TestCase): def test_pardo_with_label(self): words = ['aa', 'bbc', 'defg'] # [START model_pardo_with_label] - result = words | beam.Map('CountUniqueLetters', lambda word: len(set(word))) + result = words | 'CountUniqueLetters' >> beam.Map(lambda word: len(set(word))) # [END model_pardo_with_label] self.assertEqual({1, 2, 4}, set(result)) def test_pardo_side_input(self): p = beam.Pipeline('DirectPipelineRunner') - words = p | beam.Create('start', ['a', 'bb', 'ccc', 'dddd']) + words = p | 'start' >> beam.Create(['a', 'bb', 'ccc', 'dddd']) # [START model_pardo_side_input] # Callable takes additional arguments. @@ -124,11 +124,11 @@ class ParDoTest(unittest.TestCase): | beam.CombineGlobally(beam.combiners.MeanCombineFn())) # Call with explicit side inputs. - small_words = words | beam.FlatMap('small', filter_using_length, 0, 3) + small_words = words | 'small' >> beam.FlatMap(filter_using_length, 0, 3) # A single deferred side input. larger_than_average = (words - | beam.FlatMap('large', filter_using_length, + | 'large' >> beam.FlatMap(filter_using_length, lower_bound=pvalue.AsSingleton( avg_word_len))) @@ -268,7 +268,7 @@ class TypeHintsTest(unittest.TestCase): evens = numbers | beam.ParDo(FilterEvensDoFn()) # [END type_hints_do_fn] - words = p | beam.Create('words', ['a', 'bb', 'c']) + words = p | 'words' >> beam.Create(['a', 'bb', 'c']) # One can assert outputs and apply them to transforms as well. # Helps document the contract and checks it at pipeline construction time. # [START type_hints_transform] http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/031c4cce/sdks/python/apache_beam/examples/streaming_wordcap.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/examples/streaming_wordcap.py b/sdks/python/apache_beam/examples/streaming_wordcap.py index 7148e58..ef95a5f 100644 --- a/sdks/python/apache_beam/examples/streaming_wordcap.py +++ b/sdks/python/apache_beam/examples/streaming_wordcap.py @@ -49,7 +49,7 @@ def run(argv=None): # Capitalize the characters in each line. 
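The Keys and Count snippets above show the general shape of a library transform: apply() (this SDK's expansion hook, later renamed expand) builds a small sub-pipeline, and the finished composite can itself be relabeled with '>>' at the call site. A usage sketch, not from the patch:

    import apache_beam as beam

    class Count(beam.PTransform):
      """Counts occurrences of each element, as in the snippet above."""
      def apply(self, pcoll):
        return (pcoll
                | 'Init' >> beam.Map(lambda v: (v, 1))
                | beam.CombinePerKey(sum))

    p = beam.Pipeline('DirectPipelineRunner')
    words = p | 'words' >> beam.Create(['a', 'b', 'a'])
    totals = words | 'CountPerWord' >> Count()  # composite relabeled at use
    p.run()
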
transformed = (lines - | (beam.Map('capitalize', lambda x: x.upper()))) + | 'capitalize' >> (beam.Map(lambda x: x.upper()))) # Write to PubSub. # pylint: disable=expression-not-assigned http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/031c4cce/sdks/python/apache_beam/examples/streaming_wordcount.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/examples/streaming_wordcount.py b/sdks/python/apache_beam/examples/streaming_wordcount.py index eda74dd..35c1abb 100644 --- a/sdks/python/apache_beam/examples/streaming_wordcount.py +++ b/sdks/python/apache_beam/examples/streaming_wordcount.py @@ -55,11 +55,11 @@ def run(argv=None): | (beam.FlatMap('split', lambda x: re.findall(r'[A-Za-z\']+', x)) .with_output_types(unicode)) - | beam.Map('pair_with_one', lambda x: (x, 1)) + | 'pair_with_one' >> beam.Map(lambda x: (x, 1)) | beam.WindowInto(window.FixedWindows(15, 0)) - | beam.GroupByKey('group') - | beam.Map('count', lambda (word, ones): (word, sum(ones))) - | beam.Map('format', lambda tup: '%s: %d' % tup)) + | 'group' >> beam.GroupByKey() + | 'count' >> beam.Map(lambda (word, ones): (word, sum(ones))) + | 'format' >> beam.Map(lambda tup: '%s: %d' % tup)) # Write to PubSub. # pylint: disable=expression-not-assigned http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/031c4cce/sdks/python/apache_beam/examples/wordcount.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/examples/wordcount.py b/sdks/python/apache_beam/examples/wordcount.py index bbfd43e..4744352 100644 --- a/sdks/python/apache_beam/examples/wordcount.py +++ b/sdks/python/apache_beam/examples/wordcount.py @@ -77,22 +77,22 @@ def run(argv=None): p = beam.Pipeline(options=pipeline_options) # Read the text file[pattern] into a PCollection. - lines = p | beam.io.Read('read', beam.io.TextFileSource(known_args.input)) + lines = p | 'read' >> beam.io.Read(beam.io.TextFileSource(known_args.input)) # Count the occurrences of each word. counts = (lines - | (beam.ParDo('split', WordExtractingDoFn()) + | 'split' >> (beam.ParDo(WordExtractingDoFn()) .with_output_types(unicode)) - | beam.Map('pair_with_one', lambda x: (x, 1)) - | beam.GroupByKey('group') - | beam.Map('count', lambda (word, ones): (word, sum(ones)))) + | 'pair_with_one' >> beam.Map(lambda x: (x, 1)) + | 'group' >> beam.GroupByKey() + | 'count' >> beam.Map(lambda (word, ones): (word, sum(ones)))) # Format the counts into a PCollection of strings. - output = counts | beam.Map('format', lambda (word, c): '%s: %s' % (word, c)) + output = counts | 'format' >> beam.Map(lambda (word, c): '%s: %s' % (word, c)) # Write the output using a "Write" transform that has side effects. # pylint: disable=expression-not-assigned - output | beam.io.Write('write', beam.io.TextFileSink(known_args.output)) + output | 'write' >> beam.io.Write(beam.io.TextFileSink(known_args.output)) # Actually run the pipeline (all operations above are deferred). 
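The pair_with_one/group/count trio in wordcount.py above is the classic shape; when a per-key sum is all that is needed, CombinePerKey (used elsewhere in this patch) collapses the last two steps. A self-contained sketch under the same API:

    import re
    import apache_beam as beam

    p = beam.Pipeline('DirectPipelineRunner')
    counts = (p
              | 'read' >> beam.Create(['to be or not to be'])
              | 'split' >> beam.FlatMap(lambda x: re.findall(r"[A-Za-z']+", x))
              | 'pair_with_one' >> beam.Map(lambda x: (x, 1))
              | 'sum_per_word' >> beam.CombinePerKey(sum))
    p.run()
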
result = p.run() http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/031c4cce/sdks/python/apache_beam/examples/wordcount_debugging.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/examples/wordcount_debugging.py b/sdks/python/apache_beam/examples/wordcount_debugging.py index 74effed..e008b48 100644 --- a/sdks/python/apache_beam/examples/wordcount_debugging.py +++ b/sdks/python/apache_beam/examples/wordcount_debugging.py @@ -97,11 +97,11 @@ class CountWords(beam.PTransform): def apply(self, pcoll): return (pcoll - | (beam.FlatMap('split', lambda x: re.findall(r'[A-Za-z\']+', x)) + | 'split' >> (beam.FlatMap(lambda x: re.findall(r'[A-Za-z\']+', x)) .with_output_types(unicode)) - | beam.Map('pair_with_one', lambda x: (x, 1)) - | beam.GroupByKey('group') - | beam.Map('count', lambda (word, ones): (word, sum(ones)))) + | 'pair_with_one' >> beam.Map(lambda x: (x, 1)) + | 'group' >> beam.GroupByKey() + | 'count' >> beam.Map(lambda (word, ones): (word, sum(ones)))) def run(argv=None): @@ -126,9 +126,9 @@ def run(argv=None): # Read the text file[pattern] into a PCollection, count the occurrences of # each word and filter by a list of words. filtered_words = ( - p | beam.io.Read('read', beam.io.TextFileSource(known_args.input)) + p | 'read' >> beam.io.Read(beam.io.TextFileSource(known_args.input)) | CountWords() - | beam.ParDo('FilterText', FilterTextFn('Flourish|stomach'))) + | 'FilterText' >> beam.ParDo(FilterTextFn('Flourish|stomach'))) # assert_that is a convenient PTransform that checks a PCollection has an # expected value. Asserts are best used in unit tests with small data sets but @@ -145,8 +145,8 @@ def run(argv=None): # "Write" transform that has side effects. # pylint: disable=unused-variable output = (filtered_words - | beam.Map('format', lambda (word, c): '%s: %s' % (word, c)) - | beam.io.Write('write', beam.io.TextFileSink(known_args.output))) + | 'format' >> beam.Map(lambda (word, c): '%s: %s' % (word, c)) + | 'write' >> beam.io.Write(beam.io.TextFileSink(known_args.output))) # Actually run the pipeline (all operations above are deferred). p.run() http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/031c4cce/sdks/python/apache_beam/examples/wordcount_minimal.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/examples/wordcount_minimal.py b/sdks/python/apache_beam/examples/wordcount_minimal.py index c3c41d7..ce5b644 100644 --- a/sdks/python/apache_beam/examples/wordcount_minimal.py +++ b/sdks/python/apache_beam/examples/wordcount_minimal.py @@ -93,22 +93,22 @@ def run(argv=None): p = beam.Pipeline(options=pipeline_options) # Read the text file[pattern] into a PCollection. - lines = p | beam.io.Read('read', beam.io.TextFileSource(known_args.input)) + lines = p | 'read' >> beam.io.Read(beam.io.TextFileSource(known_args.input)) # Count the occurrences of each word. counts = (lines - | (beam.FlatMap('split', lambda x: re.findall(r'[A-Za-z\']+', x)) + | 'split' >> (beam.FlatMap(lambda x: re.findall(r'[A-Za-z\']+', x)) .with_output_types(unicode)) - | beam.Map('pair_with_one', lambda x: (x, 1)) - | beam.GroupByKey('group') - | beam.Map('count', lambda (word, ones): (word, sum(ones)))) + | 'pair_with_one' >> beam.Map(lambda x: (x, 1)) + | 'group' >> beam.GroupByKey() + | 'count' >> beam.Map(lambda (word, ones): (word, sum(ones)))) # Format the counts into a PCollection of strings. 
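wordcount_debugging above pairs a filtering DoFn with assert_that, which checks a PCollection's contents when the pipeline runs. A minimal sketch; the import path for assert_that/equal_to is assumed from how the tests in this patch use them, and the expected values are illustrative:

    import apache_beam as beam
    from apache_beam.transforms.util import assert_that, equal_to

    p = beam.Pipeline('DirectPipelineRunner')
    filtered = p | 'create' >> beam.Create([('Flourish', 3), ('stomach', 1)])
    assert_that(filtered, equal_to([('Flourish', 3), ('stomach', 1)]))
    p.run()
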
- output = counts | beam.Map('format', lambda (word, c): '%s: %s' % (word, c)) + output = counts | 'format' >> beam.Map(lambda (word, c): '%s: %s' % (word, c)) # Write the output using a "Write" transform that has side effects. # pylint: disable=expression-not-assigned - output | beam.io.Write('write', beam.io.TextFileSink(known_args.output)) + output | 'write' >> beam.io.Write(beam.io.TextFileSink(known_args.output)) # Actually run the pipeline (all operations above are deferred). p.run() http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/031c4cce/sdks/python/apache_beam/io/avroio.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/io/avroio.py b/sdks/python/apache_beam/io/avroio.py index 4d1b245..7ad3842 100644 --- a/sdks/python/apache_beam/io/avroio.py +++ b/sdks/python/apache_beam/io/avroio.py @@ -43,7 +43,7 @@ class ReadFromAvro(PTransform): files, a ``PCollection`` for the records in these Avro files can be created in the following manner. p = df.Pipeline(argv=pipeline_args) - records = p | df.io.ReadFromAvro('Read', '/mypath/myavrofiles*') + records = p | 'Read' >> df.io.ReadFromAvro('/mypath/myavrofiles*') Each record of this ``PCollection`` will contain a single record read from a source. Records that are of simple types will be mapped into corresponding http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/031c4cce/sdks/python/apache_beam/io/bigquery.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/io/bigquery.py b/sdks/python/apache_beam/io/bigquery.py index f2c56dc..f789312 100644 --- a/sdks/python/apache_beam/io/bigquery.py +++ b/sdks/python/apache_beam/io/bigquery.py @@ -45,8 +45,8 @@ Map transform will get on each call *one* row of the main table and *all* rows of the side table. 
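A word on why 'label' >> transform works at all: a string has no __rshift__ that accepts a PTransform, so Python falls back to the transform's reflected operator. A rough sketch of the idea only, not the SDK's actual implementation:

    class PTransform(object):
      def __rrshift__(self, label):
        # Invoked for: 'SomeName' >> transform_instance
        self.label = label
        return self
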
The execution framework may use some caching techniques to share the side
inputs between calls in order to avoid excessive reading::

-  main_table = pipeline | beam.io.Read(beam.io.BigQuerySource('very_big_table')
-  side_table = pipeline | beam.io.Read(beam.io.BigQuerySource('not_big_table')
+  main_table = pipeline | 'very_big_table' >> beam.io.Read(beam.io.BigQuerySource('very_big_table'))
+  side_table = pipeline | 'not_big_table' >> beam.io.Read(beam.io.BigQuerySource('not_big_table'))
   results = (
       main_table
       | beam.Map('process data',

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/031c4cce/sdks/python/apache_beam/io/filebasedsource_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/filebasedsource_test.py b/sdks/python/apache_beam/io/filebasedsource_test.py
index c7837ec..1bf51b2 100644
--- a/sdks/python/apache_beam/io/filebasedsource_test.py
+++ b/sdks/python/apache_beam/io/filebasedsource_test.py
@@ -255,7 +255,7 @@ class TestFileBasedSource(unittest.TestCase):
     file_name, expected_data = _write_data(100)
     assert len(expected_data) == 100
     pipeline = beam.Pipeline('DirectPipelineRunner')
-    pcoll = pipeline | beam.Read('Read', LineSource(file_name))
+    pcoll = pipeline | 'Read' >> beam.Read(LineSource(file_name))
     assert_that(pcoll, equal_to(expected_data))
     pipeline.run()

@@ -263,7 +263,7 @@
     pattern, expected_data = _write_pattern([34, 66, 40, 24, 24, 12])
     assert len(expected_data) == 200
     pipeline = beam.Pipeline('DirectPipelineRunner')
-    pcoll = pipeline | beam.Read('Read', LineSource(pattern))
+    pcoll = pipeline | 'Read' >> beam.Read(LineSource(pattern))
     assert_that(pcoll, equal_to(expected_data))
     pipeline.run()

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/031c4cce/sdks/python/apache_beam/io/iobase.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/iobase.py b/sdks/python/apache_beam/io/iobase.py
index de5e9d4..b683eb2 100644
--- a/sdks/python/apache_beam/io/iobase.py
+++ b/sdks/python/apache_beam/io/iobase.py
@@ -990,7 +990,7 @@ class Write(ptransform.PTransform):
     from apache_beam.io import iobase
     if isinstance(self.sink, iobase.NativeSink):
       # A native sink
-      return pcoll | _NativeWrite('native_write', self.sink)
+      return pcoll | 'native_write' >> _NativeWrite(self.sink)
     elif isinstance(self.sink, iobase.Sink):
       # A custom sink
       return pcoll | WriteImpl(self.sink)
@@ -1010,7 +1010,7 @@
     self.sink = sink

   def apply(self, pcoll):
-    do_once = pcoll.pipeline | core.Create('DoOnce', [None])
+    do_once = pcoll.pipeline | 'DoOnce' >> core.Create([None])
     init_result_coll = do_once | core.Map(
         'initialize_write', lambda _, sink: sink.initialize_write(), self.sink)
     if getattr(self.sink, 'num_shards', 0):

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/031c4cce/sdks/python/apache_beam/pipeline_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/pipeline_test.py b/sdks/python/apache_beam/pipeline_test.py
index 86ae45f..39816c0 100644
--- a/sdks/python/apache_beam/pipeline_test.py
+++ b/sdks/python/apache_beam/pipeline_test.py
@@ -69,7 +69,7 @@ class PipelineTest(unittest.TestCase):

   @staticmethod
   def custom_callable(pcoll):
-    return pcoll | FlatMap('+1', lambda x: [x + 1])
+    return pcoll | '+1' >> FlatMap(lambda x: [x + 1])

   # Some of these tests designate a runner by name, others supply a runner.
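The side-table arrangement the BigQuery docstring above describes is just a side input: each invocation over the main table sees every row of the side table. Continuing the docstring's names, with pvalue.AsList assumed to be available alongside the AsSingleton/AsIter wrappers used elsewhere in this patch:

    from apache_beam import pvalue

    results = (main_table
               | 'process data' >> beam.Map(
                   lambda row, side_rows: (row, len(side_rows)),
                   pvalue.AsList(side_table)))
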
# This variation is just to verify that both means of runner specification @@ -78,7 +78,7 @@ class PipelineTest(unittest.TestCase): class CustomTransform(PTransform): def apply(self, pcoll): - return pcoll | FlatMap('+1', lambda x: [x + 1]) + return pcoll | '+1' >> FlatMap(lambda x: [x + 1]) class Visitor(PipelineVisitor): @@ -98,33 +98,33 @@ class PipelineTest(unittest.TestCase): def test_create(self): pipeline = Pipeline(self.runner_name) - pcoll = pipeline | Create('label1', [1, 2, 3]) + pcoll = pipeline | 'label1' >> Create([1, 2, 3]) assert_that(pcoll, equal_to([1, 2, 3])) # Test if initial value is an iterator object. - pcoll2 = pipeline | Create('label2', iter((4, 5, 6))) - pcoll3 = pcoll2 | FlatMap('do', lambda x: [x + 10]) + pcoll2 = pipeline | 'label2' >> Create(iter((4, 5, 6))) + pcoll3 = pcoll2 | 'do' >> FlatMap(lambda x: [x + 10]) assert_that(pcoll3, equal_to([14, 15, 16]), label='pcoll3') pipeline.run() def test_create_singleton_pcollection(self): pipeline = Pipeline(self.runner_name) - pcoll = pipeline | Create('label', [[1, 2, 3]]) + pcoll = pipeline | 'label' >> Create([[1, 2, 3]]) assert_that(pcoll, equal_to([[1, 2, 3]])) pipeline.run() def test_read(self): pipeline = Pipeline(self.runner_name) - pcoll = pipeline | Read('read', FakeSource([1, 2, 3])) + pcoll = pipeline | 'read' >> Read(FakeSource([1, 2, 3])) assert_that(pcoll, equal_to([1, 2, 3])) pipeline.run() def test_visit_entire_graph(self): pipeline = Pipeline(self.runner_name) - pcoll1 = pipeline | Create('pcoll', [1, 2, 3]) - pcoll2 = pcoll1 | FlatMap('do1', lambda x: [x + 1]) - pcoll3 = pcoll2 | FlatMap('do2', lambda x: [x + 1]) - pcoll4 = pcoll2 | FlatMap('do3', lambda x: [x + 1]) + pcoll1 = pipeline | 'pcoll' >> Create([1, 2, 3]) + pcoll2 = pcoll1 | 'do1' >> FlatMap(lambda x: [x + 1]) + pcoll3 = pcoll2 | 'do2' >> FlatMap(lambda x: [x + 1]) + pcoll4 = pcoll2 | 'do3' >> FlatMap(lambda x: [x + 1]) transform = PipelineTest.CustomTransform() pcoll5 = pcoll4 | transform @@ -140,15 +140,15 @@ class PipelineTest(unittest.TestCase): def test_apply_custom_transform(self): pipeline = Pipeline(self.runner_name) - pcoll = pipeline | Create('pcoll', [1, 2, 3]) + pcoll = pipeline | 'pcoll' >> Create([1, 2, 3]) result = pcoll | PipelineTest.CustomTransform() assert_that(result, equal_to([2, 3, 4])) pipeline.run() def test_reuse_custom_transform_instance(self): pipeline = Pipeline(self.runner_name) - pcoll1 = pipeline | Create('pcoll1', [1, 2, 3]) - pcoll2 = pipeline | Create('pcoll2', [4, 5, 6]) + pcoll1 = pipeline | 'pcoll1' >> Create([1, 2, 3]) + pcoll2 = pipeline | 'pcoll2' >> Create([4, 5, 6]) transform = PipelineTest.CustomTransform() pcoll1 | transform with self.assertRaises(RuntimeError) as cm: @@ -183,7 +183,7 @@ class PipelineTest(unittest.TestCase): self.assertEqual( ['a-x', 'b-x', 'c-x'], - sorted(['a', 'b', 'c'] | AddSuffix('-x'))) + sorted(['a', 'b', 'c'] | '-x' >> AddSuffix())) def test_cached_pvalues_are_refcounted(self): """Test that cached PValues are refcounted and deleted. 
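The AddSuffix assertion above is one of the places where mechanical relabeling changes semantics: '-x' is AddSuffix's constructor argument, not a step name, so the rewritten call would fail with a TypeError. The hand-fixed form would presumably keep the argument in place:

    self.assertEqual(
        ['a-x', 'b-x', 'c-x'],
        sorted(['a', 'b', 'c'] | AddSuffix('-x')))
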
@@ -213,17 +213,17 @@ class PipelineTest(unittest.TestCase): gc.collect() count_threshold = len(gc.get_objects()) + 10000 - biglist = pipeline | Create('oom:create', ['x'] * num_elements) + biglist = pipeline | 'oom:create' >> Create(['x'] * num_elements) dupes = ( biglist - | Map('oom:addone', lambda x: (x, 1)) - | FlatMap('oom:dupes', create_dupes, + | 'oom:addone' >> Map(lambda x: (x, 1)) + | 'oom:dupes' >> FlatMap(create_dupes, AsIter(biglist)).with_outputs('side', main='main')) result = ( (dupes.side, dupes.main, dupes.side) - | Flatten('oom:flatten') - | CombinePerKey('oom:combine', sum) - | Map('oom:check', check_memory, count_threshold)) + | 'oom:flatten' >> Flatten() + | 'oom:combine' >> CombinePerKey(sum) + | 'oom:check' >> Map(check_memory, count_threshold)) assert_that(result, equal_to([('x', 3 * num_elements)])) pipeline.run() http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/031c4cce/sdks/python/apache_beam/pvalue_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/pvalue_test.py b/sdks/python/apache_beam/pvalue_test.py index bb742e0..323ca33 100644 --- a/sdks/python/apache_beam/pvalue_test.py +++ b/sdks/python/apache_beam/pvalue_test.py @@ -45,9 +45,9 @@ class PValueTest(unittest.TestCase): def test_pcollectionview_not_recreated(self): pipeline = Pipeline('DirectPipelineRunner') - value = pipeline | Create('create1', [1, 2, 3]) - value2 = pipeline | Create('create2', [(1, 1), (2, 2), (3, 3)]) - value3 = pipeline | Create('create3', [(1, 1), (2, 2), (3, 3)]) + value = pipeline | 'create1' >> Create([1, 2, 3]) + value2 = pipeline | 'create2' >> Create([(1, 1), (2, 2), (3, 3)]) + value3 = pipeline | 'create3' >> Create([(1, 1), (2, 2), (3, 3)]) self.assertEqual(AsSingleton(value), AsSingleton(value)) self.assertEqual(AsSingleton('new', value, default_value=1), AsSingleton('new', value, default_value=1)) http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/031c4cce/sdks/python/apache_beam/runners/inprocess/consumer_tracking_pipeline_visitor_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/runners/inprocess/consumer_tracking_pipeline_visitor_test.py b/sdks/python/apache_beam/runners/inprocess/consumer_tracking_pipeline_visitor_test.py index b3b2968..3cd8d73 100644 --- a/sdks/python/apache_beam/runners/inprocess/consumer_tracking_pipeline_visitor_test.py +++ b/sdks/python/apache_beam/runners/inprocess/consumer_tracking_pipeline_visitor_test.py @@ -103,8 +103,8 @@ class ConsumerTrackingPipelineVisitorTest(unittest.TestCase): pvalue.ListPCollectionView)) def test_co_group_by_key(self): - emails = self.pipeline | Create('email', [('joe', 'joe@example.com')]) - phones = self.pipeline | Create('phone', [('mary', '111-222-3333')]) + emails = self.pipeline | 'email' >> Create([('joe', 'joe@example.com')]) + phones = self.pipeline | 'phone' >> Create([('mary', '111-222-3333')]) {'emails': emails, 'phones': phones} | CoGroupByKey() self.pipeline.visit(self.visitor) http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/031c4cce/sdks/python/apache_beam/runners/runner_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/runners/runner_test.py b/sdks/python/apache_beam/runners/runner_test.py index 2863756..04de7fb 100644 --- a/sdks/python/apache_beam/runners/runner_test.py +++ b/sdks/python/apache_beam/runners/runner_test.py @@ -60,9 +60,9 @@ class RunnerTest(unittest.TestCase): 
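The refcounting test above splits one FlatMap into several PCollections via with_outputs('side', main='main'); the result's attributes (.main, .side) are the individual outputs. A minimal sketch; SideOutputValue is assumed to be this SDK snapshot's wrapper for emitting to a named output:

    import apache_beam as beam
    from apache_beam import pvalue

    def classify(x):
      if x % 2:
        yield pvalue.SideOutputValue('side', x)  # assumed wrapper name
      else:
        yield x

    p = beam.Pipeline('DirectPipelineRunner')
    results = (p
               | 'nums' >> beam.Create([1, 2, 3, 4])
               | 'classify' >> beam.FlatMap(classify).with_outputs(
                   'side', main='main'))
    evens, odds = results.main, results.side
    p.run()
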
'--no_auth=True' ])) - (p | ptransform.Create('create', [1, 2, 3]) # pylint: disable=expression-not-assigned - | ptransform.FlatMap('do', lambda x: [(x, x)]) - | ptransform.GroupByKey('gbk')) + (p | 'create' >> ptransform.Create([1, 2, 3]) # pylint: disable=expression-not-assigned + | 'do' >> ptransform.FlatMap(lambda x: [(x, x)]) + | 'gbk' >> ptransform.GroupByKey()) remote_runner.job = apiclient.Job(p.options) super(DataflowPipelineRunner, remote_runner).run(p)
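For comparison, the create/do/gbk pipeline from the runner test above runs unchanged on the local runner; only the Pipeline construction differs:

    import apache_beam as beam

    p = beam.Pipeline('DirectPipelineRunner')
    (p | 'create' >> beam.Create([1, 2, 3])  # pylint: disable=expression-not-assigned
       | 'do' >> beam.FlatMap(lambda x: [(x, x)])
       | 'gbk' >> beam.GroupByKey())
    p.run()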