beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rober...@apache.org
Subject [03/12] incubator-beam git commit: Move names out of transform constructors.
Date Sat, 23 Jul 2016 23:47:09 GMT
Move names out of transform constructors.

sed -i -r 's/[|] (\S+)[(](["'"'"'][^"'"'"']+.)(, +|([)]))/| \2 >> \1(\4/g'

Small number of tests will need to be fixed by hand.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/031c4cce
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/031c4cce
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/031c4cce

Branch: refs/heads/python-sdk
Commit: 031c4cce0b9eae0d50a49f43ffeced1edbfd2f8f
Parents: 937cf69
Author: Robert Bradshaw <robertwb@google.com>
Authored: Fri Jul 22 14:34:58 2016 -0700
Committer: Robert Bradshaw <robertwb@gmail.com>
Committed: Sat Jul 23 16:43:45 2016 -0700

----------------------------------------------------------------------
 sdks/python/apache_beam/dataflow_test.py        |  96 ++--
 .../examples/complete/autocomplete.py           |   8 +-
 .../examples/complete/autocomplete_test.py      |   4 +-
 .../examples/complete/estimate_pi.py            |   8 +-
 .../examples/complete/estimate_pi_test.py       |   2 +-
 .../complete/juliaset/juliaset/juliaset.py      |   8 +-
 .../apache_beam/examples/complete/tfidf.py      |  32 +-
 .../apache_beam/examples/complete/tfidf_test.py |   2 +-
 .../examples/complete/top_wikipedia_sessions.py |   8 +-
 .../complete/top_wikipedia_sessions_test.py     |   2 +-
 .../examples/cookbook/bigquery_schema.py        |   4 +-
 .../examples/cookbook/bigquery_side_input.py    |   6 +-
 .../cookbook/bigquery_side_input_test.py        |   8 +-
 .../examples/cookbook/bigquery_tornadoes.py     |   6 +-
 .../cookbook/bigquery_tornadoes_test.py         |   2 +-
 .../apache_beam/examples/cookbook/bigshuffle.py |  18 +-
 .../apache_beam/examples/cookbook/coders.py     |   2 +-
 .../examples/cookbook/coders_test.py            |   4 +-
 .../examples/cookbook/custom_ptransform.py      |   6 +-
 .../examples/cookbook/custom_ptransform_test.py |   2 +-
 .../apache_beam/examples/cookbook/filters.py    |  10 +-
 .../examples/cookbook/filters_test.py           |   2 +-
 .../examples/cookbook/group_with_coder.py       |   6 +-
 .../examples/cookbook/mergecontacts.py          |   8 +-
 .../examples/cookbook/multiple_output_pardo.py  |  18 +-
 .../apache_beam/examples/snippets/snippets.py   |  62 +--
 .../examples/snippets/snippets_test.py          |  10 +-
 .../apache_beam/examples/streaming_wordcap.py   |   2 +-
 .../apache_beam/examples/streaming_wordcount.py |   8 +-
 sdks/python/apache_beam/examples/wordcount.py   |  14 +-
 .../apache_beam/examples/wordcount_debugging.py |  16 +-
 .../apache_beam/examples/wordcount_minimal.py   |  14 +-
 sdks/python/apache_beam/io/avroio.py            |   2 +-
 sdks/python/apache_beam/io/bigquery.py          |   4 +-
 .../apache_beam/io/filebasedsource_test.py      |   4 +-
 sdks/python/apache_beam/io/iobase.py            |   4 +-
 sdks/python/apache_beam/pipeline_test.py        |  42 +-
 sdks/python/apache_beam/pvalue_test.py          |   6 +-
 .../consumer_tracking_pipeline_visitor_test.py  |   4 +-
 sdks/python/apache_beam/runners/runner_test.py  |   6 +-
 .../apache_beam/transforms/combiners_test.py    |  48 +-
 sdks/python/apache_beam/transforms/core.py      |  16 +-
 .../python/apache_beam/transforms/ptransform.py |   2 +-
 .../apache_beam/transforms/ptransform_test.py   | 498 +++++++++----------
 sdks/python/apache_beam/transforms/util.py      |   8 +-
 .../apache_beam/transforms/window_test.py       |  14 +-
 .../transforms/write_ptransform_test.py         |   2 +-
 .../typehints/typed_pipeline_test.py            |  16 +-
 48 files changed, 537 insertions(+), 537 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/031c4cce/sdks/python/apache_beam/dataflow_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/dataflow_test.py b/sdks/python/apache_beam/dataflow_test.py
index 476f8b2..bf66851 100644
--- a/sdks/python/apache_beam/dataflow_test.py
+++ b/sdks/python/apache_beam/dataflow_test.py
@@ -54,33 +54,33 @@ class DataflowTest(unittest.TestCase):
   def Count(pcoll):  # pylint: disable=invalid-name, no-self-argument
     """A Count transform: v, ... => (v, n), ..."""
     return (pcoll
-            | Map('AddCount', lambda x: (x, 1))
-            | GroupByKey('GroupCounts')
-            | Map('AddCounts', lambda (x, ones): (x, sum(ones))))
+            | 'AddCount' >> Map(lambda x: (x, 1))
+            | 'GroupCounts' >> GroupByKey()
+            | 'AddCounts' >> Map(lambda (x, ones): (x, sum(ones))))
 
   def test_word_count(self):
     pipeline = Pipeline('DirectPipelineRunner')
-    lines = pipeline | Create('SomeWords', DataflowTest.SAMPLE_DATA)
+    lines = pipeline | 'SomeWords' >> Create(DataflowTest.SAMPLE_DATA)
     result = (
-        (lines | FlatMap('GetWords', lambda x: re.findall(r'\w+', x)))
+        (lines | 'GetWords' >> FlatMap(lambda x: re.findall(r'\w+', x)))
         .apply('CountWords', DataflowTest.Count))
     assert_that(result, equal_to(DataflowTest.SAMPLE_RESULT))
     pipeline.run()
 
   def test_map(self):
     pipeline = Pipeline('DirectPipelineRunner')
-    lines = pipeline | Create('input', ['a', 'b', 'c'])
+    lines = pipeline | 'input' >> Create(['a', 'b', 'c'])
     result = (lines
-              | Map('upper', str.upper)
-              | Map('prefix', lambda x, prefix: prefix + x, 'foo-'))
+              | 'upper' >> Map(str.upper)
+              | 'prefix' >> Map(lambda x, prefix: prefix + x, 'foo-'))
     assert_that(result, equal_to(['foo-A', 'foo-B', 'foo-C']))
     pipeline.run()
 
   def test_par_do_with_side_input_as_arg(self):
     pipeline = Pipeline('DirectPipelineRunner')
     words_list = ['aa', 'bb', 'cc']
