beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From al...@apache.org
Subject [13/19] beam git commit: Remove some internal details from the public API.
Date Fri, 12 May 2017 00:12:07 GMT
Remove some internal details from the public API.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/e12cf0df
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/e12cf0df
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/e12cf0df

Branch: refs/heads/release-2.0.0
Commit: e12cf0dfd810ec26965ae98e5a2256f560c6ff6d
Parents: 2070f11
Author: Robert Bradshaw <robertwb@google.com>
Authored: Wed May 10 15:55:46 2017 -0700
Committer: Ahmet Altay <altay@google.com>
Committed: Thu May 11 16:20:37 2017 -0700

----------------------------------------------------------------------
 sdks/python/apache_beam/pipeline.py             |  4 ++--
 .../runners/dataflow/dataflow_runner.py         |  6 +++---
 .../runners/dataflow/dataflow_runner_test.py    |  6 +++---
 .../apache_beam/runners/direct/executor.py      |  2 +-
 .../runners/direct/transform_evaluator.py       | 10 ++++-----
 sdks/python/apache_beam/transforms/__init__.py  |  2 +-
 sdks/python/apache_beam/transforms/core.py      | 22 ++++++++++----------
 .../python/apache_beam/transforms/ptransform.py |  6 +++---
 .../apache_beam/transforms/ptransform_test.py   | 12 +++++------
 sdks/python/apache_beam/typehints/typecheck.py  |  2 +-
 10 files changed, 36 insertions(+), 36 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/e12cf0df/sdks/python/apache_beam/pipeline.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/pipeline.py b/sdks/python/apache_beam/pipeline.py
index 79480d7..5048534 100644
--- a/sdks/python/apache_beam/pipeline.py
+++ b/sdks/python/apache_beam/pipeline.py
@@ -77,8 +77,8 @@ class Pipeline(object):
   the PValues are the edges.
 
   All the transforms applied to the pipeline must have distinct full labels.
-  If same transform instance needs to be applied then a clone should be created
-  with a new label (e.g., transform.clone('new label')).
+  If same transform instance needs to be applied then the right shift operator
+  should be used to designate new names (e.g. `input | "label" >> my_transform`).
   """
 
   def __init__(self, runner=None, options=None, argv=None):

http://git-wip-us.apache.org/repos/asf/beam/blob/e12cf0df/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
index da8de9d..0ecd22a 100644
--- a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
+++ b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
@@ -160,7 +160,7 @@ class DataflowRunner(PipelineRunner):
 
     class GroupByKeyInputVisitor(PipelineVisitor):
       """A visitor that replaces `Any` element type for input `PCollection` of
-      a `GroupByKey` or `GroupByKeyOnly` with a `KV` type.
+      a `GroupByKey` or `_GroupByKeyOnly` with a `KV` type.
 
       TODO(BEAM-115): Once Python SDk is compatible with the new Runner API,
       we could directly replace the coder instead of mutating the element type.
@@ -169,8 +169,8 @@ class DataflowRunner(PipelineRunner):
       def visit_transform(self, transform_node):
         # Imported here to avoid circular dependencies.
         # pylint: disable=wrong-import-order, wrong-import-position
-        from apache_beam.transforms.core import GroupByKey, GroupByKeyOnly
-        if isinstance(transform_node.transform, (GroupByKey, GroupByKeyOnly)):
+        from apache_beam.transforms.core import GroupByKey, _GroupByKeyOnly
+        if isinstance(transform_node.transform, (GroupByKey, _GroupByKeyOnly)):
           pcoll = transform_node.inputs[0]
           input_type = pcoll.element_type
           # If input_type is not specified, then treat it as `Any`.

