beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chamik...@apache.org
Subject [1/2] beam git commit: [BEAM-1222] Chunk size should be FS dependent
Date Wed, 12 Apr 2017 22:12:35 GMT
Repository: beam
Updated Branches:
  refs/heads/master 94c9e3817 -> 8479094c2


[BEAM-1222] Chunk size should be FS dependent


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/6f3fd354
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/6f3fd354
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/6f3fd354

Branch: refs/heads/master
Commit: 6f3fd3545c793eb9cdb2dbd75a4a6e680f1cb7ec
Parents: 94c9e38
Author: Sourabh Bajaj <sourabhbajaj@google.com>
Authored: Mon Apr 10 14:09:48 2017 -0700
Committer: Chamikara Jayalath <chamikara@google.com>
Committed: Wed Apr 12 15:11:55 2017 -0700

----------------------------------------------------------------------
 sdks/python/apache_beam/io/fileio.py            | 10 +++++-----
 sdks/python/apache_beam/io/filesystem.py        |  1 +
 sdks/python/apache_beam/io/gcp/gcsfilesystem.py |  4 +++-
 3 files changed, 9 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/6f3fd354/sdks/python/apache_beam/io/fileio.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/fileio.py b/sdks/python/apache_beam/io/fileio.py
index f33942a..b128dc5 100644
--- a/sdks/python/apache_beam/io/fileio.py
+++ b/sdks/python/apache_beam/io/fileio.py
@@ -31,7 +31,6 @@ from apache_beam.io.filesystem import CompressionTypes
 from apache_beam.io.filesystems_util import get_filesystem
 from apache_beam.transforms.display import DisplayDataItem
 
-MAX_BATCH_OPERATION_SIZE = 100
 DEFAULT_SHARD_NAME_TEMPLATE = '-SSSSS-of-NNNNN'
 
 
@@ -244,6 +243,7 @@ class FileSink(iobase.Sink):
 
     source_files = []
     destination_files = []
+    chunk_size = self._file_system.CHUNK_SIZE
     for shard_num, shard in enumerate(writer_results):
       final_name = ''.join([
           self.file_path_prefix, self.shard_name_format % dict(
@@ -252,12 +252,12 @@ class FileSink(iobase.Sink):
       source_files.append(shard)
       destination_files.append(final_name)
 
-    source_file_batch = [source_files[i:i + MAX_BATCH_OPERATION_SIZE]
+    source_file_batch = [source_files[i:i + chunk_size]
                          for i in xrange(0, len(source_files),
-                                         MAX_BATCH_OPERATION_SIZE)]
-    destination_file_batch = [destination_files[i:i + MAX_BATCH_OPERATION_SIZE]
+                                         chunk_size)]
+    destination_file_batch = [destination_files[i:i + chunk_size]
                               for i in xrange(0, len(destination_files),
-                                              MAX_BATCH_OPERATION_SIZE)]
+                                              chunk_size)]
 
     logging.info(
         'Starting finalize_write threads with num_shards: %d, '

http://git-wip-us.apache.org/repos/asf/beam/blob/6f3fd354/sdks/python/apache_beam/io/filesystem.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/filesystem.py b/sdks/python/apache_beam/io/filesystem.py
index 85c7f06..3a71ac1 100644
--- a/sdks/python/apache_beam/io/filesystem.py
+++ b/sdks/python/apache_beam/io/filesystem.py
@@ -414,6 +414,7 @@ class FileSystem(object):
   the correct file system based on the provided file pattern scheme.
   """
   __metaclass__ = abc.ABCMeta
+  CHUNK_SIZE = 1  # Chunk size in the batch operations
 
   @staticmethod
   def _get_compression_type(path, compression_type):

http://git-wip-us.apache.org/repos/asf/beam/blob/6f3fd354/sdks/python/apache_beam/io/gcp/gcsfilesystem.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/gcp/gcsfilesystem.py b/sdks/python/apache_beam/io/gcp/gcsfilesystem.py
index d79630f..b2bc809 100644
--- a/sdks/python/apache_beam/io/gcp/gcsfilesystem.py
+++ b/sdks/python/apache_beam/io/gcp/gcsfilesystem.py
@@ -31,6 +31,8 @@ class GCSFileSystem(FileSystem):
   """A GCS ``FileSystem`` implementation for accessing files on GCS.
   """
 
+  CHUNK_SIZE = gcsio.MAX_BATCH_OPERATION_SIZE  # Chunk size in batch operations
+
   def mkdirs(self, path):
     """Recursively create directories for the provided path.
 
@@ -174,7 +176,7 @@ class GCSFileSystem(FileSystem):
     gcs_current_batch = []
     for src, dest in zip(source_file_names, destination_file_names):
       gcs_current_batch.append((src, dest))
-      if len(gcs_current_batch) == gcsio.MAX_BATCH_OPERATION_SIZE:
+      if len(gcs_current_batch) == self.CHUNK_SIZE:
         gcs_batches.append(gcs_current_batch)
         gcs_current_batch = []
     if gcs_current_batch:


Mime
View raw message