-    words = pipeline | Create('SomeWords', words_list)
-    prefix = pipeline | Create('SomeString', ['xyz'])  # side in
+    words = pipeline | 'SomeWords' >> Create(words_list)
+    prefix = pipeline | 'SomeString' >> Create(['xyz'])  # side in
     suffix = 'zyx'
     result = words | FlatMap(
         'DecorateWords',
@@ -92,9 +92,9 @@ class DataflowTest(unittest.TestCase):
   def test_par_do_with_side_input_as_keyword_arg(self):
     pipeline = Pipeline('DirectPipelineRunner')
     words_list = ['aa', 'bb', 'cc']
-    words = pipeline | Create('SomeWords', words_list)
+    words = pipeline | 'SomeWords' >> Create(words_list)
     prefix = 'zyx'
-    suffix = pipeline | Create('SomeString', ['xyz'])  # side in
+    suffix = pipeline | 'SomeString' >> Create(['xyz'])  # side in
     result = words | FlatMap(
         'DecorateWords',
         lambda x, pfx, sfx: ['%s-%s-%s' % (pfx, x, sfx)],
@@ -111,10 +111,10 @@ class DataflowTest(unittest.TestCase):
 
     pipeline = Pipeline('DirectPipelineRunner')
     words_list = ['aa', 'bb', 'cc']
-    words = pipeline | Create('SomeWords', words_list)
+    words = pipeline | 'SomeWords' >> Create(words_list)
     prefix = 'zyx'
-    suffix = pipeline | Create('SomeString', ['xyz'])  # side in
-    result = words | ParDo('DecorateWordsDoFn', SomeDoFn(), prefix,
+    suffix = pipeline | 'SomeString' >> Create(['xyz'])  # side in
+    result = words | 'DecorateWordsDoFn' >> ParDo(SomeDoFn(), prefix,
                            suffix=AsSingleton(suffix))
     assert_that(result, equal_to(['zyx-%s-xyz' % x for x in words_list]))
     pipeline.run()
@@ -131,7 +131,7 @@ class DataflowTest(unittest.TestCase):
           yield SideOutputValue('odd', context.element)
 
     pipeline = Pipeline('DirectPipelineRunner')
-    nums = pipeline | Create('Some Numbers', [1, 2, 3, 4])
+    nums = pipeline | 'Some Numbers' >> Create([1, 2, 3, 4])
     results = nums | ParDo(
         'ClassifyNumbers', SomeDoFn()).with_outputs('odd', 'even', main='main')
     assert_that(results.main, equal_to([1, 2, 3, 4]))
@@ -147,7 +147,7 @@ class DataflowTest(unittest.TestCase):
         return [v, SideOutputValue('odd', v)]
 
     pipeline = Pipeline('DirectPipelineRunner')
-    nums = pipeline | Create('Some Numbers', [1, 2, 3, 4])
+    nums = pipeline | 'Some Numbers' >> Create([1, 2, 3, 4])
     results = nums | FlatMap(
         'ClassifyNumbers', some_fn).with_outputs('odd', 'even', main='main')
     assert_that(results.main, equal_to([1, 2, 3, 4]))
@@ -157,37 +157,37 @@ class DataflowTest(unittest.TestCase):
 
   def test_empty_singleton_side_input(self):
     pipeline = Pipeline('DirectPipelineRunner')
-    pcol = pipeline | Create('start', [1, 2])
-    side = pipeline | Create('side', [])  # Empty side input.
+    pcol = pipeline | 'start' >> Create([1, 2])
+    side = pipeline | 'side' >> Create([])  # Empty side input.
 
     def my_fn(k, s):
       v = ('empty' if isinstance(s, EmptySideInput) else 'full')
       return [(k, v)]
-    result = pcol | FlatMap('compute', my_fn, AsSingleton(side))
+    result = pcol | 'compute' >> FlatMap(my_fn, AsSingleton(side))
     assert_that(result, equal_to([(1, 'empty'), (2, 'empty')]))
     pipeline.run()
 
   def test_multi_valued_singleton_side_input(self):
     pipeline = Pipeline('DirectPipelineRunner')
-    pcol = pipeline | Create('start', [1, 2])
-    side = pipeline | Create('side', [3, 4])  # 2 values in side input.
-    pcol | FlatMap('compute', lambda x, s: [x * s], AsSingleton(side))  # pylint: disable=expression-not-assigned
+    pcol = pipeline | 'start' >> Create([1, 2])
+    side = pipeline | 'side' >> Create([3, 4])  # 2 values in side input.
+    pcol | 'compute' >> FlatMap(lambda x, s: [x * s], AsSingleton(side))  # pylint: disable=expression-not-assigned
     with self.assertRaises(ValueError):
       pipeline.run()
 
   def test_default_value_singleton_side_input(self):
     pipeline = Pipeline('DirectPipelineRunner')
-    pcol = pipeline | Create('start', [1, 2])
-    side = pipeline | Create('side', [])  # 0 values in side input.
+    pcol = pipeline | 'start' >> Create([1, 2])
+    side = pipeline | 'side' >> Create([])  # 0 values in side input.
     result = (
-        pcol | FlatMap('compute', lambda x, s: [x * s], AsSingleton(side, 10)))
+        pcol | 'compute' >> FlatMap(lambda x, s: [x * s], AsSingleton(side, 10)))
     assert_that(result, equal_to([10, 20]))
     pipeline.run()
 
   def test_iterable_side_input(self):
     pipeline = Pipeline('DirectPipelineRunner')
-    pcol = pipeline | Create('start', [1, 2])
-    side = pipeline | Create('side', [3, 4])  # 2 values in side input.
+    pcol = pipeline | 'start' >> Create([1, 2])
+    side = pipeline | 'side' >> Create([3, 4])  # 2 values in side input.
     result = pcol | FlatMap('compute',
                             lambda x, s: [x * y for y in s], AllOf(side))
     assert_that(result, equal_to([3, 4, 6, 8]))
@@ -195,7 +195,7 @@ class DataflowTest(unittest.TestCase):
 
   def test_undeclared_side_outputs(self):
     pipeline = Pipeline('DirectPipelineRunner')
-    nums = pipeline | Create('Some Numbers', [1, 2, 3, 4])
+    nums = pipeline | 'Some Numbers' >> Create([1, 2, 3, 4])
     results = nums | FlatMap(
         'ClassifyNumbers',
         lambda x: [x, SideOutputValue('even' if x % 2 == 0 else 'odd', x)]
@@ -210,7 +210,7 @@ class DataflowTest(unittest.TestCase):
 
   def test_empty_side_outputs(self):
     pipeline = Pipeline('DirectPipelineRunner')
-    nums = pipeline | Create('Some Numbers', [1, 3, 5])
+    nums = pipeline | 'Some Numbers' >> Create([1, 3, 5])
     results = nums | FlatMap(
         'ClassifyNumbers',
         lambda x: [x, SideOutputValue('even' if x % 2 == 0 else 'odd', x)]
@@ -224,9 +224,9 @@ class DataflowTest(unittest.TestCase):
     a_list = [5, 1, 3, 2, 9]
     some_pairs = [('crouton', 17), ('supreme', None)]
     pipeline = Pipeline('DirectPipelineRunner')
-    main_input = pipeline | Create('main input', [1])
-    side_list = pipeline | Create('side list', a_list)
-    side_pairs = pipeline | Create('side pairs', some_pairs)
+    main_input = pipeline | 'main input' >> Create([1])
+    side_list = pipeline | 'side list' >> Create(a_list)
+    side_pairs = pipeline | 'side pairs' >> Create(some_pairs)
     results = main_input | FlatMap(
         'concatenate',
         lambda x, the_list, the_dict: [[x, the_list, the_dict]],
@@ -248,8 +248,8 @@ class DataflowTest(unittest.TestCase):
     # with the same defaults will return the same PCollectionView.
     a_list = [2]
     pipeline = Pipeline('DirectPipelineRunner')
-    main_input = pipeline | Create('main input', [1])
-    side_list = pipeline | Create('side list', a_list)
+    main_input = pipeline | 'main input' >> Create([1])
+    side_list = pipeline | 'side list' >> Create(a_list)
     results = main_input | FlatMap(
         'test',
         lambda x, s1, s2: [[x, s1, s2]],
@@ -271,8 +271,8 @@ class DataflowTest(unittest.TestCase):
     # distinct PCollectionViews with the same full_label.
     a_list = [2]
     pipeline = Pipeline('DirectPipelineRunner')
-    main_input = pipeline | Create('main input', [1])
-    side_list = pipeline | Create('side list', a_list)
+    main_input = pipeline | 'main input' >> Create([1])
+    side_list = pipeline | 'side list' >> Create(a_list)
 
     with self.assertRaises(RuntimeError) as e:
       _ = main_input | FlatMap(
@@ -287,8 +287,8 @@ class DataflowTest(unittest.TestCase):
   def test_as_singleton_with_different_defaults_with_unique_labels(self):
     a_list = []
     pipeline = Pipeline('DirectPipelineRunner')
-    main_input = pipeline | Create('main input', [1])
-    side_list = pipeline | Create('side list', a_list)
+    main_input = pipeline | 'main input' >> Create([1])
+    side_list = pipeline | 'side list' >> Create(a_list)
     results = main_input | FlatMap(
         'test',
         lambda x, s1, s2: [[x, s1, s2]],
@@ -311,8 +311,8 @@ class DataflowTest(unittest.TestCase):
     # return the same PCollectionView.
     a_list = [1, 2, 3]
     pipeline = Pipeline('DirectPipelineRunner')
-    main_input = pipeline | Create('main input', [1])
-    side_list = pipeline | Create('side list', a_list)
+    main_input = pipeline | 'main input' >> Create([1])
+    side_list = pipeline | 'side list' >> Create(a_list)
     results = main_input | FlatMap(
         'test',
         lambda x, ls1, ls2: [[x, ls1, ls2]],
@@ -332,8 +332,8 @@ class DataflowTest(unittest.TestCase):
   def test_as_list_with_unique_labels(self):
     a_list = [1, 2, 3]
     pipeline = Pipeline('DirectPipelineRunner')
-    main_input = pipeline | Create('main input', [1])
-    side_list = pipeline | Create('side list', a_list)
+    main_input = pipeline | 'main input' >> Create([1])
+    side_list = pipeline | 'side list' >> Create(a_list)
     results = main_input | FlatMap(
         'test',
         lambda x, ls1, ls2: [[x, ls1, ls2]],
@@ -353,8 +353,8 @@ class DataflowTest(unittest.TestCase):
   def test_as_dict_with_unique_labels(self):
     some_kvs = [('a', 1), ('b', 2)]
     pipeline = Pipeline('DirectPipelineRunner')
-    main_input = pipeline | Create('main input', [1])
-    side_kvs = pipeline | Create('side kvs', some_kvs)
+    main_input = pipeline | 'main input' >> Create([1])
+    side_kvs = pipeline | 'side kvs' >> Create(some_kvs)
     results = main_input | FlatMap(
         'test',
         lambda x, dct1, dct2: [[x, dct1, dct2]],
@@ -383,10 +383,10 @@ class DataflowTest(unittest.TestCase):
         return existing_windows
 
     pipeline = Pipeline('DirectPipelineRunner')
-    numbers = pipeline | Create('KVs', [(1, 10), (2, 20), (3, 30)])
+    numbers = pipeline | 'KVs' >> Create([(1, 10), (2, 20), (3, 30)])
     result = (numbers
-              | WindowInto('W', windowfn=TestWindowFn())
-              | GroupByKey('G'))
+              | 'W' >> WindowInto(windowfn=TestWindowFn())
+              | 'G' >> GroupByKey())
     assert_that(
         result, equal_to([(1, [10]), (1, [10]), (2, [20]),
                           (2, [20]), (3, [30]), (3, [30])]))

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/031c4cce/sdks/python/apache_beam/examples/complete/autocomplete.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/complete/autocomplete.py b/sdks/python/apache_beam/examples/complete/autocomplete.py
index 0f1e96e..b68bc56 100644
--- a/sdks/python/apache_beam/examples/complete/autocomplete.py
+++ b/sdks/python/apache_beam/examples/complete/autocomplete.py
@@ -45,12 +45,12 @@ def run(argv=None):
   p = beam.Pipeline(options=pipeline_options)
 
   (p  # pylint: disable=expression-not-assigned
-   | beam.io.Read('read', beam.io.TextFileSource(known_args.input))
-   | beam.FlatMap('split', lambda x: re.findall(r'[A-Za-z\']+', x))
-   | TopPerPrefix('TopPerPrefix', 5)
+   | 'read' >> beam.io.Read(beam.io.TextFileSource(known_args.input))
+   | 'split' >> beam.FlatMap(lambda x: re.findall(r'[A-Za-z\']+', x))
+   | 'TopPerPrefix' >> TopPerPrefix(5)
    | beam.Map('format',
               lambda (prefix, candidates): '%s: %s' % (prefix, candidates))
-   | beam.io.Write('write', beam.io.TextFileSink(known_args.output)))
+   | 'write' >> beam.io.Write(beam.io.TextFileSink(known_args.output)))
   p.run()
 
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/031c4cce/sdks/python/apache_beam/examples/complete/autocomplete_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/complete/autocomplete_test.py b/sdks/python/apache_beam/examples/complete/autocomplete_test.py
index 84f947b..18d0511 100644
--- a/sdks/python/apache_beam/examples/complete/autocomplete_test.py
+++ b/sdks/python/apache_beam/examples/complete/autocomplete_test.py
@@ -31,8 +31,8 @@ class AutocompleteTest(unittest.TestCase):
 
   def test_top_prefixes(self):
     p = beam.Pipeline('DirectPipelineRunner')
-    words = p | beam.Create('create', self.WORDS)
-    result = words | autocomplete.TopPerPrefix('test', 5)
+    words = p | 'create' >> beam.Create(self.WORDS)
+    result = words | 'test' >> autocomplete.TopPerPrefix(5)
     # values must be hashable for now
     result = result | beam.Map(lambda (k, vs): (k, tuple(vs)))
     assert_that(result, equal_to(

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/031c4cce/sdks/python/apache_beam/examples/complete/estimate_pi.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/complete/estimate_pi.py b/sdks/python/apache_beam/examples/complete/estimate_pi.py
index 09faecf..ef9f8cc 100644
--- a/sdks/python/apache_beam/examples/complete/estimate_pi.py
+++ b/sdks/python/apache_beam/examples/complete/estimate_pi.py
@@ -96,9 +96,9 @@ class EstimatePiTransform(beam.PTransform):
   def apply(self, pcoll):
     # A hundred work items of a hundred thousand tries each.
     return (pcoll
-            | beam.Create('Initialize', [100000] * 100).with_output_types(int)
-            | beam.Map('Run trials', run_trials)
-            | beam.CombineGlobally('Sum', combine_results).without_defaults())
+            | 'Initialize' >> beam.Create([100000] * 100).with_output_types(int)
+            | 'Run trials' >> beam.Map(run_trials)
+            | 'Sum' >> beam.CombineGlobally(combine_results).without_defaults())
 
 
 def run(argv=None):
@@ -115,7 +115,7 @@ def run(argv=None):
   p = beam.Pipeline(options=pipeline_options)
 
   (p  # pylint: disable=expression-not-assigned
-   | EstimatePiTransform('Estimate')
+   | 'Estimate' >> EstimatePiTransform()
    | beam.io.Write('Write',
                    beam.io.TextFileSink(known_args.output,
                                         coder=JsonCoder())))

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/031c4cce/sdks/python/apache_beam/examples/complete/estimate_pi_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/complete/estimate_pi_test.py b/sdks/python/apache_beam/examples/complete/estimate_pi_test.py
index c633bb1..3967ed5 100644
--- a/sdks/python/apache_beam/examples/complete/estimate_pi_test.py
+++ b/sdks/python/apache_beam/examples/complete/estimate_pi_test.py
@@ -39,7 +39,7 @@ class EstimatePiTest(unittest.TestCase):
 
   def test_basics(self):
     p = beam.Pipeline('DirectPipelineRunner')
-    result = p | estimate_pi.EstimatePiTransform('Estimate')
+    result = p | 'Estimate' >> estimate_pi.EstimatePiTransform()
 
     # Note: Probabilistically speaking this test can fail with a probability
     # that is very small (VERY) given that we run at least 10 million trials.

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/031c4cce/sdks/python/apache_beam/examples/complete/juliaset/juliaset/juliaset.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/complete/juliaset/juliaset/juliaset.py b/sdks/python/apache_beam/examples/complete/juliaset/juliaset/juliaset.py
index 2bc37e6..56696c3 100644
--- a/sdks/python/apache_beam/examples/complete/juliaset/juliaset/juliaset.py
+++ b/sdks/python/apache_beam/examples/complete/juliaset/juliaset/juliaset.py
@@ -50,7 +50,7 @@ def generate_julia_set_colors(pipeline, c, n, max_iterations):
         yield (x, y)
 
   julia_set_colors = (pipeline
-                      | beam.Create('add points', point_set(n))
+                      | 'add points' >> beam.Create(point_set(n))
                       | beam.Map(
                           get_julia_set_point_color, c, n, max_iterations))
 
@@ -105,11 +105,11 @@ def run(argv=None):  # pylint: disable=missing-docstring
   # Group each coordinate triplet by its x value, then write the coordinates to
   # the output file with an x-coordinate grouping per line.
   # pylint: disable=expression-not-assigned
-  (coordinates | beam.Map('x coord key', lambda (x, y, i): (x, (x, y, i)))
-   | beam.GroupByKey('x coord') | beam.Map(
+  (coordinates | 'x coord key' >> beam.Map(lambda (x, y, i): (x, (x, y, i)))
+   | 'x coord' >> beam.GroupByKey() | beam.Map(
        'format',
        lambda (k, coords): ' '.join('(%s, %s, %s)' % coord for coord in coords))
-   | beam.io.Write('write', beam.io.TextFileSink(known_args.coordinate_output)))
+   | 'write' >> beam.io.Write(beam.io.TextFileSink(known_args.coordinate_output)))
   # pylint: enable=expression-not-assigned
   p.run()
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/031c4cce/sdks/python/apache_beam/examples/complete/tfidf.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/complete/tfidf.py b/sdks/python/apache_beam/examples/complete/tfidf.py
index ef58cc0..043d5f6 100644
--- a/sdks/python/apache_beam/examples/complete/tfidf.py
+++ b/sdks/python/apache_beam/examples/complete/tfidf.py
@@ -42,7 +42,7 @@ def read_documents(pipeline, uris):
         pipeline
         | beam.io.Read('read: %s' % uri, beam.io.TextFileSource(uri))
         | beam.Map('withkey: %s' % uri, lambda v, uri: (uri, v), uri))
-  return pcolls | beam.Flatten('flatten read pcolls')
+  return pcolls | 'flatten read pcolls' >> beam.Flatten()
 
 
 class TfIdf(beam.PTransform):
@@ -59,9 +59,9 @@ class TfIdf(beam.PTransform):
     # PCollection to use as side input.
     total_documents = (
         uri_to_content
-        | beam.Keys('get uris')
-        | beam.RemoveDuplicates('get unique uris')
-        | beam.combiners.Count.Globally(' count uris'))
+        | 'get uris' >> beam.Keys()
+        | 'get unique uris' >> beam.RemoveDuplicates()
+        | ' count uris' >> beam.combiners.Count.Globally())
 
     # Create a collection of pairs mapping a URI to each of the words
     # in the document associated with that that URI.
@@ -71,28 +71,28 @@ class TfIdf(beam.PTransform):
 
     uri_to_words = (
         uri_to_content
-        | beam.FlatMap('split words', split_into_words))
+        | 'split words' >> beam.FlatMap(split_into_words))
 
     # Compute a mapping from each word to the total number of documents
     # in which it appears.
     word_to_doc_count = (
         uri_to_words
-        | beam.RemoveDuplicates('get unique words per doc')
-        | beam.Values('get words')
-        | beam.combiners.Count.PerElement('count docs per word'))
+        | 'get unique words per doc' >> beam.RemoveDuplicates()
+        | 'get words' >> beam.Values()
+        | 'count docs per word' >> beam.combiners.Count.PerElement())
 
     # Compute a mapping from each URI to the total number of words in the
     # document associated with that URI.
     uri_to_word_total = (
         uri_to_words
-        | beam.Keys(' get uris')
-        | beam.combiners.Count.PerElement('count words in doc'))
+        | ' get uris' >> beam.Keys()
+        | 'count words in doc' >> beam.combiners.Count.PerElement())
 
     # Count, for each (URI, word) pair, the number of occurrences of that word
     # in the document associated with the URI.
     uri_and_word_to_count = (
         uri_to_words
-        | beam.combiners.Count.PerElement('count word-doc pairs'))
+        | 'count word-doc pairs' >> beam.combiners.Count.PerElement())
 
     # Adjust the above collection to a mapping from (URI, word) pairs to counts
     # into an isomorphic mapping from URI to (word, count) pairs, to prepare
@@ -116,7 +116,7 @@ class TfIdf(beam.PTransform):
     #                         ... ]}
     uri_to_word_and_count_and_total = (
         {'word totals': uri_to_word_total, 'word counts': uri_to_word_and_count}
-        | beam.CoGroupByKey('cogroup by uri'))
+        | 'cogroup by uri' >> beam.CoGroupByKey())
 
     # Compute a mapping from each word to a (URI, term frequency) pair for each
     # URI. A word's term frequency for a document is simply the number of times
@@ -132,7 +132,7 @@ class TfIdf(beam.PTransform):
 
     word_to_uri_and_tf = (
         uri_to_word_and_count_and_total
-        | beam.FlatMap('compute term frequencies', compute_term_frequency))
+        | 'compute term frequencies' >> beam.FlatMap(compute_term_frequency))
 
     # Compute a mapping from each word to its document frequency.
     # A word's document frequency in a corpus is the number of
@@ -155,7 +155,7 @@ class TfIdf(beam.PTransform):
     # each keyed on the word.
     word_to_uri_and_tf_and_df = (
         {'tf': word_to_uri_and_tf, 'df': word_to_df}
-        | beam.CoGroupByKey('cogroup words by tf-df'))
+        | 'cogroup words by tf-df' >> beam.CoGroupByKey())
 
     # Compute a mapping from each word to a (URI, TF-IDF) score for each URI.
     # There are a variety of definitions of TF-IDF
@@ -170,7 +170,7 @@ class TfIdf(beam.PTransform):
 
     word_to_uri_and_tfidf = (
         word_to_uri_and_tf_and_df
-        | beam.FlatMap('compute tf-idf', compute_tf_idf))
+        | 'compute tf-idf' >> beam.FlatMap(compute_tf_idf))
 
     return word_to_uri_and_tfidf
 
@@ -197,7 +197,7 @@ def run(argv=None):
   output = pcoll | TfIdf()
   # Write the output using a "Write" transform that has side effects.
   # pylint: disable=expression-not-assigned
-  output | beam.io.Write('write', beam.io.TextFileSink(known_args.output))
+  output | 'write' >> beam.io.Write(beam.io.TextFileSink(known_args.output))
   p.run()
 
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/031c4cce/sdks/python/apache_beam/examples/complete/tfidf_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/complete/tfidf_test.py b/sdks/python/apache_beam/examples/complete/tfidf_test.py
index 8f52611..ee7e534 100644
--- a/sdks/python/apache_beam/examples/complete/tfidf_test.py
+++ b/sdks/python/apache_beam/examples/complete/tfidf_test.py
@@ -56,7 +56,7 @@ class TfIdfTest(unittest.TestCase):
     result = (
         uri_to_line
         | tfidf.TfIdf()
-        | beam.Map('flatten', lambda (word, (uri, tfidf)): (word, uri, tfidf)))
+        | 'flatten' >> beam.Map(lambda (word, (uri, tfidf)): (word, uri, tfidf)))
     beam.assert_that(result, beam.equal_to(EXPECTED_RESULTS))
     # Run the pipeline. Note that the assert_that above adds to the pipeline
     # a check that the result PCollection contains expected values. To actually

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/031c4cce/sdks/python/apache_beam/examples/complete/top_wikipedia_sessions.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/complete/top_wikipedia_sessions.py b/sdks/python/apache_beam/examples/complete/top_wikipedia_sessions.py
index c46bfc5..7468484 100644
--- a/sdks/python/apache_beam/examples/complete/top_wikipedia_sessions.py
+++ b/sdks/python/apache_beam/examples/complete/top_wikipedia_sessions.py
@@ -134,9 +134,9 @@ class ComputeTopSessions(beam.PTransform):
             | beam.Filter(lambda x: (abs(hash(x)) <=
                                      MAX_TIMESTAMP * self.sampling_threshold))
             | ComputeSessions()
