beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chamik...@apache.org
Subject [1/2] beam git commit: Adding lull tracking for python sampler
Date Thu, 19 Oct 2017 23:02:35 GMT
Repository: beam
Updated Branches:
  refs/heads/master 41f16123b -> 49c392790


Adding lull tracking (time elapsed since the last state transition) to the Python state sampler


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/21cdc85c
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/21cdc85c
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/21cdc85c

Branch: refs/heads/master
Commit: 21cdc85cfa8a06208a7f0a6736cc7d5886d4c8de
Parents: 41f1612
Author: Pablo <pabloem@google.com>
Authored: Thu Oct 19 12:50:46 2017 -0700
Committer: chamikara@google.com <chamikara@google.com>
Committed: Thu Oct 19 16:02:03 2017 -0700

----------------------------------------------------------------------
 .../apache_beam/runners/worker/statesampler.pyx  | 19 ++++++++++++++++---
 1 file changed, 16 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/21cdc85c/sdks/python/apache_beam/runners/worker/statesampler.pyx
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/worker/statesampler.pyx b/sdks/python/apache_beam/runners/worker/statesampler.pyx
index f0527c6..1e37196 100644
--- a/sdks/python/apache_beam/runners/worker/statesampler.pyx
+++ b/sdks/python/apache_beam/runners/worker/statesampler.pyx
@@ -74,12 +74,16 @@ cdef inline int64_t get_nsec_time() nogil:
 class StateSamplerInfo(object):
   """Info for current state and transition statistics of StateSampler."""
 
-  def __init__(self, state_name, transition_count):
+  def __init__(self, state_name, transition_count, time_since_transition):
     self.state_name = state_name
     self.transition_count = transition_count
+    self.time_since_transition = time_since_transition
 
   def __repr__(self):
-    return '<StateSamplerInfo %s %d>' % (self.state_name, self.transition_count)
+    return ('<StateSamplerInfo state: %s time: %dns transitions: %d>'
+            % (self.state_name,
+               self.time_since_transition,
+               self.transition_count))
 
 
 # Default period for sampling current state of pipeline execution.
@@ -105,6 +109,7 @@ cdef class StateSampler(object):
   cdef pythread.PyThread_type_lock lock
 
   cdef public int64_t state_transition_count
+  cdef int64_t time_since_transition
 
   cdef int32_t current_state_index
 
@@ -122,6 +127,8 @@ cdef class StateSampler(object):
     self.scoped_states_by_name = {}
 
     self.current_state_index = 0
+    self.time_since_transition = 0
+    self.state_transition_count = 0
     unknown_state = ScopedState(self, 'unknown', self.current_state_index)
     pythread.PyThread_acquire_lock(self.lock, pythread.WAIT_LOCK)
     self.scoped_states_by_index = [unknown_state]
@@ -142,6 +149,7 @@ cdef class StateSampler(object):
   def run(self):
     cdef int64_t last_nsecs = get_nsec_time()
     cdef int64_t elapsed_nsecs
+    cdef int64_t latest_transition_count = self.state_transition_count
     with nogil:
       while True:
         usleep(self.sampling_period_ms * 1000)
@@ -155,6 +163,10 @@ cdef class StateSampler(object):
           nsecs_ptr = &(<ScopedState>PyList_GET_ITEM(
               self.scoped_states_by_index, self.current_state_index)).nsecs
           nsecs_ptr[0] += elapsed_nsecs
+          if latest_transition_count != self.state_transition_count:
+            self.time_since_transition = 0
+            latest_transition_count = self.state_transition_count
+          self.time_since_transition += elapsed_nsecs
           last_nsecs += elapsed_nsecs
         finally:
           pythread.PyThread_release_lock(self.lock)
@@ -182,7 +194,8 @@ cdef class StateSampler(object):
     """Returns StateSamplerInfo with transition statistics."""
     return StateSamplerInfo(
         self.scoped_states_by_index[self.current_state_index].name,
-        self.state_transition_count)
+        self.state_transition_count,
+        self.time_since_transition)
 
   # TODO(pabloem): Make state_name required once all callers migrate,
   #   and the legacy path is removed.


Mime
View raw message