beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rober...@apache.org
Subject [1/4] incubator-beam git commit: Remove unneeded label argument in ptransform_fn
Date Thu, 07 Jul 2016 18:51:46 GMT
Repository: incubator-beam
Updated Branches:
  refs/heads/python-sdk 342d2d798 -> 9bc04b750


Remove unneeded label argument in ptransform_fn

The label is already in the fully qualified name due to nesting.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/31b3f00c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/31b3f00c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/31b3f00c

Branch: refs/heads/python-sdk
Commit: 31b3f00cf709ee06fd6f3d38567404861c0ae244
Parents: e064377
Author: Robert Bradshaw <robertwb@google.com>
Authored: Wed Jul 6 15:15:51 2016 -0700
Committer: Robert Bradshaw <robertwb@google.com>
Committed: Thu Jul 7 11:50:49 2016 -0700

----------------------------------------------------------------------
 .../examples/cookbook/custom_ptransform.py      |  7 ++--
 sdks/python/apache_beam/transforms/combiners.py | 34 +++++++++-----------
 sdks/python/apache_beam/transforms/core.py      |  4 +--
 .../python/apache_beam/transforms/ptransform.py |  8 ++---
 .../apache_beam/transforms/ptransform_test.py   |  7 ++--
 sdks/python/apache_beam/transforms/util.py      |  8 ++---
 6 files changed, 30 insertions(+), 38 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/31b3f00c/sdks/python/apache_beam/examples/cookbook/custom_ptransform.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/cookbook/custom_ptransform.py b/sdks/python/apache_beam/examples/cookbook/custom_ptransform.py
index f97545a..8da1f43 100644
--- a/sdks/python/apache_beam/examples/cookbook/custom_ptransform.py
+++ b/sdks/python/apache_beam/examples/cookbook/custom_ptransform.py
@@ -57,7 +57,7 @@ def run_count2(known_args, options):
   """Runs the second example pipeline."""
 
   @beam.ptransform_fn