http://git-wip-us.apache.org/repos/asf/beam/blob/e12cf0df/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py b/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py
index ac9b028..ff4b51d 100644
--- a/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py
+++ b/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py
@@ -37,7 +37,7 @@ from apache_beam.runners.dataflow.dataflow_runner import DataflowRuntimeExceptio
 from apache_beam.runners.dataflow.internal.clients import dataflow as dataflow_api
 from apache_beam.testing.test_pipeline import TestPipeline
 from apache_beam.transforms.display import DisplayDataItem
-from apache_beam.transforms.core import GroupByKeyOnly
+from apache_beam.transforms.core import _GroupByKeyOnly
 from apache_beam.typehints import typehints
 
 # Protect against environments where apitools library is not available.
@@ -185,7 +185,7 @@ class DataflowRunnerTest(unittest.TestCase):
     pcoll1 = PCollection(p)
     pcoll2 = PCollection(p)
     pcoll3 = PCollection(p)
-    for transform in [GroupByKeyOnly(), beam.GroupByKey()]:
+    for transform in [_GroupByKeyOnly(), beam.GroupByKey()]:
       pcoll1.element_type = None
       pcoll2.element_type = typehints.Any
       pcoll3.element_type = typehints.KV[typehints.Any, typehints.Any]
@@ -199,7 +199,7 @@ class DataflowRunnerTest(unittest.TestCase):
     p = TestPipeline()
     pcoll1 = PCollection(p)
     pcoll2 = PCollection(p)
-    for transform in [GroupByKeyOnly(), beam.GroupByKey()]:
+    for transform in [_GroupByKeyOnly(), beam.GroupByKey()]:
       pcoll1.element_type = typehints.TupleSequenceConstraint
       pcoll2.element_type = typehints.Set
       err_msg = "Input to GroupByKey must be of Tuple or Any type"

http://git-wip-us.apache.org/repos/asf/beam/blob/e12cf0df/sdks/python/apache_beam/runners/direct/executor.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/direct/executor.py b/sdks/python/apache_beam/runners/direct/executor.py
index 9efbede..86db291 100644
--- a/sdks/python/apache_beam/runners/direct/executor.py
+++ b/sdks/python/apache_beam/runners/direct/executor.py
@@ -155,7 +155,7 @@ class _SerialEvaluationState(_TransformEvaluationState):
   item of work will be submitted to the ExecutorService at any time.
 
   A principal use of this is for evaluators that keeps a global state such as
-  GroupByKeyOnly.
+  _GroupByKeyOnly.
   """
 
   def __init__(self, executor_service, scheduled):

http://git-wip-us.apache.org/repos/asf/beam/blob/e12cf0df/sdks/python/apache_beam/runners/direct/transform_evaluator.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/direct/transform_evaluator.py b/sdks/python/apache_beam/runners/direct/transform_evaluator.py
index 6984ded..b1cb626 100644
--- a/sdks/python/apache_beam/runners/direct/transform_evaluator.py
+++ b/sdks/python/apache_beam/runners/direct/transform_evaluator.py
@@ -53,7 +53,7 @@ class TransformEvaluatorRegistry(object):
         io.Read: _BoundedReadEvaluator,
         core.Flatten: _FlattenEvaluator,
         core.ParDo: _ParDoEvaluator,
-        core.GroupByKeyOnly: _GroupByKeyOnlyEvaluator,
+        core._GroupByKeyOnly: _GroupByKeyOnlyEvaluator,
         _NativeWrite: _NativeWriteEvaluator,
     }
 
@@ -83,7 +83,7 @@ class TransformEvaluatorRegistry(object):
     """Returns True if this applied_ptransform should run one bundle at a time.
 
     Some TransformEvaluators use a global state object to keep track of their
-    global execution state. For example evaluator for GroupByKeyOnly uses this
+    global execution state. For example evaluator for _GroupByKeyOnly uses this
     state as an in memory dictionary to buffer keys.
 
     Serially executed evaluators will act as syncing point in the graph and
