beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From al...@apache.org
Subject [04/11] beam git commit: Remove fn_or_label argument from ParDo
Date Sat, 11 Feb 2017 01:00:17 GMT
Remove fn_or_label argument from ParDo


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/9c01415d
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/9c01415d
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/9c01415d

Branch: refs/heads/master
Commit: 9c01415d3100cd11fe231e58b5d4d0d7b592c895
Parents: 5af68f0
Author: Sourabh Bajaj <sourabhbajaj@google.com>
Authored: Wed Feb 8 14:26:36 2017 -0800
Committer: Ahmet Altay <altay@google.com>
Committed: Fri Feb 10 16:59:49 2017 -0800

----------------------------------------------------------------------
 sdks/python/apache_beam/transforms/core.py      | 25 ++++++++++----------
 .../python/apache_beam/transforms/ptransform.py | 10 ++------
 .../apache_beam/transforms/ptransform_test.py   |  3 +--
 3 files changed, 15 insertions(+), 23 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/9c01415d/sdks/python/apache_beam/transforms/core.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/core.py b/sdks/python/apache_beam/transforms/core.py
index 5383ef6..719e816 100644
--- a/sdks/python/apache_beam/transforms/core.py
+++ b/sdks/python/apache_beam/transforms/core.py
@@ -577,8 +577,6 @@ class ParDo(PTransformWithSideInputs):
   process method.
 
   Args:
-      label: name of this transform instance. Useful while monitoring and
-        debugging a pipeline execution.
       pcoll: a PCollection to be processed.
       dofn: a DoFn object to be applied to each element of pcoll argument.
       *args: positional arguments passed to the dofn object.
@@ -592,8 +590,8 @@ class ParDo(PTransformWithSideInputs):
   the argument lists.
   """
 
-  def __init__(self, fn_or_label, *args, **kwargs):
-    super(ParDo, self).__init__(fn_or_label, *args, **kwargs)
+  def __init__(self, fn, *args, **kwargs):
+    super(ParDo, self).__init__(fn, *args, **kwargs)
 
     if not isinstance(self.fn, DoFn):
       raise TypeError('ParDo must be called with a DoFn instance.')
@@ -870,8 +868,8 @@ class CombineGlobally(PTransform):
                 | 'KeyWithVoid' >> add_input_types(
                     Map(lambda v: (None, v)).with_output_types(
                         KV[None, pcoll.element_type]))
-                | CombinePerKey(
-                    'CombinePerKey', self.fn, *self.args, **self.kwargs)
+                | 'CombinePerKey' >> CombinePerKey(
+                    self.fn, *self.args, **self.kwargs)
                 | 'UnKey' >> Map(lambda (k, v): v))
 
     if not self.has_defaults and not self.as_view:
@@ -946,8 +944,8 @@ class CombinePerKey(PTransformWithSideInputs):
   def expand(self, pcoll):
     args, kwargs = util.insert_values_in_args(
         self.args, self.kwargs, self.side_inputs)
-    return pcoll | GroupByKey() | CombineValues('Combine',
-                                                self.fn, *args, **kwargs)
+    return pcoll | GroupByKey() | 'Combine' >> CombineValues(
+        self.fn, *args, **kwargs)
 
 
 # TODO(robertwb): Rename to CombineGroupedValues?
@@ -1113,16 +1111,16 @@ class GroupByKey(PTransform):
               | 'group_by_key' >> (GroupByKeyOnly()
                  .with_input_types(reify_output_type)
                  .with_output_types(gbk_input_type))
-              | (ParDo('group_by_window',
-                       self.GroupAlsoByWindow(pcoll.windowing))
+              | ('group_by_window' >> ParDo(
+                     self.GroupAlsoByWindow(pcoll.windowing))
                  .with_input_types(gbk_input_type)
                  .with_output_types(gbk_output_type)))
     else:
       return (pcoll
               | 'reify_windows' >> ParDo(self.ReifyWindows())
               | 'group_by_key' >> GroupByKeyOnly()
-              | ParDo('group_by_window',
-                      self.GroupAlsoByWindow(pcoll.windowing)))
+              | 'group_by_window' >> ParDo(
+                    self.GroupAlsoByWindow(pcoll.windowing)))
 
 
 @typehints.with_input_types(typehints.KV[K, V])
@@ -1256,7 +1254,8 @@ class WindowInto(ParDo):
     self.windowing = Windowing(windowfn, triggerfn, accumulation_mode,
                                output_time_fn)
     dofn = self.WindowIntoFn(self.windowing)
-    super(WindowInto, self).__init__(label, dofn)
+    super(WindowInto, self).__init__(dofn)
+    self.label = label
 
   def get_windowing(self, unused_inputs):
     return self.windowing

http://git-wip-us.apache.org/repos/asf/beam/blob/9c01415d/sdks/python/apache_beam/transforms/ptransform.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/ptransform.py b/sdks/python/apache_beam/transforms/ptransform.py
index b5ac64b..4b2eb58 100644
--- a/sdks/python/apache_beam/transforms/ptransform.py
+++ b/sdks/python/apache_beam/transforms/ptransform.py
@@ -507,19 +507,13 @@ class PTransformWithSideInputs(PTransform):
   for PTransforms that fit this model.
   """
 
-  def __init__(self, fn_or_label, *args, **kwargs):
-    if fn_or_label is None or isinstance(fn_or_label, basestring):
-      label = fn_or_label
-      fn, args = args[0], args[1:]
-    else:
-      label = None
-      fn = fn_or_label
+  def __init__(self, fn, *args, **kwargs):
     if isinstance(fn, type) and issubclass(fn, typehints.WithTypeHints):
       # Don't treat Fn class objects as callables.
       raise ValueError('Use %s() not %s.' % (fn.__name__, fn.__name__))
     self.fn = self.make_fn(fn)
     # Now that we figure out the label, initialize the super-class.
-    super(PTransformWithSideInputs, self).__init__(label=label)
+    super(PTransformWithSideInputs, self).__init__()
 
     if (any([isinstance(v, pvalue.PCollection) for v in args]) or
         any([isinstance(v, pvalue.PCollection) for v in kwargs.itervalues()])):

http://git-wip-us.apache.org/repos/asf/beam/blob/9c01415d/sdks/python/apache_beam/transforms/ptransform_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/ptransform_test.py b/sdks/python/apache_beam/transforms/ptransform_test.py
index 8270517..f3291ac 100644
--- a/sdks/python/apache_beam/transforms/ptransform_test.py
+++ b/sdks/python/apache_beam/transforms/ptransform_test.py
@@ -421,8 +421,7 @@ class PTransformTest(unittest.TestCase):
     pipeline = TestPipeline()
     pcoll = pipeline | 'Start' >> beam.Create([0, 1, 2, 3, 4, 5, 6, 7, 8])
     partitions = (
-        pcoll | beam.Partition(
-            'part',
+        pcoll | 'part' >> beam.Partition(
             lambda e, n, offset: (e % 3) + offset, 4,
             1))
     assert_that(partitions[0], equal_to([]))


Mime
View raw message