-  def Count(label, pcoll):      # pylint: disable=invalid-name,unused-argument
+  def Count(pcoll):      # pylint: disable=invalid-name
     """Count as a decorated function."""
     return (
         pcoll
@@ -76,12 +76,11 @@ def run_count3(known_args, options):
   """Runs the third example pipeline."""
 
   @beam.ptransform_fn
-  # pylint: disable=invalid-name,unused-argument
-  def Count(label, pcoll, factor=1):
+  # pylint: disable=invalid-name
+  def Count(pcoll, factor=1):
     """Count as a decorated function with a side input.
 
     Args:
-      label: optional label for this transform
       pcoll: the PCollection passed in from the previous transform
       factor: the amount by which to count
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/31b3f00c/sdks/python/apache_beam/transforms/combiners.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/combiners.py b/sdks/python/apache_beam/transforms/combiners.py
index e9f11a0..8c56e5a 100644
--- a/sdks/python/apache_beam/transforms/combiners.py
+++ b/sdks/python/apache_beam/transforms/combiners.py
@@ -148,7 +148,7 @@ class Top(object):
   # pylint: disable=no-self-argument
 
   @ptransform.ptransform_fn
-  def Of(label, pcoll, n, compare, *args, **kwargs):
+  def Of(pcoll, n, compare, *args, **kwargs):
     """Obtain a list of the compare-most N elements in a PCollection.
 
     This transform will retrieve the n greatest elements in the PCollection
@@ -160,7 +160,6 @@ class Top(object):
     become additional arguments to the comparator.
 
     Args:
-      label: display label for transform processes.
       pcoll: PCollection to process.
       n: number of elements to extract from pcoll.
       compare: as described above.
@@ -168,10 +167,10 @@ class Top(object):
       **kwargs: as described above.
     """
     return pcoll | core.CombineGlobally(
-        label, TopCombineFn(n, compare), *args, **kwargs)
+        TopCombineFn(n, compare), *args, **kwargs)
 
   @ptransform.ptransform_fn
-  def PerKey(label, pcoll, n, compare, *args, **kwargs):
+  def PerKey(pcoll, n, compare, *args, **kwargs):
     """Identifies the compare-most N elements associated with each key.
 
     This transform will produce a PCollection mapping unique keys in the input
@@ -184,7 +183,6 @@ class Top(object):
     become additional arguments to the comparator.
 
     Args:
-      label: display label for transform processes.
       pcoll: PCollection to process.
       n: number of elements to extract from pcoll.
       compare: as described above.
@@ -196,27 +194,27 @@ class Top(object):
         compatible with KV[A, B].
     """
     return pcoll | core.CombinePerKey(
-        label, TopCombineFn(n, compare), *args, **kwargs)
+        TopCombineFn(n, compare), *args, **kwargs)
 
   @ptransform.ptransform_fn
-  def Largest(label, pcoll, n):
+  def Largest(pcoll, n):
     """Obtain a list of the greatest N elements in a PCollection."""
-    return pcoll | Top.Of(label, n, lambda a, b: a < b)
+    return pcoll | Top.Of(n, lambda a, b: a < b)
 
   @ptransform.ptransform_fn
-  def Smallest(label, pcoll, n):
+  def Smallest(pcoll, n):
     """Obtain a list of the least N elements in a PCollection."""
-    return pcoll | Top.Of(label, n, lambda a, b: b < a)
+    return pcoll | Top.Of(n, lambda a, b: b < a)
 
   @ptransform.ptransform_fn
-  def LargestPerKey(label, pcoll, n):
+  def LargestPerKey(pcoll, n):
     """Identifies the N greatest elements associated with each key."""
-    return pcoll | Top.PerKey(label, n, lambda a, b: a < b)
+    return pcoll | Top.PerKey(n, lambda a, b: a < b)
 
   @ptransform.ptransform_fn
-  def SmallestPerKey(label, pcoll, n):
+  def SmallestPerKey(pcoll, n):
     """Identifies the N least elements associated with each key."""
-    return pcoll | Top.PerKey(label, n, lambda a, b: b < a)
+    return pcoll | Top.PerKey(n, lambda a, b: b < a)
 
 
 @with_input_types(T)
@@ -325,12 +323,12 @@ class Sample(object):
   # pylint: disable=no-self-argument
 
   @ptransform.ptransform_fn
-  def FixedSizeGlobally(label, pcoll, n):
-    return pcoll | core.CombineGlobally(label, SampleCombineFn(n))
+  def FixedSizeGlobally(pcoll, n):
+    return pcoll | core.CombineGlobally(SampleCombineFn(n))
 
   @ptransform.ptransform_fn
-  def FixedSizePerKey(label, pcoll, n):
-    return pcoll | core.CombinePerKey(label, SampleCombineFn(n))
+  def FixedSizePerKey(pcoll, n):
+    return pcoll | core.CombinePerKey(SampleCombineFn(n))
 
 
 @with_input_types(T)

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/31b3f00c/sdks/python/apache_beam/transforms/core.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/core.py b/sdks/python/apache_beam/transforms/core.py
index 2057916..9b477db 100644
--- a/sdks/python/apache_beam/transforms/core.py
+++ b/sdks/python/apache_beam/transforms/core.py
@@ -857,7 +857,7 @@ class CombineGlobally(PTransform):
 
 
 @ptransform_fn
-def CombinePerKey(label, pcoll, fn, *args, **kwargs):  # pylint: disable=invalid-name
+def CombinePerKey(pcoll, fn, *args, **kwargs):  # pylint: disable=invalid-name
   """A per-key Combine transform.
 
   Identifies sets of values associated with the same key in the input
@@ -866,8 +866,6 @@ def CombinePerKey(label, pcoll, fn, *args, **kwargs):  # pylint: disable=invalid
   CombineFns are applied.
 
   Args:
-    label: name of this transform instance. Useful while monitoring and
-      debugging a pipeline execution.
     pcoll: input pcollection.
     fn: instance of CombineFn to apply to all values under the same key in
       pcoll, or a callable whose signature is f(iterable, *args, **kwargs)

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/31b3f00c/sdks/python/apache_beam/transforms/ptransform.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/ptransform.py b/sdks/python/apache_beam/transforms/ptransform.py
index 2b3ad32..04dd9b3 100644
--- a/sdks/python/apache_beam/transforms/ptransform.py
+++ b/sdks/python/apache_beam/transforms/ptransform.py
@@ -627,7 +627,7 @@ class CallablePTransform(PTransform):
     except TypeError:
       # Might not be a function.
       pass
-    return self.fn(self.label, pcoll, *args, **kwargs)
+    return self.fn(pcoll, *args, **kwargs)
 
   def default_label(self):
     if self._args:
@@ -649,11 +649,11 @@ def ptransform_fn(fn):
   This wrapper provides an alternative, simpler way to define a PTransform.
   The standard method is to subclass from PTransform and override the apply()
   method. An equivalent effect can be obtained by defining a function that
-  takes a label, an input PCollection and additional optional arguments and
-  returns a resulting PCollection. For example::
+  takes an input PCollection and additional optional arguments and returns a
+  resulting PCollection. For example::
 
     @ptransform_fn
-    def CustomMapper(label, pcoll, mapfn):
+    def CustomMapper(pcoll, mapfn):
       return pcoll | ParDo(mapfn)
 
   The equivalent approach using PTransform subclassing::

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/31b3f00c/sdks/python/apache_beam/transforms/ptransform_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/ptransform_test.py b/sdks/python/apache_beam/transforms/ptransform_test.py
index c98a945..aedb384 100644
--- a/sdks/python/apache_beam/transforms/ptransform_test.py
+++ b/sdks/python/apache_beam/transforms/ptransform_test.py
@@ -559,13 +559,11 @@ class PTransformTest(unittest.TestCase):
 
 
 @beam.ptransform_fn
-def SamplePTransform(label, pcoll, context, *args, **kwargs):
+def SamplePTransform(pcoll):
   """Sample transform using the @ptransform_fn decorator."""
-  _ = label, args, kwargs
   map_transform = beam.Map('ToPairs', lambda v: (v, None))
   combine_transform = beam.CombinePerKey('Group', lambda vs: None)
   keys_transform = beam.Keys('RemoveDuplicates')
-  context.extend([map_transform, combine_transform, keys_transform])
   return pcoll | map_transform | combine_transform | keys_transform
 
 
@@ -626,8 +624,7 @@ class PTransformLabelsTest(unittest.TestCase):
   def test_apply_ptransform_using_decorator(self):
     pipeline = Pipeline('DirectPipelineRunner')
     pcoll = pipeline | beam.Create('pcoll', [1, 2, 3])
-    context = []
-    sample = SamplePTransform('*sample*', context)
+    sample = SamplePTransform('*sample*')
     _ = pcoll | sample
     self.assertTrue('*sample*' in pipeline.applied_labels)
     self.assertTrue('*sample*/ToPairs' in pipeline.applied_labels)

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/31b3f00c/sdks/python/apache_beam/transforms/util.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/util.py b/sdks/python/apache_beam/transforms/util.py
index 6b5b16c..b7a121d 100644
--- a/sdks/python/apache_beam/transforms/util.py
+++ b/sdks/python/apache_beam/transforms/util.py
@@ -157,12 +157,12 @@ def KvSwap(label='KvSwap'):  # pylint: disable=invalid-name
 
 
 @ptransform_fn
-def RemoveDuplicates(label, pcoll):  # pylint: disable=invalid-name
+def RemoveDuplicates(pcoll):  # pylint: disable=invalid-name
   """Produces a PCollection containing the unique elements of a PCollection."""
   return (pcoll
-          | Map('%s:ToPairs' % label, lambda v: (v, None))
-          | CombinePerKey('%s:Group' % label, lambda vs: None)
-          | Keys('%s:RemoveDuplicates' % label))
+          | Map('ToPairs', lambda v: (v, None))
+          | CombinePerKey('Group', lambda vs: None)
+          | Keys('RemoveDuplicates'))
 
 
 class DataflowAssertException(Exception):


Mime
View raw message