beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From al...@apache.org
Subject [1/2] beam git commit: Add a MemoryReporter that tracks heap profiles to sdks/python/apache_beam/utils/profiler.py.
Date Tue, 07 Feb 2017 18:30:54 GMT
Repository: beam
Updated Branches:
  refs/heads/master 074031cac -> 63dc08ee1


Add a MemoryReporter that tracks heap profiles to sdks/python/apache_beam/utils/profiler.py.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/0a592053
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/0a592053
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/0a592053

Branch: refs/heads/master
Commit: 0a59205314d21f452b91848496d6ae21e369d7df
Parents: 074031c
Author: Younghee Kwon <younghee.kwon@gmail.com>
Authored: Mon Feb 6 12:35:50 2017 -0800
Committer: Ahmet Altay <altay@google.com>
Committed: Tue Feb 7 10:16:42 2017 -0800

----------------------------------------------------------------------
 sdks/python/apache_beam/utils/profiler.py | 79 +++++++++++++++++++++++++-
 1 file changed, 78 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/0a592053/sdks/python/apache_beam/utils/profiler.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/utils/profiler.py b/sdks/python/apache_beam/utils/profiler.py
index 1214172..3599f98 100644
--- a/sdks/python/apache_beam/utils/profiler.py
+++ b/sdks/python/apache_beam/utils/profiler.py
@@ -24,7 +24,8 @@ import pstats
 import StringIO
 import tempfile
 import time
-
+from threading import Timer
+import warnings
 
 from apache_beam.utils.dependency import _dependency_file_copy
 
@@ -67,3 +68,79 @@ class Profile(object):
           self.profile, stream=s).sort_stats(Profile.SORTBY)
       self.stats.print_stats()
       logging.info('Profiler data: [%s]', s.getvalue())
+
+
+class MemoryReporter(object):
+  """A memory reporter that reports the memory usage and heap profile.
+
+  Each report logs the result of guppy's hpy().heap() via logging.info.
+  Reports are produced either periodically, driven by threading.Timer, or on
+  demand.
+
+  Usage:
+    mr = MemoryReporter(interval_second=30.0)
+    mr.start()
+    while ...
+      <do something>
+      # this will report continuously with 30 seconds between reports.
+    mr.stop()
+
+  NOTE: A reporter with start() should always stop(), or the parent process can
+  never finish.
+
+  Or simply the following which does start() and stop():
+    with MemoryReporter(interval_second=100):
+      while ...
+        <do something>
+
+  Also it could report on demand without continuous reporting.
+    mr = MemoryReporter()  # default interval 60s but not started.
+    <do something>
+    mr.report_once()
+
+  If guppy cannot be imported, a warning is issued when the reporter is
+  constructed and start(), stop() and report_once() do nothing.
+  """
+
+  def __init__(self, interval_second=60.0):
+    """Initializes the reporter without starting it.
+
+    Args:
+      interval_second: delay in seconds before the first periodic report and
+        before each following one, once start() has been called.
+    """
+    # Set unconditionally so that every attribute exists even without guppy.
+    self._interval_second = interval_second
+    self._timer = None
+    self._enabled = False
+    # Identifies the current start()/stop() cycle; see start().
+    self._run_token = None
+    # guppy might not be installed. http://pypi.python.org/pypi/guppy/0.1.10
+    # The reporter can be set up only when guppy is installed (and guppy cannot
+    # be added to the required packages in setup.py, since it's not available
+    # in all platforms).
+    try:
+      from guppy import hpy  # pylint: disable=import-error
+    except ImportError:
+      warnings.warn('guppy is not installed; MemoryReporter not available.')
+      hpy = None
+    self._hpy = hpy
+
+  def __enter__(self):
+    """Starts periodic reporting and returns the reporter itself."""
+    self.start()
+    return self
+
+  def __exit__(self, *args):
+    """Stops periodic reporting; exceptions are not suppressed."""
+    self.stop()
+
+  def start(self):
+    """Starts periodic reporting.
+
+    Does nothing if reporting is already started or guppy is unavailable.
+    """
+    if self._enabled or not self._hpy:
+      return
+    self._enabled = True
+    # A fresh token per run lets a callback left over from an earlier run
+    # (stop() arrived while it was reporting) notice that it is stale instead
+    # of rescheduling itself alongside the new run's timer.
+    run_token = self._run_token = object()
+
+    def report_with_interval():
+      if self._run_token is not run_token:
+        return
+      self.report_once()
+      # report_once() can take a while; re-check so that a stop() issued in
+      # the meantime is not followed by a new timer being armed.
+      if self._run_token is not run_token:
+        return
+      self._timer = Timer(self._interval_second, report_with_interval)
+      self._timer.start()
+
+    self._timer = Timer(self._interval_second, report_with_interval)
+    self._timer.start()
+
+  def stop(self):
+    """Stops periodic reporting; does nothing if it was not started."""
+    if not self._enabled:
+      return
+    # Invalidate the run before cancelling so that a callback which is already
+    # executing sees the change and does not reschedule itself.
+    self._enabled = False
+    self._run_token = None
+    self._timer.cancel()
+
+  def report_once(self):
+    """Logs one heap profile and how long it took to collect.
+
+    Does nothing if guppy is unavailable.
+    """
+    if not self._hpy:
+      return
+    report_start_time = time.time()
+    heap_profile = self._hpy().heap()
+    logging.info('*** MemoryReport Heap:\n %s\n MemoryReport took %.1f seconds',
+                 heap_profile, time.time() - report_start_time)


Mime
View raw message