-            | beam.ParDo('SessionsToStrings', SessionsToStringsDoFn())
+            | 'SessionsToStrings' >> beam.ParDo(SessionsToStringsDoFn())
             | TopPerMonth()
-            | beam.ParDo('FormatOutput', FormatOutputDoFn()))
+            | 'FormatOutput' >> beam.ParDo(FormatOutputDoFn()))
 
 
 def run(argv=None):
@@ -168,9 +168,9 @@ def run(argv=None):
   p = beam.Pipeline(options=pipeline_options)
 
   (p  # pylint: disable=expression-not-assigned
-   | beam.Read('read', beam.io.TextFileSource(known_args.input))
+   | 'read' >> beam.Read(beam.io.TextFileSource(known_args.input))
    | ComputeTopSessions(known_args.sampling_threshold)
-   | beam.io.Write('write', beam.io.TextFileSink(known_args.output)))
+   | 'write' >> beam.io.Write(beam.io.TextFileSink(known_args.output)))
 
   p.run()
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/031c4cce/sdks/python/apache_beam/examples/complete/top_wikipedia_sessions_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/complete/top_wikipedia_sessions_test.py b/sdks/python/apache_beam/examples/complete/top_wikipedia_sessions_test.py
index fb48641..207d6c4 100644
--- a/sdks/python/apache_beam/examples/complete/top_wikipedia_sessions_test.py
+++ b/sdks/python/apache_beam/examples/complete/top_wikipedia_sessions_test.py
@@ -50,7 +50,7 @@ class ComputeTopSessionsTest(unittest.TestCase):
 
   def test_compute_top_sessions(self):
     p = beam.Pipeline('DirectPipelineRunner')
-    edits = p | beam.Create('create', self.EDITS)
+    edits = p | 'create' >> beam.Create(self.EDITS)
     result = edits | top_wikipedia_sessions.ComputeTopSessions(1.0)
 
     beam.assert_that(result, beam.equal_to(self.EXPECTED))

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/031c4cce/sdks/python/apache_beam/examples/cookbook/bigquery_schema.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/cookbook/bigquery_schema.py b/sdks/python/apache_beam/examples/cookbook/bigquery_schema.py
index 7c420fb..650a886 100644
--- a/sdks/python/apache_beam/examples/cookbook/bigquery_schema.py
+++ b/sdks/python/apache_beam/examples/cookbook/bigquery_schema.py
@@ -111,8 +111,8 @@ def run(argv=None):
            }
 
   # pylint: disable=expression-not-assigned
