beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From al...@apache.org
Subject [1/2] beam git commit: Some performance improvements to NewDoFn
Date Thu, 02 Feb 2017 21:55:42 GMT
Repository: beam
Updated Branches:
  refs/heads/master 70b16c74d -> 1a6f2e8f6


Some performance improvements to NewDoFn


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/956b81cf
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/956b81cf
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/956b81cf

Branch: refs/heads/master
Commit: 956b81cfa18281366c2f6bc41b02b099ec37d210
Parents: 70b16c7
Author: Sourabh Bajaj <sourabhbajaj@google.com>
Authored: Mon Jan 30 11:16:16 2017 -0800
Committer: Ahmet Altay <altay@google.com>
Committed: Thu Feb 2 13:36:58 2017 -0800

----------------------------------------------------------------------
 sdks/python/apache_beam/runners/common.pxd |   9 +-
 sdks/python/apache_beam/runners/common.py  | 157 +++++++++++++++---------
 2 files changed, 107 insertions(+), 59 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/956b81cf/sdks/python/apache_beam/runners/common.pxd
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/common.pxd b/sdks/python/apache_beam/runners/common.pxd
index f41b313..dbb08f0 100644
--- a/sdks/python/apache_beam/runners/common.pxd
+++ b/sdks/python/apache_beam/runners/common.pxd
@@ -38,17 +38,22 @@ cdef class DoFnRunner(Receiver):
   cdef LoggingContext logging_context
   cdef object step_name
   cdef bint is_new_dofn
-  cdef object args
+  cdef list args
   cdef dict kwargs
-  cdef object side_inputs
   cdef ScopedMetricsContainer scoped_metrics_container
+  cdef list side_inputs
   cdef bint has_windowed_side_inputs
+  cdef list placeholders
+  cdef bint simple_process
 
   cdef Receiver main_receivers
 
   cpdef process(self, WindowedValue element)
   cdef old_dofn_process(self, WindowedValue element)
   cdef new_dofn_process(self, WindowedValue element)
+  cdef new_dofn_simple_process(self, WindowedValue element)
+  cdef _new_dofn_window_process(
+      self, WindowedValue element, list args, dict kwargs, object window)
 
   @cython.locals(windowed_value=WindowedValue)
   cpdef _process_outputs(self, WindowedValue element, results)

http://git-wip-us.apache.org/repos/asf/beam/blob/956b81cf/sdks/python/apache_beam/runners/common.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/common.py b/sdks/python/apache_beam/runners/common.py
index dbbd9ba..0089f34 100644
--- a/sdks/python/apache_beam/runners/common.py
+++ b/sdks/python/apache_beam/runners/common.py
@@ -25,9 +25,9 @@ from apache_beam.internal import util
 from apache_beam.metrics.execution import ScopedMetricsContainer
 from apache_beam.pvalue import SideOutputValue
 from apache_beam.transforms import core
-from apache_beam.transforms import window
 from apache_beam.transforms.window import TimestampedValue
 from apache_beam.transforms.window import WindowFn
+from apache_beam.transforms.window import GlobalWindow
 from apache_beam.utils.windowed_value import WindowedValue
 
 
@@ -94,7 +94,7 @@ class DoFnRunner(Receiver):
     self.scoped_metrics_container = (scoped_metrics_container
                                      or ScopedMetricsContainer())
 
-    global_window = window.GlobalWindow()
+    global_window = GlobalWindow()
 
     # Need to support multiple iterations.
     side_inputs = list(side_inputs)
@@ -117,6 +117,11 @@ class DoFnRunner(Receiver):
 
     # TODO(Sourabhbajaj): Remove the usage of OldDoFn
     if isinstance(fn, core.NewDoFn):
+
+      class ArgPlaceholder(object):
+        def __init__(self, placeholder):
+          self.placeholder = placeholder
+
       self.is_new_dofn = True
 
       # Stash values for use in new_dofn_process.
@@ -127,6 +132,70 @@ class DoFnRunner(Receiver):
       self.args = args if args else []
       self.kwargs = kwargs if kwargs else {}
       self.dofn = fn
+      self.dofn_process = fn.process
+
+      arguments, _, _, defaults = self.dofn.get_function_arguments('process')
+      defaults = defaults if defaults else []
+      self_in_args = int(self.dofn.is_process_bounded())
+
+      self.simple_process = (
+          not side_inputs and not args and not kwargs and not defaults)
+      if self.simple_process:
+        return
+
+      # TODO(Sourabhbajaj) Rename this variable once oldDoFn is deprecated
+      self.has_windowed_side_inputs = (
+          self.has_windowed_side_inputs or
+          core.NewDoFn.WindowParam in defaults)
+
+      # Try to prepare all the arguments that can just be filled in
+      # without any additional work in the process function.
+      # Also cache all the placeholders needed in the process function.
+
+      # Fill in sideInputs if they are globally windowed
+      if not self.has_windowed_side_inputs:
+        self.args, self.kwargs = util.insert_values_in_args(
+            args, kwargs, [si[global_window] for si in side_inputs])
+
+      # Create placeholder for element parameter
+      if core.NewDoFn.ElementParam not in defaults:
+        args_to_pick = len(arguments) - len(defaults) - 1 - self_in_args
+        final_args = [ArgPlaceholder(core.NewDoFn.ElementParam)] + \
+                     self.args[:args_to_pick]
+      else:
+        args_to_pick = len(arguments) - len(defaults) - self_in_args
+        final_args = self.args[:args_to_pick]
+
+      # Fill in the other placeholders for context, window or timestamp
+      args = iter(self.args[args_to_pick:])
+      for a, d in zip(arguments[-len(defaults):], defaults):
+        if d == core.NewDoFn.ElementParam:
+          final_args.append(ArgPlaceholder(d))
+        elif d == core.NewDoFn.ContextParam:
+          final_args.append(ArgPlaceholder(d))
+        elif d == core.NewDoFn.WindowParam:
+          final_args.append(ArgPlaceholder(d))
+        elif d == core.NewDoFn.TimestampParam:
+          final_args.append(ArgPlaceholder(d))
+        elif d == core.NewDoFn.SideInputParam:
+          # If no more args are present then the value must be passed via kwarg
+          try:
+            final_args.append(args.next())
+          except StopIteration:
+            if a not in self.kwargs:
+              raise ValueError("Value for sideinput %s not provided" % a)
+        else:
+          # If no more args are present then the value must be passed via kwarg
+          try:
+            final_args.append(args.next())
+          except StopIteration:
+            pass
+      final_args.extend(list(args))
+      self.args = final_args
+
+      # Stash the list of placeholder positions for performance
+      self.placeholders = [(i, x.placeholder) for (i, x) in enumerate(self.args)
+                           if isinstance(x, ArgPlaceholder)]
 
     else:
       self.is_new_dofn = False
