Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 374CC200C78 for ; Thu, 13 Apr 2017 00:12:37 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 35E76160B95; Wed, 12 Apr 2017 22:12:37 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 84A24160BA8 for ; Thu, 13 Apr 2017 00:12:36 +0200 (CEST) Received: (qmail 98417 invoked by uid 500); 12 Apr 2017 22:12:35 -0000 Mailing-List: contact commits-help@beam.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@beam.apache.org Delivered-To: mailing list commits@beam.apache.org Received: (qmail 98289 invoked by uid 99); 12 Apr 2017 22:12:35 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 12 Apr 2017 22:12:35 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 6EC3BE9622; Wed, 12 Apr 2017 22:12:35 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: chamikara@apache.org To: commits@beam.apache.org Date: Wed, 12 Apr 2017 22:12:35 -0000 Message-Id: X-Mailer: ASF-Git Admin Mailer Subject: [1/2] beam git commit: [BEAM-1222] Chunk size should be FS dependent archived-at: Wed, 12 Apr 2017 22:12:37 -0000 Repository: beam Updated Branches: refs/heads/master 94c9e3817 -> 8479094c2 [BEAM-1222] Chunk size should be FS dependent Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/6f3fd354 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/6f3fd354 Diff: 
http://git-wip-us.apache.org/repos/asf/beam/diff/6f3fd354 Branch: refs/heads/master Commit: 6f3fd3545c793eb9cdb2dbd75a4a6e680f1cb7ec Parents: 94c9e38 Author: Sourabh Bajaj Authored: Mon Apr 10 14:09:48 2017 -0700 Committer: Chamikara Jayalath Committed: Wed Apr 12 15:11:55 2017 -0700 ---------------------------------------------------------------------- sdks/python/apache_beam/io/fileio.py | 10 +++++----- sdks/python/apache_beam/io/filesystem.py | 1 + sdks/python/apache_beam/io/gcp/gcsfilesystem.py | 4 +++- 3 files changed, 9 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/6f3fd354/sdks/python/apache_beam/io/fileio.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/io/fileio.py b/sdks/python/apache_beam/io/fileio.py index f33942a..b128dc5 100644 --- a/sdks/python/apache_beam/io/fileio.py +++ b/sdks/python/apache_beam/io/fileio.py @@ -31,7 +31,6 @@ from apache_beam.io.filesystem import CompressionTypes from apache_beam.io.filesystems_util import get_filesystem from apache_beam.transforms.display import DisplayDataItem -MAX_BATCH_OPERATION_SIZE = 100 DEFAULT_SHARD_NAME_TEMPLATE = '-SSSSS-of-NNNNN' @@ -244,6 +243,7 @@ class FileSink(iobase.Sink): source_files = [] destination_files = [] + chunk_size = self._file_system.CHUNK_SIZE for shard_num, shard in enumerate(writer_results): final_name = ''.join([ self.file_path_prefix, self.shard_name_format % dict( @@ -252,12 +252,12 @@ class FileSink(iobase.Sink): source_files.append(shard) destination_files.append(final_name) - source_file_batch = [source_files[i:i + MAX_BATCH_OPERATION_SIZE] + source_file_batch = [source_files[i:i + chunk_size] for i in xrange(0, len(source_files), - MAX_BATCH_OPERATION_SIZE)] - destination_file_batch = [destination_files[i:i + MAX_BATCH_OPERATION_SIZE] + chunk_size)] + destination_file_batch = [destination_files[i:i + chunk_size] 
for i in xrange(0, len(destination_files), - MAX_BATCH_OPERATION_SIZE)] + chunk_size)] logging.info( 'Starting finalize_write threads with num_shards: %d, ' http://git-wip-us.apache.org/repos/asf/beam/blob/6f3fd354/sdks/python/apache_beam/io/filesystem.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/io/filesystem.py b/sdks/python/apache_beam/io/filesystem.py index 85c7f06..3a71ac1 100644 --- a/sdks/python/apache_beam/io/filesystem.py +++ b/sdks/python/apache_beam/io/filesystem.py @@ -414,6 +414,7 @@ class FileSystem(object): the correct file system based on the provided file pattern scheme. """ __metaclass__ = abc.ABCMeta + CHUNK_SIZE = 1 # Chunk size in the batch operations @staticmethod def _get_compression_type(path, compression_type): http://git-wip-us.apache.org/repos/asf/beam/blob/6f3fd354/sdks/python/apache_beam/io/gcp/gcsfilesystem.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/io/gcp/gcsfilesystem.py b/sdks/python/apache_beam/io/gcp/gcsfilesystem.py index d79630f..b2bc809 100644 --- a/sdks/python/apache_beam/io/gcp/gcsfilesystem.py +++ b/sdks/python/apache_beam/io/gcp/gcsfilesystem.py @@ -31,6 +31,8 @@ class GCSFileSystem(FileSystem): """A GCS ``FileSystem`` implementation for accessing files on GCS. """ + CHUNK_SIZE = gcsio.MAX_BATCH_OPERATION_SIZE # Chunk size in batch operations + def mkdirs(self, path): """Recursively create directories for the provided path. @@ -174,7 +176,7 @@ class GCSFileSystem(FileSystem): gcs_current_batch = [] for src, dest in zip(source_file_names, destination_file_names): gcs_current_batch.append((src, dest)) - if len(gcs_current_batch) == gcsio.MAX_BATCH_OPERATION_SIZE: + if len(gcs_current_batch) == self.CHUNK_SIZE: gcs_batches.append(gcs_current_batch) gcs_current_batch = [] if gcs_current_batch: