beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From al...@apache.org
Subject [1/2] beam git commit: Remove the usage of OldDoFn and clean up function names
Date Mon, 06 Feb 2017 02:10:59 GMT
Repository: beam
Updated Branches:
  refs/heads/master 4d0e8ecf1 -> b3d962df2


Remove the usage of OldDoFn and clean up function names


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/137d392e
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/137d392e
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/137d392e

Branch: refs/heads/master
Commit: 137d392e79080f67a24fa418c474e4f852d1dd74
Parents: 4d0e8ec
Author: Sourabh Bajaj <sourabhbajaj@google.com>
Authored: Sun Feb 5 11:11:25 2017 -0800
Committer: Sourabh Bajaj <sourabhbajaj@google.com>
Committed: Sun Feb 5 16:01:54 2017 -0800

----------------------------------------------------------------------
 sdks/python/apache_beam/runners/common.pxd      |  12 +-
 sdks/python/apache_beam/runners/common.py       | 225 +++++++------------
 .../runners/direct/transform_evaluator.py       |  15 +-
 sdks/python/apache_beam/transforms/core.py      |  87 +------
 sdks/python/apache_beam/typehints/typecheck.py  | 133 -----------
 5 files changed, 92 insertions(+), 380 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/137d392e/sdks/python/apache_beam/runners/common.pxd
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/common.pxd b/sdks/python/apache_beam/runners/common.pxd
index dbb08f0..f36fdd0 100644
--- a/sdks/python/apache_beam/runners/common.pxd
+++ b/sdks/python/apache_beam/runners/common.pxd
@@ -37,22 +37,20 @@ cdef class DoFnRunner(Receiver):
   cdef object tagged_receivers
   cdef LoggingContext logging_context
   cdef object step_name
-  cdef bint is_new_dofn
   cdef list args
   cdef dict kwargs
   cdef ScopedMetricsContainer scoped_metrics_container
   cdef list side_inputs
-  cdef bint has_windowed_side_inputs
+  cdef bint has_windowed_inputs
   cdef list placeholders
-  cdef bint simple_process
+  cdef bint use_simple_invoker
 
   cdef Receiver main_receivers
 
   cpdef process(self, WindowedValue element)
-  cdef old_dofn_process(self, WindowedValue element)
-  cdef new_dofn_process(self, WindowedValue element)
-  cdef new_dofn_simple_process(self, WindowedValue element)
-  cdef _new_dofn_window_process(
+  cdef _dofn_invoker(self, WindowedValue element)
+  cdef _dofn_simple_invoker(self, WindowedValue element)
+  cdef _dofn_window_invoker(
       self, WindowedValue element, list args, dict kwargs, object window)
 
   @cython.locals(windowed_value=WindowedValue)

http://git-wip-us.apache.org/repos/asf/beam/blob/137d392e/sdks/python/apache_beam/runners/common.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/common.py b/sdks/python/apache_beam/runners/common.py
index 9c942c0..aa6c2dd 100644
--- a/sdks/python/apache_beam/runners/common.py
+++ b/sdks/python/apache_beam/runners/common.py
@@ -115,141 +115,89 @@ class DoFnRunner(Receiver):
       assert context is not None
       self.context = context
 
-    # TODO(Sourabhbajaj): Remove the usage of OldDoFn
-    if isinstance(fn, core.DoFn):
-
-      class ArgPlaceholder(object):
-        def __init__(self, placeholder):
-          self.placeholder = placeholder
-
-      self.is_new_dofn = True
+    class ArgPlaceholder(object):
+      def __init__(self, placeholder):
+        self.placeholder = placeholder
+
+    # Stash values for use in dofn_process.
+    self.side_inputs = side_inputs
+    self.has_windowed_inputs = not all(
+        si.is_globally_windowed() for si in self.side_inputs)
+
+    self.args = args if args else []
+    self.kwargs = kwargs if kwargs else {}
+    self.dofn = fn
+    self.dofn_process = fn.process
+
+    arguments, _, _, defaults = self.dofn.get_function_arguments('process')
+    defaults = defaults if defaults else []
+    self_in_args = int(self.dofn.is_process_bounded())
+
+    self.use_simple_invoker = (
+        not side_inputs and not args and not kwargs and not defaults)
+    if self.use_simple_invoker:
+      # As we're using the simple invoker we don't need to compute placeholders
+      return
 
-      # Stash values for use in new_dofn_process.
-      self.side_inputs = side_inputs
-      self.has_windowed_side_inputs = not all(
-          si.is_globally_windowed() for si in self.side_inputs)
+    self.has_windowed_inputs = (self.has_windowed_inputs or
+                                core.DoFn.WindowParam in defaults)
 
-      self.args = args if args else []
-      self.kwargs = kwargs if kwargs else {}
-      self.dofn = fn
-      self.dofn_process = fn.process
+    # Try to prepare all the arguments that can just be filled in
+    # without any additional work in the process function.
+    # Also cache all the placeholders needed in the process function.
 
-      arguments, _, _, defaults = self.dofn.get_function_arguments('process')
-      defaults = defaults if defaults else []
-      self_in_args = int(self.dofn.is_process_bounded())
-
-      self.simple_process = (
-          not side_inputs and not args and not kwargs and not defaults)
-      if self.simple_process:
-        return
-
-      # TODO(Sourabhbajaj) Rename this variable once oldDoFn is deprecated
-      self.has_windowed_side_inputs = (
-          self.has_windowed_side_inputs or
-          core.DoFn.WindowParam in defaults)
-
-      # Try to prepare all the arguments that can just be filled in
-      # without any additional work. in the process function.
-      # Also cache all the placeholders needed in the process function.
-
-      # Fill in sideInputs if they are globally windowed
-      if not self.has_windowed_side_inputs:
-        self.args, self.kwargs = util.insert_values_in_args(
-            args, kwargs, [si[global_window] for si in side_inputs])
-
-      # Create placeholder for element parameter
-      if core.DoFn.ElementParam not in defaults:
-        args_to_pick = len(arguments) - len(defaults) - 1 - self_in_args
-        final_args = [ArgPlaceholder(core.DoFn.ElementParam)] + \
-                     self.args[:args_to_pick]
-      else:
-        args_to_pick = len(arguments) - len(defaults) - self_in_args
-        final_args = self.args[:args_to_pick]
-
-      # Fill the OtherPlaceholders for context, window or timestamp
-      args = iter(self.args[args_to_pick:])
-      for a, d in zip(arguments[-len(defaults):], defaults):
-        if d == core.DoFn.ElementParam:
-          final_args.append(ArgPlaceholder(d))
-        elif d == core.DoFn.ContextParam:
-          final_args.append(ArgPlaceholder(d))
-        elif d == core.DoFn.WindowParam:
-          final_args.append(ArgPlaceholder(d))
-        elif d == core.DoFn.TimestampParam:
-          final_args.append(ArgPlaceholder(d))
-        elif d == core.DoFn.SideInputParam:
-          # If no more args are present then the value must be passed via kwarg
-          try:
-            final_args.append(args.next())
-          except StopIteration:
-            if a not in self.kwargs:
-              raise ValueError("Value for sideinput %s not provided" % a)
-        else:
-          # If no more args are present then the value must be passed via kwarg
-          try:
-            final_args.append(args.next())
-          except StopIteration:
-            pass
-      final_args.extend(list(args))
-      self.args = final_args
-
-      # Stash the list of placeholder positions for performance
-      self.placeholders = [(i, x.placeholder) for (i, x) in enumerate(self.args)
-                           if isinstance(x, ArgPlaceholder)]
+    # Fill in sideInputs if they are globally windowed
+    if not self.has_windowed_inputs:
+      self.args, self.kwargs = util.insert_values_in_args(
+          args, kwargs, [si[global_window] for si in side_inputs])
 
+    # Create placeholder for element parameter
+    if core.DoFn.ElementParam not in defaults:
+      args_to_pick = len(arguments) - len(defaults) - 1 - self_in_args
+      final_args = [ArgPlaceholder(core.DoFn.ElementParam)] + \
+                   self.args[:args_to_pick]
     else:
-      self.is_new_dofn = False
-      self.has_windowed_side_inputs = False  # Set to True in one case below.
-      if not args and not kwargs:
-        self.dofn = fn
-        self.dofn_process = fn.process
+      args_to_pick = len(arguments) - len(defaults) - self_in_args
+      final_args = self.args[:args_to_pick]
+
+    # Fill the OtherPlaceholders for context, window or timestamp
+    args = iter(self.args[args_to_pick:])
+    for a, d in zip(arguments[-len(defaults):], defaults):
+      if d == core.DoFn.ElementParam:
+        final_args.append(ArgPlaceholder(d))
+      elif d == core.DoFn.ContextParam:
+        final_args.append(ArgPlaceholder(d))
+      elif d == core.DoFn.WindowParam:
+        final_args.append(ArgPlaceholder(d))
+      elif d == core.DoFn.TimestampParam:
+        final_args.append(ArgPlaceholder(d))
+      elif d == core.DoFn.SideInputParam:
+        # If no more args are present then the value must be passed via kwarg
+        try:
+          final_args.append(args.next())
+        except StopIteration:
+          if a not in self.kwargs:
+            raise ValueError("Value for sideinput %s not provided" % a)
       else:
-        if side_inputs and all(
-            side_input.is_globally_windowed() for side_input in side_inputs):
-          args, kwargs = util.insert_values_in_args(
-              args, kwargs, [side_input[global_window]
-                             for side_input in side_inputs])
-          side_inputs = []
-        if side_inputs:
-          self.has_windowed_side_inputs = True
-
-          def process(context):
-            w = context.windows[0]
-            cur_args, cur_kwargs = util.insert_values_in_args(
-                args, kwargs, [side_input[w] for side_input in side_inputs])
-            return fn.process(context, *cur_args, **cur_kwargs)
-          self.dofn_process = process
-        elif kwargs:
-          self.dofn_process = lambda context: fn.process(
-              context, *args, **kwargs)
-        else:
-          self.dofn_process = lambda context: fn.process(context, *args)
-
-        class CurriedFn(core.OldDoFn):
-
-          start_bundle = staticmethod(fn.start_bundle)
-          process = staticmethod(self.dofn_process)
-          finish_bundle = staticmethod(fn.finish_bundle)
-
-        self.dofn = CurriedFn()
+        # If no more args are present then the value must be passed via kwarg
+        try:
+          final_args.append(args.next())
+        except StopIteration:
+          pass
+    final_args.extend(list(args))
+    self.args = final_args
+
+    # Stash the list of placeholder positions for performance
+    self.placeholders = [(i, x.placeholder) for (i, x) in enumerate(self.args)
+                         if isinstance(x, ArgPlaceholder)]
 
   def receive(self, windowed_value):
     self.process(windowed_value)
 
-  def old_dofn_process(self, element):
-    if self.has_windowed_side_inputs and len(element.windows) > 1:
-      for w in element.windows:
-        self.context.set_element(
-            WindowedValue(element.value, element.timestamp, (w,)))
-        self._process_outputs(element, self.dofn_process(self.context))
-    else:
-      self.context.set_element(element)
-      self._process_outputs(element, self.dofn_process(self.context))
-
-  def new_dofn_simple_process(self, element):
+  def _dofn_simple_invoker(self, element):
     self._process_outputs(element, self.dofn_process(element.value))
 
-  def _new_dofn_window_process(self, element, args, kwargs, window):
+  def _dofn_window_invoker(self, element, args, kwargs, window):
     # TODO(sourabhbajaj): Investigate why we can't use `is` instead of ==
     for i, p in self.placeholders:
       if p == core.DoFn.ElementParam:
@@ -265,18 +213,18 @@ class DoFnRunner(Receiver):
     else:
       self._process_outputs(element, self.dofn_process(*args, **kwargs))
 
-  def new_dofn_process(self, element):
+  def _dofn_invoker(self, element):
     self.context.set_element(element)
     # Call for the process function for each window if has windowed side inputs
     # or if the process accesses the window parameter. We can just call it once
     # otherwise as none of the arguments are changing
-    if self.has_windowed_side_inputs:
+    if self.has_windowed_inputs:
       for w in element.windows:
         args, kwargs = util.insert_values_in_args(
             self.args, self.kwargs, [si[w] for si in self.side_inputs])
-        self._new_dofn_window_process(element, args, kwargs, w)
+        self._dofn_window_invoker(element, args, kwargs, w)
     else:
-      self._new_dofn_window_process(element, self.args, self.kwargs, None)
+      self._dofn_window_invoker(element, self.args, self.kwargs, None)
 
   def _invoke_bundle_method(self, method):
     try:
@@ -285,15 +233,11 @@ class DoFnRunner(Receiver):
       self.context.set_element(None)
       f = getattr(self.dofn, method)
 
-      # TODO(Sourabhbajaj): Remove this if-else
-      if self.is_new_dofn:
-        _, _, _, defaults = self.dofn.get_function_arguments(method)
-        defaults = defaults if defaults else []
-        args = [self.context if d == core.DoFn.ContextParam else d
-                for d in defaults]
-        self._process_outputs(None, f(*args))
-      else:
-        self._process_outputs(None, f(self.context))
+      _, _, _, defaults = self.dofn.get_function_arguments(method)
+      defaults = defaults if defaults else []
+      args = [self.context if d == core.DoFn.ContextParam else d
+              for d in defaults]
+      self._process_outputs(None, f(*args))
     except BaseException as exn:
       self.reraise_augmented(exn)
     finally:
@@ -310,13 +254,10 @@ class DoFnRunner(Receiver):
     try:
       self.logging_context.enter()
       self.scoped_metrics_container.enter()
-      if self.is_new_dofn:
-        if self.simple_process:
-          self.new_dofn_simple_process(element)
-        else:
-          self.new_dofn_process(element)
+      if self.use_simple_invoker:
+        self._dofn_simple_invoker(element)
       else:
-        self.old_dofn_process(element)
+        self._dofn_invoker(element)
     except BaseException as exn:
       self.reraise_augmented(exn)
     finally:

http://git-wip-us.apache.org/repos/asf/beam/blob/137d392e/sdks/python/apache_beam/runners/direct/transform_evaluator.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/direct/transform_evaluator.py b/sdks/python/apache_beam/runners/direct/transform_evaluator.py
index 3053fd3..1700de6 100644
--- a/sdks/python/apache_beam/runners/direct/transform_evaluator.py
+++ b/sdks/python/apache_beam/runners/direct/transform_evaluator.py
@@ -35,10 +35,8 @@ from apache_beam.transforms import sideinputs
 from apache_beam.transforms.window import GlobalWindows
 from apache_beam.transforms.window import WindowedValue
 from apache_beam.typehints.typecheck import OutputCheckWrapperDoFn
-from apache_beam.typehints.typecheck import OutputCheckWrapperOldDoFn
 from apache_beam.typehints.typecheck import TypeCheckError
 from apache_beam.typehints.typecheck import TypeCheckWrapperDoFn
-from apache_beam.typehints.typecheck import TypeCheckWrapperOldDoFn
 from apache_beam.utils import counters
 from apache_beam.utils.pipeline_options import TypeOptions
 
@@ -350,18 +348,9 @@ class _ParDoEvaluator(_TransformEvaluator):
     pipeline_options = self._evaluation_context.pipeline_options
     if (pipeline_options is not None
         and pipeline_options.view_as(TypeOptions).runtime_type_check):
-      # TODO(sourabhbajaj): Remove this if-else
-      if isinstance(dofn, core.DoFn):
-        dofn = TypeCheckWrapperDoFn(dofn, transform.get_type_hints())
-      else:
-        dofn = TypeCheckWrapperOldDoFn(dofn, transform.get_type_hints())
+      dofn = TypeCheckWrapperDoFn(dofn, transform.get_type_hints())
 
-    # TODO(sourabhbajaj): Remove this if-else
-    if isinstance(dofn, core.DoFn):
-      dofn = OutputCheckWrapperDoFn(dofn, self._applied_ptransform.full_label)
-    else:
-      dofn = OutputCheckWrapperOldDoFn(dofn,
-                                       self._applied_ptransform.full_label)
+    dofn = OutputCheckWrapperDoFn(dofn, self._applied_ptransform.full_label)
     self.runner = DoFnRunner(
         dofn, transform.args, transform.kwargs,
         self._side_inputs,

http://git-wip-us.apache.org/repos/asf/beam/blob/137d392e/sdks/python/apache_beam/transforms/core.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/core.py b/sdks/python/apache_beam/transforms/core.py
index 91de7f6..e010cbd 100644
--- a/sdks/python/apache_beam/transforms/core.py
+++ b/sdks/python/apache_beam/transforms/core.py
@@ -21,7 +21,6 @@ from __future__ import absolute_import
 
 import copy
 import inspect
-import warnings
 import types
 
 from apache_beam import pvalue
@@ -216,88 +215,6 @@ class DoFn(WithTypeHints, HasDisplayData):
     return True
 
 
-# TODO(Sourabh): Remove after migration to DoFn
-class OldDoFn(WithTypeHints, HasDisplayData):
-  """A function object used by a transform with custom processing.
-
-  The ParDo transform is such a transform. The ParDo.expand()
-  method will take an object of type DoFn and apply it to all elements of a
-  PCollection object.
-
-  In order to have concrete DoFn objects one has to subclass from DoFn and
-  define the desired behavior (start_bundle/finish_bundle and process) or wrap a
-  callable object using the CallableWrapperDoFn class.
-  """
-
-  def __init__(self):
-    warnings.warn('Use of OldDoFn is deprecated please use DoFn instead')
-    super(OldDoFn, self).__init__()
-
-  def default_label(self):
-    return self.__class__.__name__
-
-  def infer_output_type(self, input_type):
-    # TODO(robertwb): Side inputs types.
-    # TODO(robertwb): Assert compatibility with input type hint?
-    return self._strip_output_annotations(
-        trivial_inference.infer_return_type(self.process, [input_type]))
-
-  def start_bundle(self, context):
-    """Called before a bundle of elements is processed on a worker.
-
-    Elements to be processed are split into bundles and distributed
-    to workers.  Before a worker calls process() on the first element
-    of its bundle, it calls this method.
-
-    Args:
-      context: a DoFnContext object
-    """
-    pass
-
-  def finish_bundle(self, context):
-    """Called after a bundle of elements is processed on a worker.
-
-    Args:
-      context: a DoFnContext object
-    """
-    pass
-
-  def process(self, context, *args, **kwargs):
-    """Called for each element of a pipeline.
-
-    Args:
-      context: a DoFnProcessContext object containing, among other
-        attributes, the element to be processed.
-        See the DoFnProcessContext documentation for details.
-      *args: side inputs
-      **kwargs: keyword side inputs
-    """
-    raise NotImplementedError
-
-  @staticmethod
-  def from_callable(fn):
-    return CallableWrapperDoFn(fn)
-
-  def process_argspec_fn(self):
-    """Returns the Python callable that will eventually be invoked.
-
-    This should ideally be the user-level function that is called with
-    the main and (if any) side inputs, and is used to relate the type
-    hint parameters with the input parameters (e.g., by argument name).
-    """
-    return self.process
-
-  def _strip_output_annotations(self, type_hint):
-    annotations = (TimestampedValue, WindowedValue, pvalue.SideOutputValue)
-    # TODO(robertwb): These should be parameterized types that the
-    # type inferencer understands.
-    if (type_hint in annotations
-        or trivial_inference.element_type(type_hint) in annotations):
-      return Any
-    else:
-      return type_hint
-
-
 def _fn_takes_side_inputs(fn):
   try:
     argspec = inspect.getargspec(fn)
@@ -679,7 +596,7 @@ class ParDo(PTransformWithSideInputs):
   def __init__(self, fn_or_label, *args, **kwargs):
     super(ParDo, self).__init__(fn_or_label, *args, **kwargs)
 
-    if not isinstance(self.fn, (OldDoFn, DoFn)):
+    if not isinstance(self.fn, DoFn):
       raise TypeError('ParDo must be called with a DoFn instance.')
 
   def default_type_hints(self):
@@ -690,7 +607,7 @@ class ParDo(PTransformWithSideInputs):
         self.fn.infer_output_type(input_type))
 
   def make_fn(self, fn):
-    if isinstance(fn, (OldDoFn, DoFn)):
+    if isinstance(fn, DoFn):
       return fn
     return CallableWrapperDoFn(fn)
 

http://git-wip-us.apache.org/repos/asf/beam/blob/137d392e/sdks/python/apache_beam/typehints/typecheck.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/typehints/typecheck.py b/sdks/python/apache_beam/typehints/typecheck.py
index bab5bb0..defa71e 100644
--- a/sdks/python/apache_beam/typehints/typecheck.py
+++ b/sdks/python/apache_beam/typehints/typecheck.py
@@ -24,7 +24,6 @@ import types
 
 from apache_beam.pvalue import SideOutputValue
 from apache_beam.transforms.core import DoFn
-from apache_beam.transforms.core import OldDoFn
 from apache_beam.transforms.window import WindowedValue
 from apache_beam.typehints import check_constraint
 from apache_beam.typehints import CompositeTypeHintError
@@ -35,138 +34,6 @@ from apache_beam.typehints.decorators import _check_instance_type
 from apache_beam.typehints.decorators import getcallargs_forhints
 
 
-# TODO(Sourabh): Remove after migration to DoFn
-class TypeCheckWrapperOldDoFn(OldDoFn):
-  """A wrapper around a DoFn which performs type-checking of input and output.
-  """
-
-  def __init__(self, dofn, type_hints, label=None):
-    super(TypeCheckWrapperDoFn, self).__init__()
-    self._dofn = dofn
-    self._label = label
-    self._process_fn = self._dofn.process_argspec_fn()
-    if type_hints.input_types:
-      input_args, input_kwargs = type_hints.input_types
-      self._input_hints = getcallargs_forhints(
-          self._process_fn, *input_args, **input_kwargs)
-    else:
-      self._input_hints = None
-    # TODO(robertwb): Actually extract this.
-    self.context_var = 'context'
-    # TODO(robertwb): Multi-output.
-    self._output_type_hint = type_hints.simple_output_type(label)
-
-  def start_bundle(self, context):
-    return self._type_check_result(
-        self._dofn.start_bundle(context))
-
-  def finish_bundle(self, context):
-    return self._type_check_result(
-        self._dofn.finish_bundle(context))
-
-  def process(self, context, *args, **kwargs):
-    if self._input_hints:
-      actual_inputs = inspect.getcallargs(
-          self._process_fn, context.element, *args, **kwargs)
-      for var, hint in self._input_hints.items():
-        if hint is actual_inputs[var]:
-          # self parameter
-          continue
-        var_name = var + '.element' if var == self.context_var else var
-        _check_instance_type(hint, actual_inputs[var], var_name, True)
-    return self._type_check_result(self._dofn.process(context, *args, **kwargs))
-
-  def _type_check_result(self, transform_results):
-    if self._output_type_hint is None or transform_results is None:
-      return transform_results
-
-    def type_check_output(o):
-      # TODO(robertwb): Multi-output.
-      x = o.value if isinstance(o, (SideOutputValue, WindowedValue)) else o
-      self._type_check(self._output_type_hint, x, is_input=False)
-
-    # If the return type is a generator, then we will need to interleave our
-    # type-checking with its normal iteration so we don't deplete the
-    # generator initially just by type-checking its yielded contents.
-    if isinstance(transform_results, types.GeneratorType):
-      return GeneratorWrapper(transform_results, type_check_output)
-    else:
-      for o in transform_results:
-        type_check_output(o)
-      return transform_results
-
-  def _type_check(self, type_constraint, datum, is_input):
-    """Typecheck a PTransform related datum according to a type constraint.
-
-    This function is used to optionally type-check either an input or an output
-    to a PTransform.
-
-    Args:
-        type_constraint: An instance of a typehints.TypeContraint, one of the
-          white-listed builtin Python types, or a custom user class.
-        datum: An instance of a Python object.
-        is_input: True if 'datum' is an input to a PTransform's DoFn. False
-          otherwise.
-
-    Raises:
-      TypeError: If 'datum' fails to type-check according to 'type_constraint'.
-    """
-    datum_type = 'input' if is_input else 'output'
-
-    try:
-      check_constraint(type_constraint, datum)
-    except CompositeTypeHintError as e:
-      raise TypeCheckError, e.message, sys.exc_info()[2]
-    except SimpleTypeHintError:
-      error_msg = ("According to type-hint expected %s should be of type %s. "
-                   "Instead, received '%s', an instance of type %s."
-                   % (datum_type, type_constraint, datum, type(datum)))
-      raise TypeCheckError, error_msg, sys.exc_info()[2]
-
-
-# TODO(Sourabh): Remove after migration to DoFn
-class OutputCheckWrapperOldDoFn(OldDoFn):
-  """A DoFn that verifies against common errors in the output type."""
-
-  def __init__(self, dofn, full_label):
-    self.dofn = dofn
-    self.full_label = full_label
-
-  def run(self, method, context, args, kwargs):
-    try:
-      result = method(context, *args, **kwargs)
-    except TypeCheckError as e:
-      error_msg = ('Runtime type violation detected within ParDo(%s): '
-                   '%s' % (self.full_label, e))
-      raise TypeCheckError, error_msg, sys.exc_info()[2]
-    else:
-      return self._check_type(result)
-
-  def start_bundle(self, context):
-    return self.run(self.dofn.start_bundle, context, [], {})
-
-  def finish_bundle(self, context):
-    return self.run(self.dofn.finish_bundle, context, [], {})
-
-  def process(self, context, *args, **kwargs):
-    return self.run(self.dofn.process, context, args, kwargs)
-
-  def _check_type(self, output):
-    if output is None:
-      return output
-    elif isinstance(output, (dict, basestring)):
-      object_type = type(output).__name__
-      raise TypeCheckError('Returning a %s from a ParDo or FlatMap is '
-                           'discouraged. Please use list("%s") if you really '
-                           'want this behavior.' %
-                           (object_type, output))
-    elif not isinstance(output, collections.Iterable):
-      raise TypeCheckError('FlatMap and ParDo must return an '
-                           'iterable. %s was returned instead.'
-                           % type(output))
-    return output
-
-
 class AbstractDoFnWrapper(DoFn):
   """An abstract class to create wrapper around DoFn"""
 


Mime
View raw message