@@ -177,66 +246,37 @@ class DoFnRunner(Receiver):
       self.context.set_element(element)
       self._process_outputs(element, self.dofn_process(self.context))
 
+  def new_dofn_simple_process(self, element):
+    self._process_outputs(element, self.dofn_process(element.value))
+
+  def _new_dofn_window_process(self, element, args, kwargs, window):
+    # TODO(sourabhbajaj): Investigate why we can't use `is` instead of ==
+    for i, p in self.placeholders:
+      if p == core.NewDoFn.ElementParam:
+        args[i] = element.value
+      elif p == core.NewDoFn.ContextParam:
+        args[i] = self.context
+      elif p == core.NewDoFn.WindowParam:
+        args[i] = window
+      elif p == core.NewDoFn.TimestampParam:
+        args[i] = element.timestamp
+    if not kwargs:
+      self._process_outputs(element, self.dofn_process(*args))
+    else:
+      self._process_outputs(element, self.dofn_process(*args, **kwargs))
+
   def new_dofn_process(self, element):
     self.context.set_element(element)
-    arguments, _, _, defaults = self.dofn.get_function_arguments('process')
-    defaults = defaults if defaults else []
-
-    self_in_args = int(self.dofn.is_process_bounded())
-
     # Call for the process function for each window if has windowed side inputs
     # or if the process accesses the window parameter. We can just call it once
     # otherwise as none of the arguments are changing
-    if self.has_windowed_side_inputs or core.NewDoFn.WindowParam in defaults:
-      windows = element.windows
+    if self.has_windowed_side_inputs:
+      for w in element.windows:
+        args, kwargs = util.insert_values_in_args(
+            self.args, self.kwargs, [si[w] for si in self.side_inputs])
+        self._new_dofn_window_process(element, args, kwargs, w)
     else:
-      windows = [window.GlobalWindow()]
-
-    for w in windows:
-      args, kwargs = util.insert_values_in_args(
-          self.args, self.kwargs,
-          [s[w] for s in self.side_inputs])
-
-      # If there are more arguments than the default then the first argument
-      # should be the element and the rest should be picked from the side
-      # inputs as window and timestamp should always be tagged
-      if len(arguments) > len(defaults) + self_in_args:
-        if core.NewDoFn.ElementParam not in defaults:
-          args_to_pick = len(arguments) - len(defaults) - 1 - self_in_args
-          final_args = [element.value] + args[:args_to_pick]
-        else:
-          args_to_pick = len(arguments) - len(defaults) - self_in_args
-          final_args = args[:args_to_pick]
-      else:
-        args_to_pick = 0
-        final_args = []
-      args = iter(args[args_to_pick:])
-
-      for a, d in zip(arguments[-len(defaults):], defaults):
-        if d == core.NewDoFn.ElementParam:
-          final_args.append(element.value)
-        elif d == core.NewDoFn.ContextParam:
-          final_args.append(self.context)
-        elif d == core.NewDoFn.WindowParam:
-          final_args.append(w)
-        elif d == core.NewDoFn.TimestampParam:
-          final_args.append(element.timestamp)
-        elif d == core.NewDoFn.SideInputParam:
-          # If no more args are present then the value must be passed via kwarg
-          try:
-            final_args.append(args.next())
-          except StopIteration:
-            if a not in kwargs:
-              raise
-        else:
-          # If no more args are present then the value must be passed via kwarg
-          try:
-            final_args.append(args.next())
-          except StopIteration:
-            if a not in kwargs:
-              kwargs[a] = d
-      final_args.extend(list(args))
-      self._process_outputs(element, self.dofn.process(*final_args, **kwargs))
+      self._new_dofn_window_process(element, self.args, self.kwargs, None)
 
   def _invoke_bundle_method(self, method):
     try:
@@ -271,7 +311,10 @@ class DoFnRunner(Receiver):
       self.logging_context.enter()
       self.scoped_metrics_container.enter()
       if self.is_new_dofn:
-        self.new_dofn_process(element)
+        if self.simple_process:
+          self.new_dofn_simple_process(element)
+        else:
+          self.new_dofn_process(element)
       else:
         self.old_dofn_process(element)
     except BaseException as exn:


Mime
View raw message