beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rober...@apache.org
Subject [10/12] incubator-beam git commit: Lint fixes.
Date Sat, 23 Jul 2016 23:47:16 GMT
Lint fixes.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/b15d35ca
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/b15d35ca
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/b15d35ca

Branch: refs/heads/python-sdk
Commit: b15d35ca6e585e75153e05d96403336889cc6894
Parents: 2a59a12
Author: Robert Bradshaw <robertwb@google.com>
Authored: Fri Jul 22 18:35:22 2016 -0700
Committer: Robert Bradshaw <robertwb@gmail.com>
Committed: Sat Jul 23 16:43:46 2016 -0700

----------------------------------------------------------------------
 sdks/python/apache_beam/dataflow_test.py        |   7 +-
 .../complete/juliaset/juliaset/juliaset.py      |   9 +-
 .../apache_beam/examples/complete/tfidf_test.py |   2 +-
 .../examples/cookbook/bigquery_side_input.py    |  14 +-
 .../cookbook/bigquery_side_input_test.py        |   4 +-
 .../examples/cookbook/bigquery_tornadoes.py     |   6 +-
 .../apache_beam/examples/cookbook/bigshuffle.py |   5 +-
 .../apache_beam/examples/cookbook/filters.py    |   2 +-
 .../apache_beam/examples/snippets/snippets.py   |   2 +-
 .../examples/snippets/snippets_test.py          |   8 +-
 sdks/python/apache_beam/examples/wordcount.py   |   2 +-
 .../apache_beam/examples/wordcount_debugging.py |   2 +-
 .../apache_beam/examples/wordcount_minimal.py   |   2 +-
 sdks/python/apache_beam/io/bigquery.py          |   4 +-
 sdks/python/apache_beam/pipeline_test.py        |   4 +-
 .../apache_beam/transforms/combiners_test.py    |   4 +-
 sdks/python/apache_beam/transforms/core.py      |   7 +-
 .../apache_beam/transforms/ptransform_test.py   | 161 ++++++++++---------
 sdks/python/apache_beam/transforms/util.py      |   3 +-
 .../typehints/typed_pipeline_test.py            |   2 +-
 20 files changed, 127 insertions(+), 123 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b15d35ca/sdks/python/apache_beam/dataflow_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/dataflow_test.py b/sdks/python/apache_beam/dataflow_test.py
index bf66851..cc3a526 100644
--- a/sdks/python/apache_beam/dataflow_test.py
+++ b/sdks/python/apache_beam/dataflow_test.py
@@ -114,8 +114,8 @@ class DataflowTest(unittest.TestCase):
     words = pipeline | 'SomeWords' >> Create(words_list)
     prefix = 'zyx'
     suffix = pipeline | 'SomeString' >> Create(['xyz'])  # side in
-    result = words | 'DecorateWordsDoFn' >> ParDo(SomeDoFn(), prefix,
-                           suffix=AsSingleton(suffix))
+    result = words | 'DecorateWordsDoFn' >> ParDo(
+        SomeDoFn(), prefix, suffix=AsSingleton(suffix))
     assert_that(result, equal_to(['zyx-%s-xyz' % x for x in words_list]))
     pipeline.run()
 
@@ -179,8 +179,7 @@ class DataflowTest(unittest.TestCase):
     pipeline = Pipeline('DirectPipelineRunner')
     pcol = pipeline | 'start' >> Create([1, 2])
     side = pipeline | 'side' >> Create([])  # 0 values in side input.
-    result = (
-        pcol | 'compute' >> FlatMap(lambda x, s: [x * s], AsSingleton(side, 10)))
+    result = pcol | FlatMap(lambda x, s: [x * s], AsSingleton(side, 10))
     assert_that(result, equal_to([10, 20]))
     pipeline.run()
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b15d35ca/sdks/python/apache_beam/examples/complete/juliaset/juliaset/juliaset.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/complete/juliaset/juliaset/juliaset.py b/sdks/python/apache_beam/examples/complete/juliaset/juliaset/juliaset.py
index 56696c3..1445fbe 100644
--- a/sdks/python/apache_beam/examples/complete/juliaset/juliaset/juliaset.py
+++ b/sdks/python/apache_beam/examples/complete/juliaset/juliaset/juliaset.py
@@ -105,11 +105,12 @@ def run(argv=None):  # pylint: disable=missing-docstring
   # Group each coordinate triplet by its x value, then write the coordinates to
   # the output file with an x-coordinate grouping per line.
   # pylint: disable=expression-not-assigned
-  (coordinates | 'x coord key' >> beam.Map(lambda (x, y, i): (x, (x, y, i)))
-   | 'x coord' >> beam.GroupByKey() | beam.Map(
-       'format',
+  (coordinates
+   | 'x coord key' >> beam.Map(lambda (x, y, i): (x, (x, y, i)))
+   | 'x coord' >> beam.GroupByKey()
+   | 'format' >> beam.Map(
        lambda (k, coords): ' '.join('(%s, %s, %s)' % coord for coord in coords))
-   | 'write' >> beam.io.Write(beam.io.TextFileSink(known_args.coordinate_output)))
+   | beam.io.Write(beam.io.TextFileSink(known_args.coordinate_output)))
   # pylint: enable=expression-not-assigned
   p.run()
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b15d35ca/sdks/python/apache_beam/examples/complete/tfidf_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/complete/tfidf_test.py b/sdks/python/apache_beam/examples/complete/tfidf_test.py
index ee7e534..f30b832 100644
--- a/sdks/python/apache_beam/examples/complete/tfidf_test.py
+++ b/sdks/python/apache_beam/examples/complete/tfidf_test.py
@@ -56,7 +56,7 @@ class TfIdfTest(unittest.TestCase):
     result = (
         uri_to_line
         | tfidf.TfIdf()
-        | 'flatten' >> beam.Map(lambda (word, (uri, tfidf)): (word, uri, tfidf)))
+        | beam.Map(lambda (word, (uri, tfidf)): (word, uri, tfidf)))
     beam.assert_that(result, beam.equal_to(EXPECTED_RESULTS))
     # Run the pipeline. Note that the assert_that above adds to the pipeline
     # a check that the result PCollection contains expected values. To actually

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b15d35ca/sdks/python/apache_beam/examples/cookbook/bigquery_side_input.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/cookbook/bigquery_side_input.py b/sdks/python/apache_beam/examples/cookbook/bigquery_side_input.py
index 1db4a1e..2099e48 100644
--- a/sdks/python/apache_beam/examples/cookbook/bigquery_side_input.py
+++ b/sdks/python/apache_beam/examples/cookbook/bigquery_side_input.py
@@ -101,11 +101,12 @@ def run(argv=None):
   ignore_corpus = known_args.ignore_corpus
   ignore_word = known_args.ignore_word
 
