beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rober...@apache.org
Subject [4/4] incubator-beam git commit: Restore (faster) logging context.
Date Fri, 22 Jul 2016 00:36:35 GMT
Restore (faster) logging context.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/ecf9e3a3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/ecf9e3a3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/ecf9e3a3

Branch: refs/heads/python-sdk
Commit: ecf9e3a3cc3dcbb3413403d4558c95d4a0097350
Parents: 7c9d77a
Author: Robert Bradshaw <robertwb@google.com>
Authored: Thu Jul 21 15:24:01 2016 -0700
Committer: Robert Bradshaw <robertwb@google.com>
Committed: Thu Jul 21 17:36:04 2016 -0700

----------------------------------------------------------------------
 sdks/python/apache_beam/runners/common.pxd |  8 +++++++-
 sdks/python/apache_beam/runners/common.py  | 20 ++++++++++++++------
 2 files changed, 21 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ecf9e3a3/sdks/python/apache_beam/runners/common.pxd
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/common.pxd b/sdks/python/apache_beam/runners/common.pxd
index f01a362..7191659 100644
--- a/sdks/python/apache_beam/runners/common.pxd
+++ b/sdks/python/apache_beam/runners/common.pxd
@@ -27,7 +27,7 @@ cdef class DoFnRunner(object):
   cdef object window_fn
   cdef object context   # TODO(robertwb): Make this a DoFnContext
   cdef object tagged_receivers
-  cdef object logger
+  cdef object logging_context  # TODO(robertwb): Make this a LoggingContext
   cdef object step_name
 
   cdef object main_receivers   # TODO(robertwb): Make this a Receiver
@@ -44,3 +44,9 @@ cdef class DoFnContext(object):
 
 cdef class Receiver(object):
   cdef receive(self, WindowedValue windowed_value)
+
+
+cdef class LoggingContext(object):
+  # TODO(robertwb): Optimize "with [cdef class]"
+  cpdef enter(self)
+  cpdef exit(self)

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ecf9e3a3/sdks/python/apache_beam/runners/common.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/common.py b/sdks/python/apache_beam/runners/common.py
index 80db823..a565645 100644
--- a/sdks/python/apache_beam/runners/common.py
+++ b/sdks/python/apache_beam/runners/common.py
@@ -29,14 +29,12 @@ from apache_beam.transforms.window import WindowFn
 from apache_beam.utils.windowed_value import WindowedValue
 
 
-class FakeLogger(object):
-  def PerThreadLoggingContext(self, *unused_args, **unused_kwargs):
-    return self
+class LoggingContext(object):
 
-  def __enter__(self):
+  def enter(self):
     pass
 
-  def __exit__(self, *unused_args):
+  def exit(self):
     pass
 
 
@@ -76,7 +74,8 @@ class DoFnRunner(object):
     self.window_fn = windowing.windowfn
     self.context = context
     self.tagged_receivers = tagged_receivers
-    self.logger = logger or FakeLogger()
+    self.logging_context = (logger.PerThreadLoggingContext(step_name=step_name)
+                            if logger else LoggingContext())
     self.step_name = step_name
 
     # Optimize for the common case.
@@ -85,23 +84,32 @@ class DoFnRunner(object):
   def start(self):
     self.context.set_element(None)
     try:
+      self.logging_context.enter()
       self._process_outputs(None, self.dofn.start_bundle(self.context))
     except BaseException as exn:
       self.reraise_augmented(exn)
+    finally:
+      self.logging_context.exit()
 
   def finish(self):
     self.context.set_element(None)
     try:
+      self.logging_context.enter()
       self._process_outputs(None, self.dofn.finish_bundle(self.context))
     except BaseException as exn:
       self.reraise_augmented(exn)
+    finally:
+      self.logging_context.exit()
 
   def process(self, element):
     try:
+      self.logging_context.enter()
       self.context.set_element(element)
       self._process_outputs(element, self.dofn_process(self.context))
     except BaseException as exn:
       self.reraise_augmented(exn)
+    finally:
+      self.logging_context.exit()
 
   def reraise_augmented(self, exn):
     if getattr(exn, '_tagged_with_step', False) or not self.step_name:


Mime
View raw message