-  record_ids = p | beam.Create('CreateIDs', ['1', '2', '3', '4', '5'])
-  records = record_ids | beam.Map('CreateRecords', create_random_record)
+  record_ids = p | 'CreateIDs' >> beam.Create(['1', '2', '3', '4', '5'])
+  records = record_ids | 'CreateRecords' >> beam.Map(create_random_record)
   records | beam.io.Write(
       'write',
       beam.io.BigQuerySink(

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/031c4cce/sdks/python/apache_beam/examples/cookbook/bigquery_side_input.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/cookbook/bigquery_side_input.py b/sdks/python/apache_beam/examples/cookbook/bigquery_side_input.py
index e1d9cf1..1db4a1e 100644
--- a/sdks/python/apache_beam/examples/cookbook/bigquery_side_input.py
+++ b/sdks/python/apache_beam/examples/cookbook/bigquery_side_input.py
@@ -105,9 +105,9 @@ def run(argv=None):
                                beam.io.BigQuerySource(query=query_corpus))
   pcoll_word = p | beam.Read('read words',
                              beam.io.BigQuerySource(query=query_word))
-  pcoll_ignore_corpus = p | beam.Create('create_ignore_corpus', [ignore_corpus])
-  pcoll_ignore_word = p | beam.Create('create_ignore_word', [ignore_word])
-  pcoll_group_ids = p | beam.Create('create groups', group_ids)
+  pcoll_ignore_corpus = p | 'create_ignore_corpus' >> beam.Create([ignore_corpus])
+  pcoll_ignore_word = p | 'create_ignore_word' >> beam.Create([ignore_word])
+  pcoll_group_ids = p | 'create groups' >> beam.Create(group_ids)
 
   pcoll_groups = create_groups(pcoll_group_ids, pcoll_corpus, pcoll_word,
                                pcoll_ignore_corpus, pcoll_ignore_word)

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/031c4cce/sdks/python/apache_beam/examples/cookbook/bigquery_side_input_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/cookbook/bigquery_side_input_test.py b/sdks/python/apache_beam/examples/cookbook/bigquery_side_input_test.py
index bc75c41..215aafa 100644
--- a/sdks/python/apache_beam/examples/cookbook/bigquery_side_input_test.py
+++ b/sdks/python/apache_beam/examples/cookbook/bigquery_side_input_test.py
@@ -29,16 +29,16 @@ class BigQuerySideInputTest(unittest.TestCase):
   def test_create_groups(self):
     p = beam.Pipeline('DirectPipelineRunner')
 
-    group_ids_pcoll = p | beam.Create('create_group_ids', ['A', 'B', 'C'])
+    group_ids_pcoll = p | 'create_group_ids' >> beam.Create(['A', 'B', 'C'])
     corpus_pcoll = p | beam.Create('create_corpus',
                                    [{'f': 'corpus1'},
                                     {'f': 'corpus2'},
                                     {'f': 'corpus3'}])
-    words_pcoll = p | beam.Create('create_words', [{'f': 'word1'},
+    words_pcoll = p | 'create_words' >> beam.Create([{'f': 'word1'},
                                                    {'f': 'word2'},
                                                    {'f': 'word3'}])
-    ignore_corpus_pcoll = p | beam.Create('create_ignore_corpus', ['corpus1'])
-    ignore_word_pcoll = p | beam.Create('create_ignore_word', ['word1'])
+    ignore_corpus_pcoll = p | 'create_ignore_corpus' >> beam.Create(['corpus1'])
+    ignore_word_pcoll = p | 'create_ignore_word' >> beam.Create(['word1'])
 
     groups = bigquery_side_input.create_groups(group_ids_pcoll, corpus_pcoll,
                                                words_pcoll, ignore_corpus_pcoll,

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/031c4cce/sdks/python/apache_beam/examples/cookbook/bigquery_tornadoes.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/cookbook/bigquery_tornadoes.py b/sdks/python/apache_beam/examples/cookbook/bigquery_tornadoes.py
index e732309..cdaee36 100644
--- a/sdks/python/apache_beam/examples/cookbook/bigquery_tornadoes.py
+++ b/sdks/python/apache_beam/examples/cookbook/bigquery_tornadoes.py
@@ -56,8 +56,8 @@ def count_tornadoes(input_data):
           | beam.FlatMap(
               'months with tornadoes',
               lambda row: [(int(row['month']), 1)] if row['tornado'] else [])
-          | beam.CombinePerKey('monthly count', sum)
-          | beam.Map('format', lambda (k, v): {'month': k, 'tornado_count': v}))
+          | 'monthly count' >> beam.CombinePerKey(sum)
+          | 'format' >> beam.Map(lambda (k, v): {'month': k, 'tornado_count': v}))
 
 
 def run(argv=None):
@@ -77,7 +77,7 @@ def run(argv=None):
   p = beam.Pipeline(argv=pipeline_args)
 
   # Read the table rows into a PCollection.
-  rows = p | beam.io.Read('read', beam.io.BigQuerySource(known_args.input))
+  rows = p | 'read' >> beam.io.Read(beam.io.BigQuerySource(known_args.input))
   counts = count_tornadoes(rows)
 
   # Write the output using a "Write" transform that has side effects.

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/031c4cce/sdks/python/apache_beam/examples/cookbook/bigquery_tornadoes_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/cookbook/bigquery_tornadoes_test.py b/sdks/python/apache_beam/examples/cookbook/bigquery_tornadoes_test.py
index 2547849..87e1f44 100644
--- a/sdks/python/apache_beam/examples/cookbook/bigquery_tornadoes_test.py
+++ b/sdks/python/apache_beam/examples/cookbook/bigquery_tornadoes_test.py
@@ -28,7 +28,7 @@ class BigQueryTornadoesTest(unittest.TestCase):
 
   def test_basics(self):
     p = beam.Pipeline('DirectPipelineRunner')
-    rows = (p | beam.Create('create', [
+    rows = (p | 'create' >> beam.Create([
         {'month': 1, 'day': 1, 'tornado': False},
         {'month': 1, 'day': 2, 'tornado': True},
         {'month': 1, 'day': 3, 'tornado': True},

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/031c4cce/sdks/python/apache_beam/examples/cookbook/bigshuffle.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/cookbook/bigshuffle.py b/sdks/python/apache_beam/examples/cookbook/bigshuffle.py
index cde00b3..c29a038 100644
--- a/sdks/python/apache_beam/examples/cookbook/bigshuffle.py
+++ b/sdks/python/apache_beam/examples/cookbook/bigshuffle.py
@@ -59,30 +59,30 @@ def run(argv=None):
 
   # Count the occurrences of each word.
   output = (lines
-            | beam.Map('split', lambda x: (x[:10], x[10:99])
+            | 'split' >> beam.Map(lambda x: (x[:10], x[10:99])
                       ).with_output_types(beam.typehints.KV[str, str])
-            | beam.GroupByKey('group')
+            | 'group' >> beam.GroupByKey()
             | beam.FlatMap(
                 'format',
                 lambda (key, vals): ['%s%s' % (key, val) for val in vals]))
 
   # Write the output using a "Write" transform that has side effects.
-  output | beam.io.Write('write', beam.io.TextFileSink(known_args.output))
+  output | 'write' >> beam.io.Write(beam.io.TextFileSink(known_args.output))
 
   # Optionally write the input and output checksums.
   if known_args.checksum_output:
     input_csum = (lines
-                  | beam.Map('input-csum', crc32line)
-                  | beam.CombineGlobally('combine-input-csum', sum)
-                  | beam.Map('hex-format', lambda x: '%x' % x))
+                  | 'input-csum' >> beam.Map(crc32line)
+                  | 'combine-input-csum' >> beam.CombineGlobally(sum)
+                  | 'hex-format' >> beam.Map(lambda x: '%x' % x))
     input_csum | beam.io.Write(
         'write-input-csum',
         beam.io.TextFileSink(known_args.checksum_output + '-input'))
 
     output_csum = (output
-                   | beam.Map('output-csum', crc32line)
-                   | beam.CombineGlobally('combine-output-csum', sum)
-                   | beam.Map('hex-format-output', lambda x: '%x' % x))
+                   | 'output-csum' >> beam.Map(crc32line)
+                   | 'combine-output-csum' >> beam.CombineGlobally(sum)
+                   | 'hex-format-output' >> beam.Map(lambda x: '%x' % x))
     output_csum | beam.io.Write(
         'write-output-csum',
         beam.io.TextFileSink(known_args.checksum_output + '-output'))

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/031c4cce/sdks/python/apache_beam/examples/cookbook/coders.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/cookbook/coders.py b/sdks/python/apache_beam/examples/cookbook/coders.py
index 1ce1fa5..bbe02b3 100644
--- a/sdks/python/apache_beam/examples/cookbook/coders.py
+++ b/sdks/python/apache_beam/examples/cookbook/coders.py
@@ -89,7 +89,7 @@ def run(argv=None):
   (p  # pylint: disable=expression-not-assigned
    | beam.io.Read('read',
                   beam.io.TextFileSource(known_args.input, coder=JsonCoder()))
-   | beam.FlatMap('points', compute_points)
+   | 'points' >> beam.FlatMap(compute_points)
    | beam.CombinePerKey(sum)
    | beam.io.Write('write',
                    beam.io.TextFileSink(known_args.output, coder=JsonCoder())))

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/031c4cce/sdks/python/apache_beam/examples/cookbook/coders_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/cookbook/coders_test.py b/sdks/python/apache_beam/examples/cookbook/coders_test.py
index 5840081..75b78c8 100644
--- a/sdks/python/apache_beam/examples/cookbook/coders_test.py
+++ b/sdks/python/apache_beam/examples/cookbook/coders_test.py
@@ -35,9 +35,9 @@ class CodersTest(unittest.TestCase):
 
   def test_compute_points(self):
     p = beam.Pipeline('DirectPipelineRunner')
-    records = p | beam.Create('create', self.SAMPLE_RECORDS)
+    records = p | 'create' >> beam.Create(self.SAMPLE_RECORDS)
     result = (records
-              | beam.FlatMap('points', coders.compute_points)
+              | 'points' >> beam.FlatMap(coders.compute_points)
               | beam.CombinePerKey(sum))
     assert_that(result, equal_to([('Italy', 0), ('Brasil', 6), ('Germany', 3)]))
     p.run()

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/031c4cce/sdks/python/apache_beam/examples/cookbook/custom_ptransform.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/cookbook/custom_ptransform.py b/sdks/python/apache_beam/examples/cookbook/custom_ptransform.py
index d3d8b08..021eff6 100644
--- a/sdks/python/apache_beam/examples/cookbook/custom_ptransform.py
+++ b/sdks/python/apache_beam/examples/cookbook/custom_ptransform.py
@@ -39,7 +39,7 @@ class Count1(beam.PTransform):
   def apply(self, pcoll):
     return (
         pcoll
-        | beam.Map('Init', lambda v: (v, 1))
+        | 'Init' >> beam.Map(lambda v: (v, 1))
         | beam.CombinePerKey(sum))
 
 
@@ -57,7 +57,7 @@ def Count2(pcoll):  # pylint: disable=invalid-name
   """Count as a decorated function."""
   return (
       pcoll
-      | beam.Map('Init', lambda v: (v, 1))
+      | 'Init' >> beam.Map(lambda v: (v, 1))
       | beam.CombinePerKey(sum))
 
 
@@ -84,7 +84,7 @@ def Count3(pcoll, factor=1):  # pylint: disable=invalid-name
   """
   return (
       pcoll
-      | beam.Map('Init', lambda v: (v, factor))
+      | 'Init' >> beam.Map(lambda v: (v, factor))
       | beam.CombinePerKey(sum))
 
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/031c4cce/sdks/python/apache_beam/examples/cookbook/custom_ptransform_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/cookbook/custom_ptransform_test.py b/sdks/python/apache_beam/examples/cookbook/custom_ptransform_test.py
index 3c0c6f3..603742f 100644
--- a/sdks/python/apache_beam/examples/cookbook/custom_ptransform_test.py
+++ b/sdks/python/apache_beam/examples/cookbook/custom_ptransform_test.py
@@ -40,7 +40,7 @@ class CustomCountTest(unittest.TestCase):
 
   def run_pipeline(self, count_implementation, factor=1):
     p = beam.Pipeline('DirectPipelineRunner')
-    words = p | beam.Create('create', ['CAT', 'DOG', 'CAT', 'CAT', 'DOG'])
+    words = p | 'create' >> beam.Create(['CAT', 'DOG', 'CAT', 'CAT', 'DOG'])
     result = words | count_implementation
     assert_that(
         result, equal_to([('CAT', (3 * factor)), ('DOG', (2 * factor))]))

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/031c4cce/sdks/python/apache_beam/examples/cookbook/filters.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/cookbook/filters.py b/sdks/python/apache_beam/examples/cookbook/filters.py
index c309941..b19b566 100644
--- a/sdks/python/apache_beam/examples/cookbook/filters.py
+++ b/sdks/python/apache_beam/examples/cookbook/filters.py
@@ -59,14 +59,14 @@ def filter_cold_days(input_data, month_filter):
   # Compute the global mean temperature.
   global_mean = AsSingleton(
       fields_of_interest
-      | beam.Map('extract mean', lambda row: row['mean_temp'])
-      | beam.combiners.Mean.Globally('global mean'))
+      | 'extract mean' >> beam.Map(lambda row: row['mean_temp'])
+      | 'global mean' >> beam.combiners.Mean.Globally())
 
   # Filter to the rows representing days in the month of interest
   # in which the mean daily temperature is below the global mean.
   return (
       fields_of_interest
-      | beam.Filter('desired month', lambda row: row['month'] == month_filter)
+      | 'desired month' >> beam.Filter(lambda row: row['month'] == month_filter)
       | beam.Filter('below mean',
                     lambda row, mean: row['mean_temp'] < mean, global_mean))
 
@@ -88,11 +88,11 @@ def run(argv=None):
 
   p = beam.Pipeline(argv=pipeline_args)
 
-  input_data = p | beam.Read('input', beam.io.BigQuerySource(known_args.input))
+  input_data = p | 'input' >> beam.Read(beam.io.BigQuerySource(known_args.input))
 
   # pylint: disable=expression-not-assigned
   (filter_cold_days(input_data, known_args.month_filter)
-   | beam.io.Write('save to BQ', beam.io.BigQuerySink(
+   | 'save to BQ' >> beam.io.Write(beam.io.BigQuerySink(
        known_args.output,
        schema='year:INTEGER,month:INTEGER,day:INTEGER,mean_temp:FLOAT',
        create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/031c4cce/sdks/python/apache_beam/examples/cookbook/filters_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/cookbook/filters_test.py b/sdks/python/apache_beam/examples/cookbook/filters_test.py
index cf1ca7e..9e5592f 100644
--- a/sdks/python/apache_beam/examples/cookbook/filters_test.py
+++ b/sdks/python/apache_beam/examples/cookbook/filters_test.py
@@ -36,7 +36,7 @@ class FiltersTest(unittest.TestCase):
 
   def _get_result_for_month(self, month):
     p = beam.Pipeline('DirectPipelineRunner')
-    rows = (p | beam.Create('create', self.input_data))
+    rows = (p | 'create' >> beam.Create(self.input_data))
 
     results = filters.filter_cold_days(rows, month)
     return results

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/031c4cce/sdks/python/apache_beam/examples/cookbook/group_with_coder.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/cookbook/group_with_coder.py b/sdks/python/apache_beam/examples/cookbook/group_with_coder.py
index 140314e..6c86f61 100644
--- a/sdks/python/apache_beam/examples/cookbook/group_with_coder.py
+++ b/sdks/python/apache_beam/examples/cookbook/group_with_coder.py
@@ -98,19 +98,19 @@ def run(argv=sys.argv[1:]):
   coders.registry.register_coder(Player, PlayerCoder)
 
   (p  # pylint: disable=expression-not-assigned
-   | beam.io.Read('read', beam.io.TextFileSource(known_args.input))
+   | 'read' >> beam.io.Read(beam.io.TextFileSource(known_args.input))
    # The get_players function is annotated with a type hint above, so the type
    # system knows the output type of the following operation is a key-value pair
    # of a Player and an int. Please see the documentation for details on
    # types that are inferred automatically as well as other ways to specify
    # type hints.
-   | beam.Map('get players', get_players)
+   | 'get players' >> beam.Map(get_players)
    # The output type hint of the previous step is used to infer that the key
    # type of the following operation is the Player type. Since a custom coder
    # is registered for the Player class above, a PlayerCoder will be used to
    # encode Player objects as keys for this combine operation.
    | beam.CombinePerKey(sum) | beam.Map(lambda (k, v): '%s,%d' % (k.name, v))
-   | beam.io.Write('write', beam.io.TextFileSink(known_args.output)))
+   | 'write' >> beam.io.Write(beam.io.TextFileSink(known_args.output)))
   p.run()
 
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/031c4cce/sdks/python/apache_beam/examples/cookbook/mergecontacts.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/cookbook/mergecontacts.py b/sdks/python/apache_beam/examples/cookbook/mergecontacts.py
index 9e6b001..bf6d1b1 100644
--- a/sdks/python/apache_beam/examples/cookbook/mergecontacts.py
+++ b/sdks/python/apache_beam/examples/cookbook/mergecontacts.py
@@ -88,7 +88,7 @@ def run(argv=None, assert_results=None):
       known_args.input_snailmail))
 
   # Group together all entries under the same name.
-  grouped = (email, phone, snailmail) | beam.CoGroupByKey('group_by_name')
+  grouped = (email, phone, snailmail) | 'group_by_name' >> beam.CoGroupByKey()
 
   # Prepare tab-delimited output; something like this:
   # "name"<TAB>"email_1,email_2"<TAB>"phone"<TAB>"first_snailmail_only"
@@ -107,9 +107,9 @@ def run(argv=None, assert_results=None):
   nomads = grouped | beam.Filter(    # People without addresses.
       lambda (name, (email, phone, snailmail)): not next(iter(snailmail), None))
 
-  num_luddites = luddites | beam.combiners.Count.Globally('luddites')
-  num_writers = writers | beam.combiners.Count.Globally('writers')
-  num_nomads = nomads | beam.combiners.Count.Globally('nomads')
+  num_luddites = luddites | 'luddites' >> beam.combiners.Count.Globally()
+  num_writers = writers | 'writers' >> beam.combiners.Count.Globally()
+  num_nomads = nomads | 'nomads' >> beam.combiners.Count.Globally()
 
   # Write tab-delimited output.
   # pylint: disable=expression-not-assigned

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/031c4cce/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo.py b/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo.py
index 5bde591..187d20b 100644
--- a/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo.py
+++ b/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo.py
@@ -114,10 +114,10 @@ class CountWords(beam.PTransform):
 
   def apply(self, pcoll):
     return (pcoll
-            | beam.Map('pair_with_one', lambda x: (x, 1))
-            | beam.GroupByKey('group')
-            | beam.Map('count', lambda (word, ones): (word, sum(ones)))
-            | beam.Map('format', lambda (word, c): '%s: %s' % (word, c)))
+            | 'pair_with_one' >> beam.Map(lambda x: (x, 1))
+            | 'group' >> beam.GroupByKey()
+            | 'count' >> beam.Map(lambda (word, ones): (word, sum(ones)))
+            | 'format' >> beam.Map(lambda (word, c): '%s: %s' % (word, c)))
 
 
 def run(argv=None):
@@ -137,7 +137,7 @@ def run(argv=None):
   pipeline_options.view_as(SetupOptions).save_main_session = True
   p = beam.Pipeline(options=pipeline_options)
 
-  lines = p | beam.Read('read', beam.io.TextFileSource(known_args.input))
+  lines = p | 'read' >> beam.Read(beam.io.TextFileSource(known_args.input))
 
   # with_outputs allows accessing the side outputs of a DoFn.
   split_lines_result = (lines
@@ -155,21 +155,21 @@ def run(argv=None):
 
   # pylint: disable=expression-not-assigned
   (character_count
-   | beam.Map('pair_with_key', lambda x: ('chars_temp_key', x))
+   | 'pair_with_key' >> beam.Map(lambda x: ('chars_temp_key', x))
    | beam.GroupByKey()
-   | beam.Map('count chars', lambda (_, counts): sum(counts))
+   | 'count chars' >> beam.Map(lambda (_, counts): sum(counts))
    | beam.Write('write chars',
                 beam.io.TextFileSink(known_args.output + '-chars')))
 
   # pylint: disable=expression-not-assigned
   (short_words
-   | CountWords('count short words')
+   | 'count short words' >> CountWords()
    | beam.Write('write short words',
                 beam.io.TextFileSink(known_args.output + '-short-words')))
 
   # pylint: disable=expression-not-assigned
   (words
-   | CountWords('count words')
+   | 'count words' >> CountWords()
    | beam.Write('write words',
                 beam.io.TextFileSink(known_args.output + '-words')))
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/031c4cce/sdks/python/apache_beam/examples/snippets/snippets.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/snippets/snippets.py b/sdks/python/apache_beam/examples/snippets/snippets.py
index 3658619..c605db8 100644
--- a/sdks/python/apache_beam/examples/snippets/snippets.py
+++ b/sdks/python/apache_beam/examples/snippets/snippets.py
@@ -104,7 +104,7 @@ def construct_pipeline(renames):
   # [END pipelines_constructing_applying]
 
   # [START pipelines_constructing_writing]
-  filtered_words = reversed_words | beam.Filter('FilterWords', filter_words)
+  filtered_words = reversed_words | 'FilterWords' >> beam.Filter(filter_words)
   filtered_words | beam.io.Write('WriteMyFile',
                                  beam.io.TextFileSink(
                                      'gs://some/outputData.txt'))
@@ -242,8 +242,8 @@ def pipeline_options_remote(argv):
   options.view_as(StandardOptions).runner = 'DirectPipelineRunner'
   p = Pipeline(options=options)
 
-  lines = p | beam.io.Read('ReadFromText', beam.io.TextFileSource(my_input))
-  lines | beam.io.Write('WriteToText', beam.io.TextFileSink(my_output))
+  lines = p | 'ReadFromText' >> beam.io.Read(beam.io.TextFileSource(my_input))
+  lines | 'WriteToText' >> beam.io.Write(beam.io.TextFileSink(my_output))
 
   p.run()
 
@@ -283,8 +283,8 @@ def pipeline_options_local(argv):
   p = Pipeline(options=options)
   # [END pipeline_options_local]
 
-  lines = p | beam.io.Read('ReadFromText', beam.io.TextFileSource(my_input))
-  lines | beam.io.Write('WriteToText', beam.io.TextFileSink(my_output))
+  lines = p | 'ReadFromText' >> beam.io.Read(beam.io.TextFileSource(my_input))
+  lines | 'WriteToText' >> beam.io.Write(beam.io.TextFileSink(my_output))
   p.run()
 
 
@@ -307,7 +307,7 @@ def pipeline_options_command_line(argv):
   p = beam.Pipeline(argv=pipeline_args)
   lines = p | beam.io.Read('ReadFromText',
                            beam.io.TextFileSource(known_args.input))
-  lines | beam.io.Write('WriteToText', beam.io.TextFileSink(known_args.output))
+  lines | 'WriteToText' >> beam.io.Write(beam.io.TextFileSink(known_args.output))
   # [END pipeline_options_command_line]
 
   p.run()
@@ -344,8 +344,8 @@ def pipeline_logging(lines, output):
   p = beam.Pipeline(options=PipelineOptions())
   (p
    | beam.Create(lines)
-   | beam.ParDo('ExtractWords', ExtractWordsFn())
-   | beam.io.Write('WriteToText', beam.io.TextFileSink(output)))
+   | 'ExtractWords' >> beam.ParDo(ExtractWordsFn())
+   | 'WriteToText' >> beam.io.Write(beam.io.TextFileSink(output)))
 
   p.run()
 
@@ -391,11 +391,11 @@ def pipeline_monitoring(renames):
     def apply(self, pcoll):
       return (pcoll
               # Convert lines of text into individual words.
-              | beam.ParDo('ExtractWords', ExtractWordsFn())
+              | 'ExtractWords' >> beam.ParDo(ExtractWordsFn())
               # Count the number of times each word occurs.
               | beam.combiners.Count.PerElement()
               # Format each word and count into a printable string.
-              | beam.ParDo('FormatCounts', FormatCountsFn()))
+              | 'FormatCounts' >> beam.ParDo(FormatCountsFn()))
   # [END pipeline_monitoring_composite]
 
   pipeline_options = PipelineOptions()
@@ -405,11 +405,11 @@ def pipeline_monitoring(renames):
   # [START pipeline_monitoring_execution]
   (p
    # Read the lines of the input text.
-   | beam.io.Read('ReadLines', beam.io.TextFileSource(options.input))
+   | 'ReadLines' >> beam.io.Read(beam.io.TextFileSource(options.input))
    # Count the words.
    | CountWords()
    # Write the formatted word counts to output.
-   | beam.io.Write('WriteCounts', beam.io.TextFileSink(options.output)))
+   | 'WriteCounts' >> beam.io.Write(beam.io.TextFileSink(options.output)))
   # [END pipeline_monitoring_execution]
 
   p.visit(SnippetUtils.RenameFiles(renames))
@@ -454,7 +454,7 @@ def examples_wordcount_minimal(renames):
       # [END examples_wordcount_minimal_read]
 
       # [START examples_wordcount_minimal_pardo]
-      | beam.FlatMap('ExtractWords', lambda x: re.findall(r'[A-Za-z\']+', x))
+      | 'ExtractWords' >> beam.FlatMap(lambda x: re.findall(r'[A-Za-z\']+', x))
       # [END examples_wordcount_minimal_pardo]
 
       # [START examples_wordcount_minimal_count]
@@ -466,7 +466,7 @@ def examples_wordcount_minimal(renames):
       # [END examples_wordcount_minimal_map]
 
       # [START examples_wordcount_minimal_write]
-      | beam.io.Write(beam.io.TextFileSink('gs://my-bucket/counts.txt'))
+      | 'gs://my-bucket/counts.txt' >> beam.io.Write(beam.io.TextFileSink())
       # [END examples_wordcount_minimal_write]
   )
 
@@ -531,7 +531,7 @@ def examples_wordcount_wordcount(renames):
   formatted = counts | beam.ParDo(FormatAsTextFn())
   # [END examples_wordcount_wordcount_dofn]
 
-  formatted | beam.io.Write(beam.io.TextFileSink('gs://my-bucket/counts.txt'))
+  formatted | 'gs://my-bucket/counts.txt' >> beam.io.Write(beam.io.TextFileSink())
   p.visit(SnippetUtils.RenameFiles(renames))
   p.run()
 
@@ -591,9 +591,9 @@ def examples_wordcount_debugging(renames):
       p
       | beam.io.Read(beam.io.TextFileSource(
           'gs://dataflow-samples/shakespeare/kinglear.txt'))
-      | beam.FlatMap('ExtractWords', lambda x: re.findall(r'[A-Za-z\']+', x))
+      | 'ExtractWords' >> beam.FlatMap(lambda x: re.findall(r'[A-Za-z\']+', x))
       | beam.combiners.Count.PerElement()
-      | beam.ParDo('FilterText', FilterTextFn('Flourish|stomach')))
+      | 'FilterText' >> beam.ParDo(FilterTextFn('Flourish|stomach')))
 
   # [START example_wordcount_debugging_assert]
   beam.assert_that(
@@ -601,7 +601,7 @@ def examples_wordcount_debugging(renames):
   # [END example_wordcount_debugging_assert]
 
   output = (filtered_words
-            | beam.Map('format', lambda (word, c): '%s: %s' % (word, c))
+            | 'format' >> beam.Map(lambda (word, c): '%s: %s' % (word, c))
             | beam.io.Write(
                 'write', beam.io.TextFileSink('gs://my-bucket/counts.txt')))
 
@@ -682,7 +682,7 @@ def model_custom_source(count):
   # Using the source in an example pipeline.
   # [START model_custom_source_use_new_source]
   p = beam.Pipeline(options=PipelineOptions())
-  numbers = p | beam.io.Read('ProduceNumbers', CountingSource(count))
+  numbers = p | 'ProduceNumbers' >> beam.io.Read(CountingSource(count))
   # [END model_custom_source_use_new_source]
 
   lines = numbers | beam.core.Map(lambda number: 'line %d' % number)
@@ -712,7 +712,7 @@ def model_custom_source(count):
 
   # [START model_custom_source_use_ptransform]
   p = beam.Pipeline(options=PipelineOptions())
-  numbers = p | ReadFromCountingSource('ProduceNumbers', count)
+  numbers = p | 'ProduceNumbers' >> ReadFromCountingSource(count)
   # [END model_custom_source_use_ptransform]
 
   lines = numbers | beam.core.Map(lambda number: 'line %d' % number)
@@ -848,7 +848,7 @@ def model_custom_sink(simplekv, KVs, final_table_name_no_ptransform,
 
   # [START model_custom_sink_use_ptransform]
   p = beam.Pipeline(options=PipelineOptions())
-  kvs = p | beam.core.Create('CreateKVs', KVs)
+  kvs = p | 'CreateKVs' >> beam.core.Create(KVs)
   kvs | WriteToKVSink('WriteToSimpleKV',
                       'http://url_to_simple_kv/', final_table_name)
   # [END model_custom_sink_use_ptransform]
@@ -880,7 +880,7 @@ def model_textio(renames):
   # [END model_textio_read]
 
   # [START model_textio_write]
-  filtered_words = lines | beam.FlatMap('FilterWords', filter_words)
+  filtered_words = lines | 'FilterWords' >> beam.FlatMap(filter_words)
   # [START model_pipelineio_write]
   filtered_words | beam.io.Write(
       'WriteToText', beam.io.TextFileSink('gs://my_bucket/path/to/numbers',
@@ -1053,7 +1053,7 @@ def model_group_by_key(contents, output_path):
       p
       | beam.Create(contents)
       | beam.FlatMap(lambda x: re.findall(r'\w+', x))
-      | beam.Map('one word', lambda w: (w, 1)))
+      | 'one word' >> beam.Map(lambda w: (w, 1)))
   # GroupByKey accepts a PCollection of (w, 1) and
   # outputs a PCollection of (w, (1, 1, ...)).
   # (A key/value pair is just a tuple in Python.)
@@ -1063,7 +1063,7 @@ def model_group_by_key(contents, output_path):
   grouped_words = words_and_counts | beam.GroupByKey()
   # [END model_group_by_key_transform]
   (grouped_words
-   | beam.Map('count words', lambda (word, counts): (word, len(counts)))
+   | 'count words' >> beam.Map(lambda (word, counts): (word, len(counts)))
    | beam.io.Write(beam.io.TextFileSink(output_path)))
   p.run()
 
@@ -1083,8 +1083,8 @@ def model_co_group_by_key_tuple(email_list, phone_list, output_path):
   # multiple possible values for each key.
   # The phone_list contains values such as: ('mary': '111-222-3333') with
   # multiple possible values for each key.
-  emails = p | beam.Create('email', email_list)
-  phones = p | beam.Create('phone', phone_list)
+  emails = p | 'email' >> beam.Create(email_list)
+  phones = p | 'phone' >> beam.Create(phone_list)
   # The result PCollection contains one key-value element for each key in the
   # input PCollections. The key of the pair will be the key from the input and
   # the value will be a dictionary with two entries: 'emails' - an iterable of
@@ -1119,9 +1119,9 @@ def model_join_using_side_inputs(
   # This code performs a join by receiving the set of names as an input and
   # passing PCollections that contain emails and phone numbers as side inputs
   # instead of using CoGroupByKey.
-  names = p | beam.Create('names', name_list)
-  emails = p | beam.Create('email', email_list)
-  phones = p | beam.Create('phone', phone_list)
+  names = p | 'names' >> beam.Create(name_list)
+  emails = p | 'email' >> beam.Create(email_list)
+  phones = p | 'phone' >> beam.Create(phone_list)
 
   def join_info(name, emails, phone_numbers):
     filtered_emails = []
@@ -1149,7 +1149,7 @@ def model_join_using_side_inputs(
 class Keys(beam.PTransform):
 
   def apply(self, pcoll):
-    return pcoll | beam.Map('Keys', lambda (k, v): k)
+    return pcoll | 'Keys' >> beam.Map(lambda (k, v): k)
 # [END model_library_transforms_keys]
 # pylint: enable=invalid-name
 
@@ -1160,6 +1160,6 @@ class Count(beam.PTransform):
   def apply(self, pcoll):
     return (
         pcoll
-        | beam.Map('Init', lambda v: (v, 1))
+        | 'Init' >> beam.Map(lambda v: (v, 1))
         | beam.CombinePerKey(sum))
 # [END model_library_transforms_count]

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/031c4cce/sdks/python/apache_beam/examples/snippets/snippets_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/snippets/snippets_test.py b/sdks/python/apache_beam/examples/snippets/snippets_test.py
index 7888263..9eba46a 100644
--- a/sdks/python/apache_beam/examples/snippets/snippets_test.py
+++ b/sdks/python/apache_beam/examples/snippets/snippets_test.py
@@ -103,14 +103,14 @@ class ParDoTest(unittest.TestCase):
   def test_pardo_with_label(self):
     words = ['aa', 'bbc', 'defg']
     # [START model_pardo_with_label]
-    result = words | beam.Map('CountUniqueLetters', lambda word: len(set(word)))
+    result = words | 'CountUniqueLetters' >> beam.Map(lambda word: len(set(word)))
     # [END model_pardo_with_label]
 
     self.assertEqual({1, 2, 4}, set(result))
 
   def test_pardo_side_input(self):
     p = beam.Pipeline('DirectPipelineRunner')
-    words = p | beam.Create('start', ['a', 'bb', 'ccc', 'dddd'])
+    words = p | 'start' >> beam.Create(['a', 'bb', 'ccc', 'dddd'])
 
     # [START model_pardo_side_input]
     # Callable takes additional arguments.
@@ -124,11 +124,11 @@ class ParDoTest(unittest.TestCase):
                     | beam.CombineGlobally(beam.combiners.MeanCombineFn()))
 
     # Call with explicit side inputs.
-    small_words = words | beam.FlatMap('small', filter_using_length, 0, 3)
+    small_words = words | 'small' >> beam.FlatMap(filter_using_length, 0, 3)
 
     # A single deferred side input.
     larger_than_average = (words
-                           | beam.FlatMap('large', filter_using_length,
+                           | 'large' >> beam.FlatMap(filter_using_length,
                                           lower_bound=pvalue.AsSingleton(
                                               avg_word_len)))
 
@@ -268,7 +268,7 @@ class TypeHintsTest(unittest.TestCase):
       evens = numbers | beam.ParDo(FilterEvensDoFn())
       # [END type_hints_do_fn]
 
-    words = p | beam.Create('words', ['a', 'bb', 'c'])
+    words = p | 'words' >> beam.Create(['a', 'bb', 'c'])
     # One can assert outputs and apply them to transforms as well.
     # Helps document the contract and checks it at pipeline construction time.
     # [START type_hints_transform]

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/031c4cce/sdks/python/apache_beam/examples/streaming_wordcap.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/streaming_wordcap.py b/sdks/python/apache_beam/examples/streaming_wordcap.py
index 7148e58..ef95a5f 100644
--- a/sdks/python/apache_beam/examples/streaming_wordcap.py
+++ b/sdks/python/apache_beam/examples/streaming_wordcap.py
@@ -49,7 +49,7 @@ def run(argv=None):
 
   # Capitalize the characters in each line.
   transformed = (lines
-                 | (beam.Map('capitalize', lambda x: x.upper())))
+                 | 'capitalize' >> (beam.Map(lambda x: x.upper())))
 
   # Write to PubSub.
   # pylint: disable=expression-not-assigned

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/031c4cce/sdks/python/apache_beam/examples/streaming_wordcount.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/streaming_wordcount.py b/sdks/python/apache_beam/examples/streaming_wordcount.py
index eda74dd..35c1abb 100644
--- a/sdks/python/apache_beam/examples/streaming_wordcount.py
+++ b/sdks/python/apache_beam/examples/streaming_wordcount.py
@@ -55,11 +55,11 @@ def run(argv=None):
                  | (beam.FlatMap('split',
                                  lambda x: re.findall(r'[A-Za-z\']+', x))
                     .with_output_types(unicode))
-                 | beam.Map('pair_with_one', lambda x: (x, 1))
+                 | 'pair_with_one' >> beam.Map(lambda x: (x, 1))
                  | beam.WindowInto(window.FixedWindows(15, 0))
-                 | beam.GroupByKey('group')
-                 | beam.Map('count', lambda (word, ones): (word, sum(ones)))
-                 | beam.Map('format', lambda tup: '%s: %d' % tup))
+                 | 'group' >> beam.GroupByKey()
+                 | 'count' >> beam.Map(lambda (word, ones): (word, sum(ones)))
+                 | 'format' >> beam.Map(lambda tup: '%s: %d' % tup))
 
   # Write to PubSub.
   # pylint: disable=expression-not-assigned

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/031c4cce/sdks/python/apache_beam/examples/wordcount.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/wordcount.py b/sdks/python/apache_beam/examples/wordcount.py
index bbfd43e..4744352 100644
--- a/sdks/python/apache_beam/examples/wordcount.py
+++ b/sdks/python/apache_beam/examples/wordcount.py
@@ -77,22 +77,22 @@ def run(argv=None):
   p = beam.Pipeline(options=pipeline_options)
 
   # Read the text file[pattern] into a PCollection.
-  lines = p | beam.io.Read('read', beam.io.TextFileSource(known_args.input))
+  lines = p | 'read' >> beam.io.Read(beam.io.TextFileSource(known_args.input))
 
   # Count the occurrences of each word.
   counts = (lines
-            | (beam.ParDo('split', WordExtractingDoFn())
+            | 'split' >> (beam.ParDo(WordExtractingDoFn())
                .with_output_types(unicode))
-            | beam.Map('pair_with_one', lambda x: (x, 1))
-            | beam.GroupByKey('group')
-            | beam.Map('count', lambda (word, ones): (word, sum(ones))))
+            | 'pair_with_one' >> beam.Map(lambda x: (x, 1))
+            | 'group' >> beam.GroupByKey()
+            | 'count' >> beam.Map(lambda (word, ones): (word, sum(ones))))
 
   # Format the counts into a PCollection of strings.
-  output = counts | beam.Map('format', lambda (word, c): '%s: %s' % (word, c))
+  output = counts | 'format' >> beam.Map(lambda (word, c): '%s: %s' % (word, c))
 
   # Write the output using a "Write" transform that has side effects.
   # pylint: disable=expression-not-assigned
-  output | beam.io.Write('write', beam.io.TextFileSink(known_args.output))
+  output | 'write' >> beam.io.Write(beam.io.TextFileSink(known_args.output))
 
   # Actually run the pipeline (all operations above are deferred).
   result = p.run()

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/031c4cce/sdks/python/apache_beam/examples/wordcount_debugging.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/wordcount_debugging.py b/sdks/python/apache_beam/examples/wordcount_debugging.py
index 74effed..e008b48 100644
--- a/sdks/python/apache_beam/examples/wordcount_debugging.py
+++ b/sdks/python/apache_beam/examples/wordcount_debugging.py
@@ -97,11 +97,11 @@ class CountWords(beam.PTransform):
 
   def apply(self, pcoll):
     return (pcoll
-            | (beam.FlatMap('split', lambda x: re.findall(r'[A-Za-z\']+', x))
+            | 'split' >> (beam.FlatMap(lambda x: re.findall(r'[A-Za-z\']+', x))
                .with_output_types(unicode))
-            | beam.Map('pair_with_one', lambda x: (x, 1))
-            | beam.GroupByKey('group')
-            | beam.Map('count', lambda (word, ones): (word, sum(ones))))
+            | 'pair_with_one' >> beam.Map(lambda x: (x, 1))
+            | 'group' >> beam.GroupByKey()
+            | 'count' >> beam.Map(lambda (word, ones): (word, sum(ones))))
 
 
 def run(argv=None):
@@ -126,9 +126,9 @@ def run(argv=None):
   # Read the text file[pattern] into a PCollection, count the occurrences of
   # each word and filter by a list of words.
   filtered_words = (
-      p | beam.io.Read('read', beam.io.TextFileSource(known_args.input))
+      p | 'read' >> beam.io.Read(beam.io.TextFileSource(known_args.input))
       | CountWords()
-      | beam.ParDo('FilterText', FilterTextFn('Flourish|stomach')))
+      | 'FilterText' >> beam.ParDo(FilterTextFn('Flourish|stomach')))
 
   # assert_that is a convenient PTransform that checks a PCollection has an
   # expected value. Asserts are best used in unit tests with small data sets but
@@ -145,8 +145,8 @@ def run(argv=None):
   # "Write" transform that has side effects.
   # pylint: disable=unused-variable
   output = (filtered_words
-            | beam.Map('format', lambda (word, c): '%s: %s' % (word, c))
-            | beam.io.Write('write', beam.io.TextFileSink(known_args.output)))
+            | 'format' >> beam.Map(lambda (word, c): '%s: %s' % (word, c))
+            | 'write' >> beam.io.Write(beam.io.TextFileSink(known_args.output)))
 
   # Actually run the pipeline (all operations above are deferred).
   p.run()

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/031c4cce/sdks/python/apache_beam/examples/wordcount_minimal.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/wordcount_minimal.py b/sdks/python/apache_beam/examples/wordcount_minimal.py
index c3c41d7..ce5b644 100644
--- a/sdks/python/apache_beam/examples/wordcount_minimal.py
+++ b/sdks/python/apache_beam/examples/wordcount_minimal.py
@@ -93,22 +93,22 @@ def run(argv=None):
   p = beam.Pipeline(options=pipeline_options)
 
   # Read the text file[pattern] into a PCollection.
-  lines = p | beam.io.Read('read', beam.io.TextFileSource(known_args.input))
+  lines = p | 'read' >> beam.io.Read(beam.io.TextFileSource(known_args.input))
 
   # Count the occurrences of each word.
   counts = (lines
-            | (beam.FlatMap('split', lambda x: re.findall(r'[A-Za-z\']+', x))
+            | 'split' >> (beam.FlatMap(lambda x: re.findall(r'[A-Za-z\']+', x))
                .with_output_types(unicode))
-            | beam.Map('pair_with_one', lambda x: (x, 1))
-            | beam.GroupByKey('group')
-            | beam.Map('count', lambda (word, ones): (word, sum(ones))))
+            | 'pair_with_one' >> beam.Map(lambda x: (x, 1))
+            | 'group' >> beam.GroupByKey()
+            | 'count' >> beam.Map(lambda (word, ones): (word, sum(ones))))
 
   # Format the counts into a PCollection of strings.
-  output = counts | beam.Map('format', lambda (word, c): '%s: %s' % (word, c))
+  output = counts | 'format' >> beam.Map(lambda (word, c): '%s: %s' % (word, c))
 
   # Write the output using a "Write" transform that has side effects.
   # pylint: disable=expression-not-assigned
-  output | beam.io.Write('write', beam.io.TextFileSink(known_args.output))
+  output | 'write' >> beam.io.Write(beam.io.TextFileSink(known_args.output))
 
   # Actually run the pipeline (all operations above are deferred).
   p.run()

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/031c4cce/sdks/python/apache_beam/io/avroio.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/avroio.py b/sdks/python/apache_beam/io/avroio.py
index 4d1b245..7ad3842 100644
--- a/sdks/python/apache_beam/io/avroio.py
+++ b/sdks/python/apache_beam/io/avroio.py
@@ -43,7 +43,7 @@ class ReadFromAvro(PTransform):
     files, a ``PCollection`` for the records in these Avro files can be created
     in the following manner.
       p = df.Pipeline(argv=pipeline_args)
-      records = p | df.io.ReadFromAvro('Read', '/mypath/myavrofiles*')
+      records = p | 'Read' >> df.io.ReadFromAvro('/mypath/myavrofiles*')
 
     Each record of this ``PCollection`` will contain a single record read from a
     source. Records that are of simple types will be mapped into corresponding

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/031c4cce/sdks/python/apache_beam/io/bigquery.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/bigquery.py b/sdks/python/apache_beam/io/bigquery.py
index f2c56dc..f789312 100644
--- a/sdks/python/apache_beam/io/bigquery.py
+++ b/sdks/python/apache_beam/io/bigquery.py
@@ -45,8 +45,8 @@ Map transform will get on each call *one* row of the main table and *all* rows
 of the side table. The execution framework may use some caching techniques to
 share the side inputs between calls in order to avoid excessive reading::
 
-  main_table = pipeline | beam.io.Read(beam.io.BigQuerySource('very_big_table')
-  side_table = pipeline | beam.io.Read(beam.io.BigQuerySource('not_big_table')
+  main_table = pipeline | 'very_big_table' >> beam.io.Read(beam.io.BigQuerySource()
+  side_table = pipeline | 'not_big_table' >> beam.io.Read(beam.io.BigQuerySource()
   results = (
       main_table
       | beam.Map('process data',

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/031c4cce/sdks/python/apache_beam/io/filebasedsource_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/filebasedsource_test.py b/sdks/python/apache_beam/io/filebasedsource_test.py
index c7837ec..1bf51b2 100644
--- a/sdks/python/apache_beam/io/filebasedsource_test.py
+++ b/sdks/python/apache_beam/io/filebasedsource_test.py
@@ -255,7 +255,7 @@ class TestFileBasedSource(unittest.TestCase):
     file_name, expected_data = _write_data(100)
     assert len(expected_data) == 100
     pipeline = beam.Pipeline('DirectPipelineRunner')
-    pcoll = pipeline | beam.Read('Read', LineSource(file_name))
+    pcoll = pipeline | 'Read' >> beam.Read(LineSource(file_name))
     assert_that(pcoll, equal_to(expected_data))
     pipeline.run()
 
@@ -263,7 +263,7 @@ class TestFileBasedSource(unittest.TestCase):
     pattern, expected_data = _write_pattern([34, 66, 40, 24, 24, 12])
     assert len(expected_data) == 200
     pipeline = beam.Pipeline('DirectPipelineRunner')
-    pcoll = pipeline | beam.Read('Read', LineSource(pattern))
+    pcoll = pipeline | 'Read' >> beam.Read(LineSource(pattern))
     assert_that(pcoll, equal_to(expected_data))
     pipeline.run()
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/031c4cce/sdks/python/apache_beam/io/iobase.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/iobase.py b/sdks/python/apache_beam/io/iobase.py
index de5e9d4..b683eb2 100644
--- a/sdks/python/apache_beam/io/iobase.py
+++ b/sdks/python/apache_beam/io/iobase.py
@@ -990,7 +990,7 @@ class Write(ptransform.PTransform):
     from apache_beam.io import iobase
     if isinstance(self.sink, iobase.NativeSink):
       # A native sink
-      return pcoll | _NativeWrite('native_write', self.sink)
+      return pcoll | 'native_write' >> _NativeWrite(self.sink)
     elif isinstance(self.sink, iobase.Sink):
       # A custom sink
       return pcoll | WriteImpl(self.sink)
@@ -1010,7 +1010,7 @@ class WriteImpl(ptransform.PTransform):
     self.sink = sink
 
   def apply(self, pcoll):
-    do_once = pcoll.pipeline | core.Create('DoOnce', [None])
+    do_once = pcoll.pipeline | 'DoOnce' >> core.Create([None])
     init_result_coll = do_once | core.Map(
         'initialize_write', lambda _, sink: sink.initialize_write(), self.sink)
     if getattr(self.sink, 'num_shards', 0):

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/031c4cce/sdks/python/apache_beam/pipeline_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/pipeline_test.py b/sdks/python/apache_beam/pipeline_test.py
index 86ae45f..39816c0 100644
--- a/sdks/python/apache_beam/pipeline_test.py
+++ b/sdks/python/apache_beam/pipeline_test.py
@@ -69,7 +69,7 @@ class PipelineTest(unittest.TestCase):
 
   @staticmethod
   def custom_callable(pcoll):
-    return pcoll | FlatMap('+1', lambda x: [x + 1])
+    return pcoll | '+1' >> FlatMap(lambda x: [x + 1])
 
   # Some of these tests designate a runner by name, others supply a runner.
   # This variation is just to verify that both means of runner specification
@@ -78,7 +78,7 @@ class PipelineTest(unittest.TestCase):
   class CustomTransform(PTransform):
 
     def apply(self, pcoll):
-      return pcoll | FlatMap('+1', lambda x: [x + 1])
+      return pcoll | '+1' >> FlatMap(lambda x: [x + 1])
 
   class Visitor(PipelineVisitor):
 
@@ -98,33 +98,33 @@ class PipelineTest(unittest.TestCase):
 
   def test_create(self):
     pipeline = Pipeline(self.runner_name)
-    pcoll = pipeline | Create('label1', [1, 2, 3])
+    pcoll = pipeline | 'label1' >> Create([1, 2, 3])
     assert_that(pcoll, equal_to([1, 2, 3]))
 
     # Test if initial value is an iterator object.
-    pcoll2 = pipeline | Create('label2', iter((4, 5, 6)))
-    pcoll3 = pcoll2 | FlatMap('do', lambda x: [x + 10])
+    pcoll2 = pipeline | 'label2' >> Create(iter((4, 5, 6)))
+    pcoll3 = pcoll2 | 'do' >> FlatMap(lambda x: [x + 10])
     assert_that(pcoll3, equal_to([14, 15, 16]), label='pcoll3')
     pipeline.run()
 
   def test_create_singleton_pcollection(self):
     pipeline = Pipeline(self.runner_name)
-    pcoll = pipeline | Create('label', [[1, 2, 3]])
+    pcoll = pipeline | 'label' >> Create([[1, 2, 3]])
     assert_that(pcoll, equal_to([[1, 2, 3]]))
     pipeline.run()
 
   def test_read(self):
     pipeline = Pipeline(self.runner_name)
-    pcoll = pipeline | Read('read', FakeSource([1, 2, 3]))
+    pcoll = pipeline | 'read' >> Read(FakeSource([1, 2, 3]))
     assert_that(pcoll, equal_to([1, 2, 3]))
     pipeline.run()
 
   def test_visit_entire_graph(self):
     pipeline = Pipeline(self.runner_name)
-    pcoll1 = pipeline | Create('pcoll', [1, 2, 3])
-    pcoll2 = pcoll1 | FlatMap('do1', lambda x: [x + 1])
-    pcoll3 = pcoll2 | FlatMap('do2', lambda x: [x + 1])
-    pcoll4 = pcoll2 | FlatMap('do3', lambda x: [x + 1])
+    pcoll1 = pipeline | 'pcoll' >> Create([1, 2, 3])
+    pcoll2 = pcoll1 | 'do1' >> FlatMap(lambda x: [x + 1])
+    pcoll3 = pcoll2 | 'do2' >> FlatMap(lambda x: [x + 1])
+    pcoll4 = pcoll2 | 'do3' >> FlatMap(lambda x: [x + 1])
     transform = PipelineTest.CustomTransform()
     pcoll5 = pcoll4 | transform
 
@@ -140,15 +140,15 @@ class PipelineTest(unittest.TestCase):
 
   def test_apply_custom_transform(self):
     pipeline = Pipeline(self.runner_name)
-    pcoll = pipeline | Create('pcoll', [1, 2, 3])
+    pcoll = pipeline | 'pcoll' >> Create([1, 2, 3])
     result = pcoll | PipelineTest.CustomTransform()
     assert_that(result, equal_to([2, 3, 4]))
     pipeline.run()
 
   def test_reuse_custom_transform_instance(self):
     pipeline = Pipeline(self.runner_name)
-    pcoll1 = pipeline | Create('pcoll1', [1, 2, 3])
-    pcoll2 = pipeline | Create('pcoll2', [4, 5, 6])
+    pcoll1 = pipeline | 'pcoll1' >> Create([1, 2, 3])
+    pcoll2 = pipeline | 'pcoll2' >> Create([4, 5, 6])
     transform = PipelineTest.CustomTransform()
     pcoll1 | transform
     with self.assertRaises(RuntimeError) as cm:
@@ -183,7 +183,7 @@ class PipelineTest(unittest.TestCase):
 
     self.assertEqual(
         ['a-x', 'b-x', 'c-x'],
-        sorted(['a', 'b', 'c'] | AddSuffix('-x')))
+        sorted(['a', 'b', 'c'] | '-x' >> AddSuffix()))
 
   def test_cached_pvalues_are_refcounted(self):
     """Test that cached PValues are refcounted and deleted.
@@ -213,17 +213,17 @@ class PipelineTest(unittest.TestCase):
 
     gc.collect()
     count_threshold = len(gc.get_objects()) + 10000
-    biglist = pipeline | Create('oom:create', ['x'] * num_elements)
+    biglist = pipeline | 'oom:create' >> Create(['x'] * num_elements)
     dupes = (
         biglist
-        | Map('oom:addone', lambda x: (x, 1))
-        | FlatMap('oom:dupes', create_dupes,
+        | 'oom:addone' >> Map(lambda x: (x, 1))
+        | 'oom:dupes' >> FlatMap(create_dupes,
                   AsIter(biglist)).with_outputs('side', main='main'))
     result = (
         (dupes.side, dupes.main, dupes.side)
-        | Flatten('oom:flatten')
-        | CombinePerKey('oom:combine', sum)
-        | Map('oom:check', check_memory, count_threshold))
+        | 'oom:flatten' >> Flatten()
+        | 'oom:combine' >> CombinePerKey(sum)
+        | 'oom:check' >> Map(check_memory, count_threshold))
 
     assert_that(result, equal_to([('x', 3 * num_elements)]))
     pipeline.run()

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/031c4cce/sdks/python/apache_beam/pvalue_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/pvalue_test.py b/sdks/python/apache_beam/pvalue_test.py
index bb742e0..323ca33 100644
--- a/sdks/python/apache_beam/pvalue_test.py
+++ b/sdks/python/apache_beam/pvalue_test.py
@@ -45,9 +45,9 @@ class PValueTest(unittest.TestCase):
 
   def test_pcollectionview_not_recreated(self):
     pipeline = Pipeline('DirectPipelineRunner')
-    value = pipeline | Create('create1', [1, 2, 3])
-    value2 = pipeline | Create('create2', [(1, 1), (2, 2), (3, 3)])
-    value3 = pipeline | Create('create3', [(1, 1), (2, 2), (3, 3)])
+    value = pipeline | 'create1' >> Create([1, 2, 3])
+    value2 = pipeline | 'create2' >> Create([(1, 1), (2, 2), (3, 3)])
+    value3 = pipeline | 'create3' >> Create([(1, 1), (2, 2), (3, 3)])
     self.assertEqual(AsSingleton(value), AsSingleton(value))
     self.assertEqual(AsSingleton('new', value, default_value=1),
                      AsSingleton('new', value, default_value=1))

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/031c4cce/sdks/python/apache_beam/runners/inprocess/consumer_tracking_pipeline_visitor_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/inprocess/consumer_tracking_pipeline_visitor_test.py b/sdks/python/apache_beam/runners/inprocess/consumer_tracking_pipeline_visitor_test.py
index b3b2968..3cd8d73 100644
--- a/sdks/python/apache_beam/runners/inprocess/consumer_tracking_pipeline_visitor_test.py
+++ b/sdks/python/apache_beam/runners/inprocess/consumer_tracking_pipeline_visitor_test.py
@@ -103,8 +103,8 @@ class ConsumerTrackingPipelineVisitorTest(unittest.TestCase):
                                pvalue.ListPCollectionView))
 
   def test_co_group_by_key(self):
-    emails = self.pipeline | Create('email', [('joe', 'joe@example.com')])
-    phones = self.pipeline | Create('phone', [('mary', '111-222-3333')])
+    emails = self.pipeline | 'email' >> Create([('joe', 'joe@example.com')])
+    phones = self.pipeline | 'phone' >> Create([('mary', '111-222-3333')])
     {'emails': emails, 'phones': phones} | CoGroupByKey()
 
     self.pipeline.visit(self.visitor)

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/031c4cce/sdks/python/apache_beam/runners/runner_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/runner_test.py b/sdks/python/apache_beam/runners/runner_test.py
index 2863756..04de7fb 100644
--- a/sdks/python/apache_beam/runners/runner_test.py
+++ b/sdks/python/apache_beam/runners/runner_test.py
@@ -60,9 +60,9 @@ class RunnerTest(unittest.TestCase):
                      '--no_auth=True'
                  ]))
 
-    (p | ptransform.Create('create', [1, 2, 3])  # pylint: disable=expression-not-assigned
-     | ptransform.FlatMap('do', lambda x: [(x, x)])
-     | ptransform.GroupByKey('gbk'))
+    (p | 'create' >> ptransform.Create([1, 2, 3])  # pylint: disable=expression-not-assigned
+     | 'do' >> ptransform.FlatMap(lambda x: [(x, x)])
+     | 'gbk' >> ptransform.GroupByKey())
     remote_runner.job = apiclient.Job(p.options)
     super(DataflowPipelineRunner, remote_runner).run(p)
 


Mime
View raw message