beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From al...@apache.org
Subject [03/11] beam git commit: Remove label_or_fn from CombineGlobally
Date Sat, 11 Feb 2017 01:00:16 GMT
Remove label_or_fn from CombineGlobally


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/109ead9d
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/109ead9d
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/109ead9d

Branch: refs/heads/master
Commit: 109ead9de6d5878bfe09810b30469bd2b8c9e5dc
Parents: 9c01415
Author: Sourabh Bajaj <sourabhbajaj@google.com>
Authored: Wed Feb 8 14:35:15 2017 -0800
Committer: Ahmet Altay <altay@google.com>
Committed: Fri Feb 10 16:59:49 2017 -0800

----------------------------------------------------------------------
 sdks/python/apache_beam/transforms/combiners.py |  4 ++--
 sdks/python/apache_beam/transforms/core.py      | 14 ++++++--------
 2 files changed, 8 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/109ead9d/sdks/python/apache_beam/transforms/combiners.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/combiners.py b/sdks/python/apache_beam/transforms/combiners.py
index d4874b7..f55d46a 100644
--- a/sdks/python/apache_beam/transforms/combiners.py
+++ b/sdks/python/apache_beam/transforms/combiners.py
@@ -476,7 +476,7 @@ class ToList(ptransform.PTransform):
     super(ToList, self).__init__(label)
 
   def expand(self, pcoll):
-    return pcoll | core.CombineGlobally(self.label, ToListCombineFn())
+    return pcoll | self.label >> core.CombineGlobally(ToListCombineFn())
 
 
 @with_input_types(T)
@@ -510,7 +510,7 @@ class ToDict(ptransform.PTransform):
     super(ToDict, self).__init__(label)
 
   def expand(self, pcoll):
-    return pcoll | core.CombineGlobally(self.label, ToDictCombineFn())
+    return pcoll | self.label >> core.CombineGlobally(ToDictCombineFn())
 
 
 @with_input_types(Tuple[K, V])

http://git-wip-us.apache.org/repos/asf/beam/blob/109ead9d/sdks/python/apache_beam/transforms/core.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/core.py b/sdks/python/apache_beam/transforms/core.py
index 719e816..856d395 100644
--- a/sdks/python/apache_beam/transforms/core.py
+++ b/sdks/python/apache_beam/transforms/core.py
@@ -795,8 +795,6 @@ class CombineGlobally(PTransform):
   are applied.
 
   Args:
-    label: name of this transform instance. Useful while monitoring and
-      debugging a pipeline execution.
     pcoll: a PCollection to be reduced into a single value.
     fn: a CombineFn object that will be called to progressively reduce the
       PCollection into single values, or a callable suitable for wrapping
@@ -822,13 +820,13 @@ class CombineGlobally(PTransform):
   has_defaults = True
   as_view = False
 
-  def __init__(self, label_or_fn, *args, **kwargs):
-    if label_or_fn is None or isinstance(label_or_fn, str):
-      label, fn, args = label_or_fn, args[0], args[1:]
-    else:
-      label, fn = None, label_or_fn
+  def __init__(self, fn, *args, **kwargs):
+    if not (isinstance(fn, CombineFn) or callable(fn)):
+      raise TypeError(
+          'CombineGlobally can be used only with combineFn objects. '
+          'Received %r instead.' % (fn))
 
-    super(CombineGlobally, self).__init__(label)
+    super(CombineGlobally, self).__init__()
     self.fn = fn
     self.args = args
     self.kwargs = kwargs


Mime
View raw message