beam-commits mailing list archives

From rober...@apache.org
Subject [1/2] incubator-beam git commit: Close threadpools when finished with them
Date Wed, 19 Oct 2016 06:40:52 GMT
Repository: incubator-beam
Updated Branches:
  refs/heads/python-sdk 8e1793caf -> 1260bf779


Close threadpools when finished with them

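Previously, _estimate_sizes_in_parallel created a new ThreadPool on
every call and never closed it. Terminating the pool when it is no
longer needed avoids building up an arbitrary number of dangling
threads, which can cause problems in tests and is undesirable in
production.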


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/152a828d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/152a828d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/152a828d

Branch: refs/heads/python-sdk
Commit: 152a828dc3b45b88b4b00416012cc11f3b25db00
Parents: 8e1793c
Author: Robert Bradshaw <robertwb@google.com>
Authored: Tue Oct 18 17:08:18 2016 -0700
Committer: Robert Bradshaw <robertwb@gmail.com>
Committed: Tue Oct 18 23:40:15 2016 -0700

----------------------------------------------------------------------
 sdks/python/apache_beam/io/filebasedsource.py | 16 +++++++++++-----
 1 file changed, 11 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/152a828d/sdks/python/apache_beam/io/filebasedsource.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/filebasedsource.py b/sdks/python/apache_beam/io/filebasedsource.py
index e067833..931628c 100644
--- a/sdks/python/apache_beam/io/filebasedsource.py
+++ b/sdks/python/apache_beam/io/filebasedsource.py
@@ -122,11 +122,17 @@ class FileBasedSource(iobase.BoundedSource):
   @staticmethod
   def _estimate_sizes_in_parallel(file_names):
 
-    def _calculate_size_of_file(file_name):
-      return fileio.ChannelFactory.size_in_bytes(file_name)
-
-    return ThreadPool(MAX_NUM_THREADS_FOR_SIZE_ESTIMATION).map(
-        _calculate_size_of_file, file_names)
+    if not file_names:
+      return []
+    elif len(file_names) == 1:
+      return [fileio.ChannelFactory.size_in_bytes(file_names[0])]
+    else:
+      pool = ThreadPool(
+          min(MAX_NUM_THREADS_FOR_SIZE_ESTIMATION, len(file_names)))
+      try:
+        return pool.map(fileio.ChannelFactory.size_in_bytes, file_names)
+      finally:
+        pool.terminate()
 
   def split(
       self, desired_bundle_size=None, start_position=None, stop_position=None):
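
The pattern introduced by this change can be tried outside Beam with a
minimal standalone sketch. It rests on a few assumptions: os.path.getsize
stands in for fileio.ChannelFactory.size_in_bytes (which also handles
non-local paths such as GCS), the size_fn parameter is a hypothetical
addition for testing, and the value 25 for the thread cap is illustrative
rather than necessarily the constant Beam uses.

```python
import os
from multiprocessing.pool import ThreadPool

# Illustrative cap only; not necessarily the value used in filebasedsource.py.
MAX_NUM_THREADS_FOR_SIZE_ESTIMATION = 25


def estimate_sizes_in_parallel(file_names, size_fn=os.path.getsize):
  """Return the size of each file, in the same order as file_names.

  size_fn is a hypothetical hook; os.path.getsize stands in for
  fileio.ChannelFactory.size_in_bytes.
  """
  if not file_names:
    # Nothing to measure, so don't start any threads.
    return []
  elif len(file_names) == 1:
    # A single file gains nothing from a pool; measure it inline.
    return [size_fn(file_names[0])]
  else:
    # Never start more worker threads than there are files.
    pool = ThreadPool(
        min(MAX_NUM_THREADS_FOR_SIZE_ESTIMATION, len(file_names)))
    try:
      # map() preserves input order and re-raises a worker's exception.
      return pool.map(size_fn, file_names)
    finally:
      # Shut the pool down even if size_fn raised, so threads don't pile up.
      pool.terminate()
```

On Python 3, `with ThreadPool(n) as pool:` gives the same guarantee,
because Pool.__exit__ calls terminate(); the explicit try/finally above
mirrors the diff and also works on Python 2.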