@@ -99,7 +99,7 @@ class TransformEvaluatorRegistry(object):
       True if executor should execute applied_ptransform serially.
     """
     return isinstance(applied_ptransform.transform,
-                      (core.GroupByKeyOnly, _NativeWrite))
+                      (core._GroupByKeyOnly, _NativeWrite))
 
 
 class _TransformEvaluator(object):
@@ -325,7 +325,7 @@ class _ParDoEvaluator(_TransformEvaluator):
 
 
 class _GroupByKeyOnlyEvaluator(_TransformEvaluator):
-  """TransformEvaluator for GroupByKeyOnly transform."""
+  """TransformEvaluator for _GroupByKeyOnly transform."""
 
   MAX_ELEMENT_PER_BUNDLE = None
 
@@ -369,7 +369,7 @@ class _GroupByKeyOnlyEvaluator(_TransformEvaluator):
       k, v = element.value
       self.state.output[self.key_coder.encode(k)].append(v)
     else:
-      raise TypeCheckError('Input to GroupByKeyOnly must be a PCollection of '
+      raise TypeCheckError('Input to _GroupByKeyOnly must be a PCollection of '
                            'windowed key-value pairs. Instead received: %r.'
                            % element)
 

http://git-wip-us.apache.org/repos/asf/beam/blob/e12cf0df/sdks/python/apache_beam/transforms/__init__.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/__init__.py b/sdks/python/apache_beam/transforms/__init__.py
index 847fb8f..b77b0f6 100644
--- a/sdks/python/apache_beam/transforms/__init__.py
+++ b/sdks/python/apache_beam/transforms/__init__.py
@@ -21,5 +21,5 @@
 from apache_beam.transforms import combiners
 from apache_beam.transforms.core import *
 from apache_beam.transforms.ptransform import *
-from apache_beam.transforms.timeutil import *
+from apache_beam.transforms.timeutil import TimeDomain
 from apache_beam.transforms.util import *

http://git-wip-us.apache.org/repos/asf/beam/blob/e12cf0df/sdks/python/apache_beam/transforms/core.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/core.py b/sdks/python/apache_beam/transforms/core.py
index abe699f..0e497f9 100644
--- a/sdks/python/apache_beam/transforms/core.py
+++ b/sdks/python/apache_beam/transforms/core.py
@@ -215,7 +215,7 @@ class DoFn(WithTypeHints, HasDisplayData):
       return Any
     return type_hint
 
-  def process_argspec_fn(self):
+  def _process_argspec_fn(self):
     """Returns the Python callable that will eventually be invoked.
 
     This should ideally be the user-level function that is called with
@@ -307,7 +307,7 @@ class CallableWrapperDoFn(DoFn):
     return self._strip_output_annotations(
         trivial_inference.infer_return_type(self._fn, [input_type]))
 
-  def process_argspec_fn(self):
+  def _process_argspec_fn(self):
     return getattr(self._fn, '_argspec_fn', self._fn)
 
 
@@ -641,8 +641,8 @@ class ParDo(PTransformWithSideInputs):
       return fn
     return CallableWrapperDoFn(fn)
 