-  pcoll_corpus = p | beam.Read('read corpus',
-                               beam.io.BigQuerySource(query=query_corpus))
-  pcoll_word = p | beam.Read('read words',
-                             beam.io.BigQuerySource(query=query_word))
-  pcoll_ignore_corpus = p | 'create_ignore_corpus' >> beam.Create([ignore_corpus])
+  pcoll_corpus = p | 'read corpus' >> beam.io.Read(
+      beam.io.BigQuerySource(query=query_corpus))
+  pcoll_word = p | 'read_words' >> beam.Read(
+      beam.io.BigQuerySource(query=query_word))
+  pcoll_ignore_corpus = p | 'create_ignore_corpus' >> beam.Create(
+      [ignore_corpus])
   pcoll_ignore_word = p | 'create_ignore_word' >> beam.Create([ignore_word])
   pcoll_group_ids = p | 'create groups' >> beam.Create(group_ids)
 
@@ -113,8 +114,7 @@ def run(argv=None):
                                pcoll_ignore_corpus, pcoll_ignore_word)
 
   # pylint:disable=expression-not-assigned
-  pcoll_groups | beam.io.Write('WriteToText',
-                               beam.io.TextFileSink(known_args.output))
+  pcoll_groups | beam.io.Write(beam.io.TextFileSink(known_args.output))
   p.run()
 
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b15d35ca/sdks/python/apache_beam/examples/cookbook/bigquery_side_input_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/cookbook/bigquery_side_input_test.py b/sdks/python/apache_beam/examples/cookbook/bigquery_side_input_test.py
index 215aafa..e2b20f3 100644
--- a/sdks/python/apache_beam/examples/cookbook/bigquery_side_input_test.py
+++ b/sdks/python/apache_beam/examples/cookbook/bigquery_side_input_test.py
@@ -35,8 +35,8 @@ class BigQuerySideInputTest(unittest.TestCase):
                                     {'f': 'corpus2'},
                                     {'f': 'corpus3'}])
     words_pcoll = p | 'create_words' >> beam.Create([{'f': 'word1'},
-                                                   {'f': 'word2'},
-                                                   {'f': 'word3'}])
+                                                     {'f': 'word2'},
+                                                     {'f': 'word3'}])
     ignore_corpus_pcoll = p | 'create_ignore_corpus' >> beam.Create(['corpus1'])
     ignore_word_pcoll = p | 'create_ignore_word' >> beam.Create(['word1'])
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b15d35ca/sdks/python/apache_beam/examples/cookbook/bigquery_tornadoes.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/cookbook/bigquery_tornadoes.py b/sdks/python/apache_beam/examples/cookbook/bigquery_tornadoes.py
index cdaee36..6e1326c 100644
--- a/sdks/python/apache_beam/examples/cookbook/bigquery_tornadoes.py
+++ b/sdks/python/apache_beam/examples/cookbook/bigquery_tornadoes.py
@@ -53,11 +53,11 @@ def count_tornadoes(input_data):
   """
 
   return (input_data
-          | beam.FlatMap(
-              'months with tornadoes',
+          | 'months with tornatoes' >> beam.FlatMap(
               lambda row: [(int(row['month']), 1)] if row['tornado'] else [])
           | 'monthly count' >> beam.CombinePerKey(sum)
-          | 'format' >> beam.Map(lambda (k, v): {'month': k, 'tornado_count': v}))
+          | 'format' >> beam.Map(
+              lambda (k, v): {'month': k, 'tornado_count': v}))
 
 
 def run(argv=None):

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b15d35ca/sdks/python/apache_beam/examples/cookbook/bigshuffle.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/cookbook/bigshuffle.py b/sdks/python/apache_beam/examples/cookbook/bigshuffle.py
index c29a038..f7070dc 100644
--- a/sdks/python/apache_beam/examples/cookbook/bigshuffle.py
+++ b/sdks/python/apache_beam/examples/cookbook/bigshuffle.py
@@ -59,8 +59,9 @@ def run(argv=None):
 
   # Count the occurrences of each word.
   output = (lines
-            | 'split' >> beam.Map(lambda x: (x[:10], x[10:99])
-                      ).with_output_types(beam.typehints.KV[str, str])
+            | 'split' >> beam.Map(
+                lambda x: (x[:10], x[10:99]))
+            .with_output_types(beam.typehints.KV[str, str])
             | 'group' >> beam.GroupByKey()
             | beam.FlatMap(
                 'format',

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b15d35ca/sdks/python/apache_beam/examples/cookbook/filters.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/cookbook/filters.py b/sdks/python/apache_beam/examples/cookbook/filters.py
index b19b566..efd0ba7 100644
--- a/sdks/python/apache_beam/examples/cookbook/filters.py
+++ b/sdks/python/apache_beam/examples/cookbook/filters.py
@@ -88,7 +88,7 @@ def run(argv=None):
 
   p = beam.Pipeline(argv=pipeline_args)
 
-  input_data = p | 'input' >> beam.Read(beam.io.BigQuerySource(known_args.input))
+  input_data = p | beam.Read(beam.io.BigQuerySource(known_args.input))
 
   # pylint: disable=expression-not-assigned
   (filter_cold_days(input_data, known_args.month_filter)

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b15d35ca/sdks/python/apache_beam/examples/snippets/snippets.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/snippets/snippets.py b/sdks/python/apache_beam/examples/snippets/snippets.py
index 9d1df82..9f3d6e1 100644
--- a/sdks/python/apache_beam/examples/snippets/snippets.py
+++ b/sdks/python/apache_beam/examples/snippets/snippets.py
@@ -307,7 +307,7 @@ def pipeline_options_command_line(argv):
   p = beam.Pipeline(argv=pipeline_args)
   lines = p | beam.io.Read('ReadFromText',
                            beam.io.TextFileSource(known_args.input))
-  lines | 'WriteToText' >> beam.io.Write(beam.io.TextFileSink(known_args.output))
+  lines | beam.io.Write(beam.io.TextFileSink(known_args.output))
   # [END pipeline_options_command_line]
 
   p.run()

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b15d35ca/sdks/python/apache_beam/examples/snippets/snippets_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/snippets/snippets_test.py b/sdks/python/apache_beam/examples/snippets/snippets_test.py
index 9eba46a..edc0a17 100644
--- a/sdks/python/apache_beam/examples/snippets/snippets_test.py
+++ b/sdks/python/apache_beam/examples/snippets/snippets_test.py
@@ -101,6 +101,7 @@ class ParDoTest(unittest.TestCase):
     self.assertEqual({'A', 'C'}, set(all_capitals))
 
   def test_pardo_with_label(self):
+    # pylint: disable=line-too-long
     words = ['aa', 'bbc', 'defg']
     # [START model_pardo_with_label]
     result = words | 'CountUniqueLetters' >> beam.Map(lambda word: len(set(word)))
@@ -127,10 +128,9 @@ class ParDoTest(unittest.TestCase):
     small_words = words | 'small' >> beam.FlatMap(filter_using_length, 0, 3)
 
     # A single deferred side input.
-    larger_than_average = (words
-                           | 'large' >> beam.FlatMap(filter_using_length,
-                                          lower_bound=pvalue.AsSingleton(
-                                              avg_word_len)))
+    larger_than_average = (words | 'large' >> beam.FlatMap(
+        filter_using_length,
+        lower_bound=pvalue.AsSingleton(avg_word_len)))
 
     # Mix and match.
     small_but_nontrivial = words | beam.FlatMap(filter_using_length,

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b15d35ca/sdks/python/apache_beam/examples/wordcount.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/wordcount.py b/sdks/python/apache_beam/examples/wordcount.py
index 4744352..096e508 100644
--- a/sdks/python/apache_beam/examples/wordcount.py
+++ b/sdks/python/apache_beam/examples/wordcount.py
@@ -82,7 +82,7 @@ def run(argv=None):
   # Count the occurrences of each word.
   counts = (lines
             | 'split' >> (beam.ParDo(WordExtractingDoFn())
-               .with_output_types(unicode))
+                          .with_output_types(unicode))
             | 'pair_with_one' >> beam.Map(lambda x: (x, 1))
             | 'group' >> beam.GroupByKey()
             | 'count' >> beam.Map(lambda (word, ones): (word, sum(ones))))

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b15d35ca/sdks/python/apache_beam/examples/wordcount_debugging.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/wordcount_debugging.py b/sdks/python/apache_beam/examples/wordcount_debugging.py
index e008b48..473a486 100644
--- a/sdks/python/apache_beam/examples/wordcount_debugging.py
+++ b/sdks/python/apache_beam/examples/wordcount_debugging.py
@@ -98,7 +98,7 @@ class CountWords(beam.PTransform):
   def apply(self, pcoll):
     return (pcoll
             | 'split' >> (beam.FlatMap(lambda x: re.findall(r'[A-Za-z\']+', x))
-               .with_output_types(unicode))
+                          .with_output_types(unicode))
             | 'pair_with_one' >> beam.Map(lambda x: (x, 1))
             | 'group' >> beam.GroupByKey()
             | 'count' >> beam.Map(lambda (word, ones): (word, sum(ones))))

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b15d35ca/sdks/python/apache_beam/examples/wordcount_minimal.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/wordcount_minimal.py b/sdks/python/apache_beam/examples/wordcount_minimal.py
index ce5b644..4073f7b 100644
--- a/sdks/python/apache_beam/examples/wordcount_minimal.py
+++ b/sdks/python/apache_beam/examples/wordcount_minimal.py
@@ -98,7 +98,7 @@ def run(argv=None):
   # Count the occurrences of each word.
   counts = (lines
             | 'split' >> (beam.FlatMap(lambda x: re.findall(r'[A-Za-z\']+', x))
-               .with_output_types(unicode))
+                          .with_output_types(unicode))
             | 'pair_with_one' >> beam.Map(lambda x: (x, 1))
             | 'group' >> beam.GroupByKey()
             | 'count' >> beam.Map(lambda (word, ones): (word, sum(ones))))

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b15d35ca/sdks/python/apache_beam/io/bigquery.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/bigquery.py b/sdks/python/apache_beam/io/bigquery.py
index f789312..50c2eaf 100644
--- a/sdks/python/apache_beam/io/bigquery.py
+++ b/sdks/python/apache_beam/io/bigquery.py
@@ -45,8 +45,8 @@ Map transform will get on each call *one* row of the main table and *all* rows
 of the side table. The execution framework may use some caching techniques to
 share the side inputs between calls in order to avoid excessive reading::
 
-  main_table = pipeline | 'very_big_table' >> beam.io.Read(beam.io.BigQuerySource()
-  side_table = pipeline | 'not_big_table' >> beam.io.Read(beam.io.BigQuerySource()
+  main_table = pipeline | 'very_big' >> beam.io.Read(beam.io.BigQuerySource()
+  side_table = pipeline | 'not_big' >> beam.io.Read(beam.io.BigQuerySource()
   results = (
       main_table
       | beam.Map('process data',

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b15d35ca/sdks/python/apache_beam/pipeline_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/pipeline_test.py b/sdks/python/apache_beam/pipeline_test.py
index 327f26c..8a0d246 100644
--- a/sdks/python/apache_beam/pipeline_test.py
+++ b/sdks/python/apache_beam/pipeline_test.py
@@ -217,8 +217,8 @@ class PipelineTest(unittest.TestCase):
     dupes = (
         biglist
         | 'oom:addone' >> Map(lambda x: (x, 1))
-        | 'oom:dupes' >> FlatMap(create_dupes,
-            AsIter(biglist)).with_outputs('side', main='main'))
+        | 'oom:dupes' >> FlatMap(
+            create_dupes, AsIter(biglist)).with_outputs('side', main='main'))
     result = (
         (dupes.side, dupes.main, dupes.side)
         | 'oom:flatten' >> Flatten()

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b15d35ca/sdks/python/apache_beam/transforms/combiners_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/combiners_test.py b/sdks/python/apache_beam/transforms/combiners_test.py
index c970382..bfe168d 100644
--- a/sdks/python/apache_beam/transforms/combiners_test.py
+++ b/sdks/python/apache_beam/transforms/combiners_test.py
@@ -219,8 +219,8 @@ class CombineTest(unittest.TestCase):
         return main | Map(lambda _, s: s, side)
 
     p = Pipeline('DirectPipelineRunner')
-    result1 = p | 'label1' >> Create([]) | 'L1' >> CombineWithSideInput()
-    result2 = p | 'label2' >> Create([1, 2, 3, 4]) | 'L2' >> CombineWithSideInput()
+    result1 = p | 'i1' >> Create([]) | 'c1' >> CombineWithSideInput()
+    result2 = p | 'i2' >> Create([1, 2, 3, 4]) | 'c2' >> CombineWithSideInput()
     assert_that(result1, equal_to([0]), label='r1')
     assert_that(result2, equal_to([10]), label='r2')
     p.run()

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b15d35ca/sdks/python/apache_beam/transforms/core.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/core.py b/sdks/python/apache_beam/transforms/core.py
index 44a6d29..5e6aafc 100644
--- a/sdks/python/apache_beam/transforms/core.py
+++ b/sdks/python/apache_beam/transforms/core.py
@@ -814,9 +814,9 @@ class CombineGlobally(PTransform):
         return transform
 
     combined = (pcoll
-                | 'KeyWithVoid' >> add_input_types(Map(lambda v: (None, v))
-                                  .with_output_types(
-                                      KV[None, pcoll.element_type]))
+                | 'KeyWithVoid' >> add_input_types(
+                    Map(lambda v: (None, v)).with_output_types(
+                        KV[None, pcoll.element_type]))
                 | CombinePerKey(
                     'CombinePerKey', self.fn, *self.args, **self.kwargs)
                 | 'UnKey' >> Map(lambda (k, v): v))
@@ -1044,6 +1044,7 @@ class GroupByKey(PTransform):
           KV[key_type, Iterable[typehints.WindowedValue[value_type]]])
       gbk_output_type = KV[key_type, Iterable[value_type]]
 
+      # pylint: disable=bad-continuation
       return (pcoll
               | 'reify_windows' >> (ParDo(self.ReifyWindows())
                  .with_output_types(reify_output_type))

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b15d35ca/sdks/python/apache_beam/transforms/ptransform_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/ptransform_test.py b/sdks/python/apache_beam/transforms/ptransform_test.py
index 3a71ec3..992f944 100644
--- a/sdks/python/apache_beam/transforms/ptransform_test.py
+++ b/sdks/python/apache_beam/transforms/ptransform_test.py
@@ -288,7 +288,7 @@ class PTransformTest(unittest.TestCase):
     vals_2 = [2, 4, 6, 8, 10, 12, 14]
     pipeline = Pipeline('DirectPipelineRunner')
     pcoll = pipeline | 'start' >> beam.Create(([('a', x) for x in vals_1] +
-                                             [('b', x) for x in vals_2]))
+                                               [('b', x) for x in vals_2]))
     result = pcoll | 'mean' >> beam.CombinePerKey(self._MeanCombineFn())
     assert_that(result, equal_to([('a', sum(vals_1) / len(vals_1)),
                                   ('b', sum(vals_2) / len(vals_2))]))
@@ -299,7 +299,7 @@ class PTransformTest(unittest.TestCase):
     vals_2 = [2, 4, 6, 8, 10, 12, 14]
     pipeline = Pipeline('DirectPipelineRunner')
     pcoll = pipeline | 'start' >> beam.Create(([('a', x) for x in vals_1] +
-                                             [('b', x) for x in vals_2]))
+                                               [('b', x) for x in vals_2]))
     result = pcoll | beam.CombinePerKey(sum)
     assert_that(result, equal_to([('a', sum(vals_1)), ('b', sum(vals_2))]))
     pipeline.run()
@@ -309,7 +309,7 @@ class PTransformTest(unittest.TestCase):
     vals_2 = [2, 4, 6, 8, 10, 12, 14]
     pipeline = Pipeline('DirectPipelineRunner')
     pcoll = pipeline | 'start' >> beam.Create(([('a', x) for x in vals_1] +
-                                             [('b', x) for x in vals_2]))
+                                               [('b', x) for x in vals_2]))
     divisor = pipeline | 'divisor' >> beam.Create([2])
     result = pcoll | beam.CombinePerKey(
         lambda vals, d: max(v for v in vals if v % d == 0),
@@ -513,13 +513,14 @@ class PTransformTest(unittest.TestCase):
   def test_apply_to_list(self):
     self.assertItemsEqual(
         [1, 2, 3], [0, 1, 2] | 'add_one' >> beam.Map(lambda x: x + 1))
-    self.assertItemsEqual([1], [0, 1, 2] | 'odd' >> beam.Filter(lambda x: x % 2))
+    self.assertItemsEqual([1],
+                          [0, 1, 2] | 'odd' >> beam.Filter(lambda x: x % 2))
     self.assertItemsEqual([1, 2, 100, 3],
-                          ([1, 2, 3], [100]) | 'flat' >> beam.Flatten())
+                          ([1, 2, 3], [100]) | beam.Flatten())
     join_input = ([('k', 'a')],
                   [('k', 'b'), ('k', 'c')])
     self.assertItemsEqual([('k', (['a'], ['b', 'c']))],
-                          join_input | 'join' >> beam.CoGroupByKey())
+                          join_input | beam.CoGroupByKey())
 
   def test_multi_input_ptransform(self):
     class DisjointUnion(PTransform):
@@ -776,9 +777,9 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
     with self.assertRaises(typehints.TypeCheckError) as e:
       (self.p
        | 's' >> beam.Create(['t', 'e', 's', 't']).with_output_types(str)
-       | 'score' >> (beam.FlatMap(lambda x: [1] if x == 't' else [2])
+       | ('score' >> beam.FlatMap(lambda x: [1] if x == 't' else [2])
           .with_input_types(str).with_output_types(int))
-       | 'upper' >> (beam.FlatMap(lambda x: [x.upper()])
+       | ('upper' >> beam.FlatMap(lambda x: [x.upper()])
           .with_input_types(str).with_output_types(str)))
 
     self.assertEqual("Type hint violation for 'upper': "
@@ -866,7 +867,7 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
   def test_filter_type_checks_using_type_hints_method(self):
     # No error should be raised if this type-checks properly.
     d = (self.p
-         | 'strs' >> beam.Create(['1', '2', '3', '4', '5']).with_output_types(str)
+         | beam.Create(['1', '2', '3', '4', '5']).with_output_types(str)
          | 'to int' >> beam.Map(lambda x: int(x))
          .with_input_types(str).with_output_types(int)
          | 'below 3' >> beam.Filter(lambda x: x < 3).with_input_types(int))
@@ -904,9 +905,9 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
   def test_group_by_key_only_output_type_deduction(self):
     d = (self.p
          | 'str' >> beam.Create(['t', 'e', 's', 't']).with_output_types(str)
-         | 'pair' >> (beam.Map(lambda x: (x, ord(x)))
+         | ('pair' >> beam.Map(lambda x: (x, ord(x)))
             .with_output_types(typehints.KV[str, str]))
-         | 'O' >> beam.GroupByKeyOnly())
+         | beam.GroupByKeyOnly())
 
     # Output type should correctly be deduced.
     # GBK-only should deduce that KV[A, B] is turned into KV[A, Iterable[B]].
@@ -916,9 +917,9 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
   def test_group_by_key_output_type_deduction(self):
     d = (self.p
          | 'str' >> beam.Create(range(20)).with_output_types(int)
-         | 'pair negative' >> (beam.Map(lambda x: (x % 5, -x))
+         | ('pair negative' >> beam.Map(lambda x: (x % 5, -x))
             .with_output_types(typehints.KV[int, int]))
-         | 'T' >> beam.GroupByKey())
+         | beam.GroupByKey())
 
     # Output type should correctly be deduced.
     # GBK should deduce that KV[A, B] is turned into KV[A, Iterable[B]].
@@ -929,8 +930,8 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
     # GBK will be passed raw int's here instead of some form of KV[A, B].
     with self.assertRaises(typehints.TypeCheckError) as e:
       (self.p
-       | 's' >> beam.Create([1, 2, 3]).with_output_types(int)
-       | 'F' >> beam.GroupByKeyOnly())
+       | beam.Create([1, 2, 3]).with_output_types(int)
+       | beam.GroupByKeyOnly())
 
     self.assertEqual("Input type hint violation at F: "
                      "expected Tuple[TypeVariable[K], TypeVariable[V]], "
@@ -942,9 +943,9 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
     # aliased to Tuple[int, str].
     with self.assertRaises(typehints.TypeCheckError) as e:
       (self.p
-       | 's' >> (beam.Create(range(5))
+       | (beam.Create(range(5))
           .with_output_types(typehints.Iterable[int]))
-       | 'T' >> beam.GroupByKey())
+       | beam.GroupByKey())
 
     self.assertEqual("Input type hint violation at T: "
                      "expected Tuple[TypeVariable[K], TypeVariable[V]], "
@@ -973,7 +974,7 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
       (self.p
        | 'nums' >> beam.Create(range(5)).with_output_types(int)
        | 'mod dup' >> beam.Map(lambda x: (x % 2, x))
-       | 'G' >> beam.GroupByKeyOnly())
+       | beam.GroupByKeyOnly())
 
     self.assertEqual('Pipeline type checking is enabled, however no output '
                      'type-hint was found for the PTransform '
@@ -1092,10 +1093,10 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
     # ParDo should receive an instance of type 'str', however an 'int' will be
     # passed instead.
     with self.assertRaises(typehints.TypeCheckError) as e:
-      (self.p | 'n' >> beam.Create([1, 2, 3])
-       | 'to int' >> (beam.FlatMap(lambda x: [int(x)])
-          .with_input_types(str).with_output_types(int))
-      )
+      (self.p
+       | beam.Create([1, 2, 3])
+       | ('to int' >> beam.FlatMap(lambda x: [int(x)])
+          .with_input_types(str).with_output_types(int)))
       self.p.run()
 
     self.assertStartswith(
@@ -1111,8 +1112,8 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
 
     with self.assertRaises(typehints.TypeCheckError) as e:
       (self.p
-       | 'n' >> beam.Create([(1, 3.0), (2, 4.9), (3, 9.5)])
-       | 'add' >> (beam.FlatMap(lambda (x, y): [x + y])
+       | beam.Create([(1, 3.0), (2, 4.9), (3, 9.5)])
+       | ('add' >> beam.FlatMap(lambda (x, y): [x + y])
           .with_input_types(typehints.Tuple[int, int]).with_output_types(int))
       )
       self.p.run()
@@ -1136,8 +1137,9 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
         lambda x: [float(x)]).with_input_types(int).with_output_types(
             int).get_type_hints()
     with self.assertRaises(typehints.TypeCheckError) as e:
-      (self.p | 'n' >> beam.Create([1, 2, 3])
-       | 'to int' >> (beam.FlatMap(lambda x: [float(x)])
+      (self.p
+       | beam.Create([1, 2, 3])
+       | ('to int' >> beam.FlatMap(lambda x: [float(x)])
           .with_input_types(int).with_output_types(int))
       )
       self.p.run()
@@ -1159,8 +1161,8 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
     # of 'int' will be generated instead.
     with self.assertRaises(typehints.TypeCheckError) as e:
       (self.p
-       | 'n' >> beam.Create([(1, 3.0), (2, 4.9), (3, 9.5)])
-       | 'swap' >> (beam.FlatMap(lambda (x, y): [x + y])
+       | beam.Create([(1, 3.0), (2, 4.9), (3, 9.5)])
+       | ('swap' >> beam.FlatMap(lambda (x, y): [x + y])
           .with_input_types(typehints.Tuple[int, float])
           .with_output_types(typehints.Tuple[float, int]))
       )
@@ -1183,7 +1185,7 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
       return a + b
 
     with self.assertRaises(typehints.TypeCheckError) as e:
-      (self.p | 't' >> beam.Create([1, 2, 3, 4]) | 'add 1' >> beam.Map(add, 1.0))
+      (self.p | beam.Create([1, 2, 3, 4]) | 'add 1' >> beam.Map(add, 1.0))
       self.p.run()
 
     self.assertStartswith(
@@ -1199,8 +1201,8 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
 
     with self.assertRaises(typehints.TypeCheckError) as e:
       (self.p
-       | 't' >> beam.Create([1, 2, 3, 4])
-       | 'add 1' >> (beam.Map(lambda x, one: x + one, 1.0)
+       | beam.Create([1, 2, 3, 4])
+       | ('add 1' >> beam.Map(lambda x, one: x + one, 1.0)
           .with_input_types(int, int)
           .with_output_types(float)))
       self.p.run()
@@ -1305,8 +1307,8 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
 
   def test_combine_pipeline_type_check_using_methods(self):
     d = (self.p
-         | 's' >> beam.Create(['t', 'e', 's', 't']).with_output_types(str)
-         | 'concat' >> (beam.CombineGlobally(lambda s: ''.join(s))
+         | beam.Create(['t', 'e', 's', 't']).with_output_types(str)
+         | ('concat' >> beam.CombineGlobally(lambda s: ''.join(s))
             .with_input_types(str).with_output_types(str)))
 
     def matcher(expected):
@@ -1321,8 +1323,8 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
     self.p.options.view_as(TypeOptions).runtime_type_check = True
 
     d = (self.p
-         | 's' >> beam.Create(range(5)).with_output_types(int)
-         | 'sum' >> (beam.CombineGlobally(lambda s: sum(s))
+         | beam.Create(range(5)).with_output_types(int)
+         | ('sum' >> beam.CombineGlobally(lambda s: sum(s))
             .with_input_types(int).with_output_types(int)))
 
     assert_that(d, equal_to([10]))
@@ -1331,8 +1333,8 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
   def test_combine_pipeline_type_check_violation_using_methods(self):
     with self.assertRaises(typehints.TypeCheckError) as e:
       (self.p
-       | 'e' >> beam.Create(range(3)).with_output_types(int)
-       | 'sort join' >> (beam.CombineGlobally(lambda s: ''.join(sorted(s)))
+       | beam.Create(range(3)).with_output_types(int)
+       | ('sort join' >> beam.CombineGlobally(lambda s: ''.join(sorted(s)))
           .with_input_types(str).with_output_types(str)))
 
     self.assertEqual("Input type hint violation at sort join: "
@@ -1345,8 +1347,8 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
 
     with self.assertRaises(typehints.TypeCheckError) as e:
       (self.p
-       | 'e' >> beam.Create(range(3)).with_output_types(int)
-       | 'sort join' >> (beam.CombineGlobally(lambda s: ''.join(sorted(s)))
+       | beam.Create(range(3)).with_output_types(int)
+       | ('sort join' >> beam.CombineGlobally(lambda s: ''.join(sorted(s)))
           .with_input_types(str).with_output_types(str)))
       self.p.run()
 
@@ -1427,8 +1429,8 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
 
   def test_mean_per_key_pipeline_checking_satisfied(self):
     d = (self.p
-         | 'c' >> beam.Create(range(5)).with_output_types(int)
-         | 'even group' >> (beam.Map(lambda x: (not x % 2, x))
+         | beam.Create(range(5)).with_output_types(int)
+         | ('even group' >> beam.Map(lambda x: (not x % 2, x))
             .with_output_types(typehints.KV[bool, int]))
          | 'even mean' >> combine.Mean.PerKey())
 
@@ -1439,8 +1441,8 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
   def test_mean_per_key_pipeline_checking_violated(self):
     with self.assertRaises(typehints.TypeCheckError) as e:
       (self.p
-       | 'e' >> beam.Create(map(str, range(5))).with_output_types(str)
-       | 'upper pair' >> (beam.Map(lambda x: (x.upper(), x))
+       | beam.Create(map(str, range(5))).with_output_types(str)
+       | ('upper pair' >> beam.Map(lambda x: (x.upper(), x))
           .with_output_types(typehints.KV[str, str]))
        | 'even mean' >> combine.Mean.PerKey())
       self.p.run()
@@ -1455,8 +1457,8 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
     self.p.options.view_as(TypeOptions).runtime_type_check = True
 
     d = (self.p
-         | 'c' >> beam.Create(range(5)).with_output_types(int)
-         | 'odd group' >> (beam.Map(lambda x: (bool(x % 2), x))
+         | beam.Create(range(5)).with_output_types(int)
+         | ('odd group' >> beam.Map(lambda x: (bool(x % 2), x))
             .with_output_types(typehints.KV[bool, int]))
          | 'odd mean' >> combine.Mean.PerKey())
 
@@ -1470,8 +1472,8 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
 
     with self.assertRaises(typehints.TypeCheckError) as e:
       (self.p
-       | 'c' >> beam.Create(range(5)).with_output_types(int)
-       | 'odd group' >> (beam.Map(lambda x: (x, str(bool(x % 2))))
+       | beam.Create(range(5)).with_output_types(int)
+       | ('odd group' >> beam.Map(lambda x: (x, str(bool(x % 2))))
           .with_output_types(typehints.KV[int, str]))
        | 'odd mean' >> combine.Mean.PerKey())
       self.p.run()
@@ -1514,8 +1516,8 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
 
   def test_count_perkey_pipeline_type_checking_satisfied(self):
     d = (self.p
-         | 'p' >> beam.Create(range(5)).with_output_types(int)
-         | 'even group' >> (beam.Map(lambda x: (not x % 2, x))
+         | beam.Create(range(5)).with_output_types(int)
+         | ('even group' >> beam.Map(lambda x: (not x % 2, x))
             .with_output_types(typehints.KV[bool, int]))
          | 'count int' >> combine.Count.PerKey())
 
@@ -1526,7 +1528,7 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
   def test_count_perkey_pipeline_type_checking_violated(self):
     with self.assertRaises(typehints.TypeCheckError) as e:
       (self.p
-       | 'p' >> beam.Create(range(5)).with_output_types(int)
+       | beam.Create(range(5)).with_output_types(int)
        | 'count int' >> combine.Count.PerKey())
 
     self.assertEqual("Input type hint violation at GroupByKey: "
@@ -1538,7 +1540,7 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
     self.p.options.view_as(TypeOptions).runtime_type_check = True
 
     d = (self.p
-         | 'c' >> beam.Create(['t', 'e', 's', 't']).with_output_types(str)
+         | beam.Create(['t', 'e', 's', 't']).with_output_types(str)
          | 'dup key' >> beam.Map(lambda x: (x, x))
          .with_output_types(typehints.KV[str, str])
          | 'count dups' >> combine.Count.PerKey())
@@ -1549,7 +1551,7 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
 
   def test_count_perelement_pipeline_type_checking_satisfied(self):
     d = (self.p
-         | 'w' >> beam.Create([1, 1, 2, 3]).with_output_types(int)
+         | beam.Create([1, 1, 2, 3]).with_output_types(int)
          | 'count elems' >> combine.Count.PerElement())
 
     self.assertCompatible(typehints.KV[int, int], d.element_type)
@@ -1561,7 +1563,7 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
 
     with self.assertRaises(typehints.TypeCheckError) as e:
       (self.p
-       | 'f' >> beam.Create([1, 1, 2, 3])
+       | beam.Create([1, 1, 2, 3])
        | 'count elems' >> combine.Count.PerElement())
 
     self.assertEqual('Pipeline type checking is enabled, however no output '
@@ -1573,7 +1575,7 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
     self.p.options.view_as(TypeOptions).runtime_type_check = True
 
     d = (self.p
-         | 'w' >> beam.Create([True, True, False, True, True])
+         | beam.Create([True, True, False, True, True])
          .with_output_types(bool)
          | 'count elems' >> combine.Count.PerElement())
 
@@ -1583,7 +1585,7 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
 
   def test_top_of_pipeline_checking_satisfied(self):
     d = (self.p
-         | 'n' >> beam.Create(range(5, 11)).with_output_types(int)
+         | beam.Create(range(5, 11)).with_output_types(int)
          | 'top 3' >> combine.Top.Of(3, lambda x, y: x < y))
 
     self.assertCompatible(typehints.Iterable[int],
@@ -1595,7 +1597,7 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
     self.p.options.view_as(TypeOptions).runtime_type_check = True
 
     d = (self.p
-         | 'n' >> beam.Create(list('testing')).with_output_types(str)
+         | beam.Create(list('testing')).with_output_types(str)
          | 'acii top' >> combine.Top.Of(3, lambda x, y: x < y))
 
     self.assertCompatible(typehints.Iterable[str], d.element_type)
@@ -1605,7 +1607,7 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
   def test_per_key_pipeline_checking_violated(self):
     with self.assertRaises(typehints.TypeCheckError) as e:
       (self.p
-       | 'n' >> beam.Create(range(100)).with_output_types(int)
+       | beam.Create(range(100)).with_output_types(int)
        | 'num + 1' >> beam.Map(lambda x: x + 1).with_output_types(int)
        | 'top mod' >> combine.Top.PerKey(1, lambda a, b: a < b))
 
@@ -1616,8 +1618,8 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
 
   def test_per_key_pipeline_checking_satisfied(self):
     d = (self.p
-         | 'n' >> beam.Create(range(100)).with_output_types(int)
-         | 'group mod 3' >> (beam.Map(lambda x: (x % 3, x))
+         | beam.Create(range(100)).with_output_types(int)
+         | ('group mod 3' >> beam.Map(lambda x: (x % 3, x))
             .with_output_types(typehints.KV[int, int]))
          | 'top mod' >> combine.Top.PerKey(1, lambda a, b: a < b))
 
@@ -1630,8 +1632,8 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
     self.p.options.view_as(TypeOptions).runtime_type_check = True
 
     d = (self.p
-         | 'n' >> beam.Create(range(21))
-         | 'group mod 3' >> (beam.Map(lambda x: (x % 3, x))
+         | beam.Create(range(21))
+         | ('group mod 3' >> beam.Map(lambda x: (x % 3, x))
             .with_output_types(typehints.KV[int, int]))
          | 'top mod' >> combine.Top.PerKey(1, lambda a, b: a < b))
 
@@ -1642,7 +1644,7 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
 
   def test_sample_globally_pipeline_satisfied(self):
     d = (self.p
-         | 'm' >> beam.Create([2, 2, 3, 3]).with_output_types(int)
+         | beam.Create([2, 2, 3, 3]).with_output_types(int)
          | 'sample' >> combine.Sample.FixedSizeGlobally(3))
 
     self.assertCompatible(typehints.Iterable[int], d.element_type)
@@ -1658,7 +1660,7 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
     self.p.options.view_as(TypeOptions).runtime_type_check = True
 
     d = (self.p
-         | 'm' >> beam.Create([2, 2, 3, 3]).with_output_types(int)
+         | beam.Create([2, 2, 3, 3]).with_output_types(int)
          | 'sample' >> combine.Sample.FixedSizeGlobally(2))
 
     self.assertCompatible(typehints.Iterable[int], d.element_type)
@@ -1672,7 +1674,7 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
 
   def test_sample_per_key_pipeline_satisfied(self):
     d = (self.p
-         | 'm' >> (beam.Create([(1, 2), (1, 2), (2, 3), (2, 3)])
+         | (beam.Create([(1, 2), (1, 2), (2, 3), (2, 3)])
             .with_output_types(typehints.KV[int, int]))
          | 'sample' >> combine.Sample.FixedSizePerKey(2))
 
@@ -1691,7 +1693,7 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
     self.p.options.view_as(TypeOptions).runtime_type_check = True
 
     d = (self.p
-         | 'm' >> (beam.Create([(1, 2), (1, 2), (2, 3), (2, 3)])
+         | (beam.Create([(1, 2), (1, 2), (2, 3), (2, 3)])
             .with_output_types(typehints.KV[int, int]))
          | 'sample' >> combine.Sample.FixedSizePerKey(1))
 
@@ -1708,8 +1710,8 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
 
   def test_to_list_pipeline_check_satisfied(self):
     d = (self.p
-         | 'c' >> beam.Create((1, 2, 3, 4)).with_output_types(int)
-         | 'to list' >> combine.ToList())
+         | beam.Create((1, 2, 3, 4)).with_output_types(int)
+         | combine.ToList())
 
     self.assertCompatible(typehints.List[int], d.element_type)
 
@@ -1724,8 +1726,8 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
     self.p.options.view_as(TypeOptions).runtime_type_check = True
 
     d = (self.p
-         | 'c' >> beam.Create(list('test')).with_output_types(str)
-         | 'to list' >> combine.ToList())
+         | beam.Create(list('test')).with_output_types(str)
+         | combine.ToList())
 
     self.assertCompatible(typehints.List[str], d.element_type)
 
@@ -1739,8 +1741,8 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
   def test_to_dict_pipeline_check_violated(self):
     with self.assertRaises(typehints.TypeCheckError) as e:
       (self.p
-       | 'd' >> beam.Create([1, 2, 3, 4]).with_output_types(int)
-       | 'to dict' >> combine.ToDict())
+       | beam.Create([1, 2, 3, 4]).with_output_types(int)
+       | combine.ToDict())
 
     self.assertEqual("Type hint violation for 'ParDo(CombineValuesDoFn)': "
                      "requires Tuple[TypeVariable[K], "
@@ -1751,9 +1753,8 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
   def test_to_dict_pipeline_check_satisfied(self):
     d = (self.p
          | beam.Create(
-             'd',
              [(1, 2), (3, 4)]).with_output_types(typehints.Tuple[int, int])
-         | 'to dict' >> combine.ToDict())
+         | combine.ToDict())
 
     self.assertCompatible(typehints.Dict[int, int], d.element_type)
     assert_that(d, equal_to([{1: 2, 3: 4}]))
@@ -1763,9 +1764,9 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
     self.p.options.view_as(TypeOptions).runtime_type_check = True
 
     d = (self.p
-         | 'd' >> (beam.Create([('1', 2), ('3', 4)])
+         | (beam.Create([('1', 2), ('3', 4)])
             .with_output_types(typehints.Tuple[str, int]))
-         | 'to dict' >> combine.ToDict())
+         | combine.ToDict())
 
     self.assertCompatible(typehints.Dict[str, int], d.element_type)
     assert_that(d, equal_to([{'1': 2, '3': 4}]))
@@ -1776,7 +1777,7 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
 
     with self.assertRaises(TypeError) as e:
       (self.p
-       | 't' >> beam.Create([1, 2, 3]).with_output_types(int)
+       | beam.Create([1, 2, 3]).with_output_types(int)
        | 'len' >> beam.Map(lambda x: len(x)).with_output_types(int))
       self.p.run()
 
@@ -1799,7 +1800,7 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
         beam.core.GroupByKeyOnly().infer_output_type(typehints.KV[str, int]))
 
   def test_pipeline_inference(self):
-    created = self.p | 'c' >> beam.Create(['a', 'b', 'c'])
+    created = self.p | beam.Create(['a', 'b', 'c'])
     mapped = created | 'pair with 1' >> beam.Map(lambda x: (x, 1))
     grouped = mapped | beam.GroupByKey()
     self.assertEqual(str, created.element_type)
@@ -1810,7 +1811,7 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
   def test_inferred_bad_kv_type(self):
     with self.assertRaises(typehints.TypeCheckError) as e:
       _ = (self.p
-           | 't' >> beam.Create(['a', 'b', 'c'])
+           | beam.Create(['a', 'b', 'c'])
            | 'ungroupable' >> beam.Map(lambda x: (x, 0, 1.0))
            | beam.GroupByKey())
 
@@ -1821,11 +1822,11 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
 
   def test_type_inference_command_line_flag_toggle(self):
     self.p.options.view_as(TypeOptions).pipeline_type_check = False
-    x = self.p | 't' >> beam.Create([1, 2, 3, 4])
+    x = self.p | 'c1' >> beam.Create([1, 2, 3, 4])
     self.assertIsNone(x.element_type)
 
     self.p.options.view_as(TypeOptions).pipeline_type_check = True
-    x = self.p | 'm' >> beam.Create([1, 2, 3, 4])
+    x = self.p | 'c2' >> beam.Create([1, 2, 3, 4])
     self.assertEqual(int, x.element_type)
 
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b15d35ca/sdks/python/apache_beam/transforms/util.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/util.py b/sdks/python/apache_beam/transforms/util.py
index bbb7787..4564cf9 100644
--- a/sdks/python/apache_beam/transforms/util.py
+++ b/sdks/python/apache_beam/transforms/util.py
@@ -220,7 +220,8 @@ def assert_that(actual, matcher, label='assert_that'):
   class AssertThat(PTransform):
 
     def apply(self, pipeline):
-      return pipeline | 'singleton' >> Create([None]) | Map(match, AllOf(actual))
+      return pipeline | 'singleton' >> Create([None]) | Map(match,
+                                                            AllOf(actual))
 
     def default_label(self):
       return label

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b15d35ca/sdks/python/apache_beam/typehints/typed_pipeline_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/typehints/typed_pipeline_test.py b/sdks/python/apache_beam/typehints/typed_pipeline_test.py
index 4e1ab68..f2e8f12 100644
--- a/sdks/python/apache_beam/typehints/typed_pipeline_test.py
+++ b/sdks/python/apache_beam/typehints/typed_pipeline_test.py
@@ -177,7 +177,7 @@ class SideInputTest(unittest.TestCase):
 
     bad_side_input = p | 'bad_side' >> beam.Create(['z'])
     with self.assertRaises(typehints.TypeCheckError):
-      main_input | 'again' >> beam.Map(repeat, pvalue.AsSingleton(bad_side_input))
+      main_input | 'bis' >> beam.Map(repeat, pvalue.AsSingleton(bad_side_input))
 
   def test_deferred_side_input_iterable(self):
     @typehints.with_input_types(str, typehints.Iterable[str])



Mime
View raw message