-  def process_argspec_fn(self):
-    return self.fn.process_argspec_fn()
+  def _process_argspec_fn(self):
+    return self.fn._process_argspec_fn()
 
   def display_data(self):
     return {'fn': DisplayDataItem(self.fn.__class__,
@@ -870,19 +870,19 @@ class CombineGlobally(PTransform):
   def default_label(self):
     return 'CombineGlobally(%s)' % ptransform.label_from_callable(self.fn)
 
-  def clone(self, **extra_attributes):
+  def _clone(self, **extra_attributes):
     clone = copy.copy(self)
     clone.__dict__.update(extra_attributes)
     return clone
 
   def with_defaults(self, has_defaults=True):
-    return self.clone(has_defaults=has_defaults)
+    return self._clone(has_defaults=has_defaults)
 
   def without_defaults(self):
     return self.with_defaults(False)
 
   def as_singleton_view(self):
-    return self.clone(as_view=True)
+    return self._clone(as_view=True)
 
   def expand(self, pcoll):
     def add_input_types(transform):
@@ -964,7 +964,7 @@ class CombinePerKey(PTransformWithSideInputs):
   def default_label(self):
     return '%s(%s)' % (self.__class__.__name__, self._fn_label)
 
-  def process_argspec_fn(self):
+  def _process_argspec_fn(self):
     return self.fn._fn  # pylint: disable=protected-access
 
   def expand(self, pcoll):
@@ -1133,7 +1133,7 @@ class GroupByKey(PTransform):
       return (pcoll
               | 'ReifyWindows' >> (ParDo(self.ReifyWindows())
                  .with_output_types(reify_output_type))
-              | 'GroupByKey' >> (GroupByKeyOnly()
+              | 'GroupByKey' >> (_GroupByKeyOnly()
                  .with_input_types(reify_output_type)
                  .with_output_types(gbk_input_type))
               | ('GroupByWindow' >> ParDo(
@@ -1144,14 +1144,14 @@ class GroupByKey(PTransform):
       # The input_type is None, run the default
       return (pcoll
               | 'ReifyWindows' >> ParDo(self.ReifyWindows())
-              | 'GroupByKey' >> GroupByKeyOnly()
+              | 'GroupByKey' >> _GroupByKeyOnly()
               | 'GroupByWindow' >> ParDo(
                     self.GroupAlsoByWindow(pcoll.windowing)))
 
 
 @typehints.with_input_types(typehints.KV[K, V])
 @typehints.with_output_types(typehints.KV[K, typehints.Iterable[V]])
-class GroupByKeyOnly(PTransform):
+class _GroupByKeyOnly(PTransform):
   """A group by key transform, ignoring windows."""
   def infer_output_type(self, input_type):
     key_type, value_type = trivial_inference.key_value_types(input_type)

http://git-wip-us.apache.org/repos/asf/beam/blob/e12cf0df/sdks/python/apache_beam/transforms/ptransform.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/ptransform.py b/sdks/python/apache_beam/transforms/ptransform.py
index 8898c36..bd2a120 100644
--- a/sdks/python/apache_beam/transforms/ptransform.py
+++ b/sdks/python/apache_beam/transforms/ptransform.py
@@ -303,7 +303,7 @@ class PTransform(WithTypeHints, HasDisplayData):
     # TODO(ccy): further refine this API.
     return None
 
-  def clone(self, new_label):
+  def _clone(self, new_label):
     """Clones the current transform instance under a new label."""
     transform = copy.copy(self)
     transform.label = new_label
@@ -567,7 +567,7 @@ class PTransformWithSideInputs(PTransform):
 
       arg_types = [pvalueish.element_type] + [element_type(v) for v in args]
       kwargs_types = {k: element_type(v) for (k, v) in kwargs.items()}
-      argspec_fn = self.process_argspec_fn()
+      argspec_fn = self._process_argspec_fn()
       bindings = getcallargs_forhints(argspec_fn, *arg_types, **kwargs_types)
       hints = getcallargs_forhints(argspec_fn, *type_hints[0], **type_hints[1])
       for arg, hint in hints.items():
@@ -581,7 +581,7 @@ class PTransformWithSideInputs(PTransform):
               'Type hint violation for \'%s\': requires %s but got %s for %s'
               % (self.label, hint, bindings[arg], arg))
 
-  def process_argspec_fn(self):
+  def _process_argspec_fn(self):
     """Returns an argspec of the function actually consuming the data.
     """
     raise NotImplementedError

http://git-wip-us.apache.org/repos/asf/beam/blob/e12cf0df/sdks/python/apache_beam/transforms/ptransform_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/ptransform_test.py b/sdks/python/apache_beam/transforms/ptransform_test.py
index f790660..efc5978 100644
--- a/sdks/python/apache_beam/transforms/ptransform_test.py
+++ b/sdks/python/apache_beam/transforms/ptransform_test.py
@@ -35,7 +35,7 @@ import apache_beam.pvalue as pvalue
 from apache_beam.testing.test_pipeline import TestPipeline
 from apache_beam.testing.util import assert_that, equal_to
 from apache_beam.transforms import window
-from apache_beam.transforms.core import GroupByKeyOnly
+from apache_beam.transforms.core import _GroupByKeyOnly
 import apache_beam.transforms.combiners as combine
 from apache_beam.transforms.display import DisplayData, DisplayDataItem
 from apache_beam.transforms.ptransform import PTransform
@@ -580,7 +580,7 @@ class PTransformTest(unittest.TestCase):
     pipeline = TestPipeline()
     pcolls = pipeline | 'A' >> beam.Create(['a', 'b', 'f'])
     with self.assertRaises(typehints.TypeCheckError) as cm:
-      pcolls | 'D' >> GroupByKeyOnly()
+      pcolls | 'D' >> _GroupByKeyOnly()
       pipeline.run()
 
     expected_error_prefix = ('Input type hint violation at D: expected '
@@ -1088,7 +1088,7 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
          | 'Str' >> beam.Create(['t', 'e', 's', 't']).with_output_types(str)
          | ('Pair' >> beam.Map(lambda x: (x, ord(x)))
             .with_output_types(typehints.KV[str, str]))
-         | GroupByKeyOnly())
+         | _GroupByKeyOnly())
 
     # Output type should correctly be deduced.
     # GBK-only should deduce that KV[A, B] is turned into KV[A, Iterable[B]].
@@ -1112,7 +1112,7 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
     with self.assertRaises(typehints.TypeCheckError) as e:
       (self.p
        | beam.Create([1, 2, 3]).with_output_types(int)
-       | 'F' >> GroupByKeyOnly())
+       | 'F' >> _GroupByKeyOnly())
 
     self.assertEqual("Input type hint violation at F: "
                      "expected Tuple[TypeVariable[K], TypeVariable[V]], "
@@ -1155,7 +1155,7 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
       (self.p
        | 'Nums' >> beam.Create(range(5)).with_output_types(int)
        | 'ModDup' >> beam.Map(lambda x: (x % 2, x))
-       | GroupByKeyOnly())
+       | _GroupByKeyOnly())
 
     self.assertEqual('Pipeline type checking is enabled, however no output '
                      'type-hint was found for the PTransform '
@@ -1978,7 +1978,7 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
   def test_gbk_type_inference(self):
     self.assertEqual(
         typehints.Tuple[str, typehints.Iterable[int]],
-        GroupByKeyOnly().infer_output_type(typehints.KV[str, int]))
+        _GroupByKeyOnly().infer_output_type(typehints.KV[str, int]))
 
   def test_pipeline_inference(self):
     created = self.p | beam.Create(['a', 'b', 'c'])

http://git-wip-us.apache.org/repos/asf/beam/blob/e12cf0df/sdks/python/apache_beam/typehints/typecheck.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/typehints/typecheck.py b/sdks/python/apache_beam/typehints/typecheck.py
index 09b73f9..89a5f5c 100644
--- a/sdks/python/apache_beam/typehints/typecheck.py
+++ b/sdks/python/apache_beam/typehints/typecheck.py
@@ -109,7 +109,7 @@ class TypeCheckWrapperDoFn(AbstractDoFnWrapper):
   def __init__(self, dofn, type_hints, label=None):
     super(TypeCheckWrapperDoFn, self).__init__(dofn)
     self.dofn = dofn
-    self._process_fn = self.dofn.process_argspec_fn()
+    self._process_fn = self.dofn._process_argspec_fn()
     if type_hints.input_types:
       input_args, input_kwargs = type_hints.input_types
       self._input_hints = getcallargs_forhints(


Mime
View raw message