From commits-return-64950-archive-asf-public=cust-asf.ponee.io@airflow.apache.org Thu Sep  5 11:23:28 2019
From: GitBox
To: commits@airflow.apache.org
Reply-To: dev@airflow.apache.org
Subject: [GitHub] [airflow] nuclearpinguin commented on a change in pull request #6011: [AIRFLOW-2842] Add GoogleCloudStorageSynchronizeBuckets operator
Message-ID: <156768260763.23522.9542032841086013883.gitbox@gitbox.apache.org>
Date: Thu, 05 Sep 2019 11:23:27 -0000
Content-Type: text/plain; charset=utf-8
Content-Transfer-Encoding: 8bit

nuclearpinguin commented on a change in pull request #6011: [AIRFLOW-2842] Add GoogleCloudStorageSynchronizeBuckets operator
URL: https://github.com/apache/airflow/pull/6011#discussion_r321205821


##########
File path: airflow/contrib/hooks/gcs_hook.py
##########

@@ -564,6 +564,167 @@ def compose(self, bucket_name, source_objects, destination_object):
 
         self.log.info("Completed successfully.")
 
+    def sync(
+        self,
+        source_bucket: str,
+        destination_bucket: str,
+        source_object: Optional[str] = None,
+        destination_object: Optional[str] = None,
+        recursive: bool = True,
+        allow_overwrite: bool = False,
+        delete_extra_files: bool = False
+    ):
+        """
+        Synchronizes the contents of the buckets.
+
+        Parameters ``source_object`` and ``destination_object`` describe the root sync directory.
+        If they are not passed, the entire bucket will be synchronized. They should point to
+        directories.
+
+        .. note::
+            The synchronization of individual files is not supported. Only entire directories
+            can be synchronized.
+
+        :param source_bucket: The name of the bucket containing the source objects.
+        :type source_bucket: str
+        :param destination_bucket: The name of the bucket containing the destination objects.
+        :type destination_bucket: str
+        :param source_object: The root sync directory in the source bucket.
+        :type source_object: Optional[str]
+        :param destination_object: The root sync directory in the destination bucket.
+        :type destination_object: Optional[str]
+        :param recursive: If True, subdirectories will be considered.
+        :type recursive: bool
+        :param allow_overwrite: If True, the files will be overwritten if a mismatched file is found.
+            By default, overwriting files is not allowed.
+        :type allow_overwrite: bool
+        :param delete_extra_files: If True, deletes additional files from the source that are not
+            found in the destination. By default, extra files are not deleted.
+
+            .. note::
+                This option can delete data quickly if you specify the wrong source/destination
+                combination.
+
+        :type delete_extra_files: bool
+        :return: None
+        """
+        client = self.get_conn()
+        # Create bucket objects
+        source_bucket_obj = client.bucket(source_bucket)
+        destination_bucket_obj = client.bucket(destination_bucket)
+        # Normalize parameters when they are passed
+        source_object = (
+            source_object + "/" if source_object and not source_object.endswith("/") else source_object
+        )
+        destination_object = (
+            destination_object + "/"
+            if destination_object and not destination_object.endswith("/")
+            else destination_object
+        )
+        # Calculate the number of characters to remove from the name, because they contain
+        # information about the parent's path
+        source_object_prefix_len = len(source_object) if source_object else 0
+        # Prepare synchronization plan
+        to_copy_blobs, to_delete_blobs, to_rewrite_blobs = self._prepare_sync_plan(
+            source_bucket=source_bucket_obj,
+            destination_bucket=destination_bucket_obj,
+            source_object=source_object,
+            destination_object=destination_object,
+            recursive=recursive
+        )
+        self.log.info(
+            "Planned synchronization. To delete blobs count: %s, to upload blobs count: %s, "
+            "to rewrite blobs count: %s",
+            len(to_delete_blobs),
+            len(to_copy_blobs),
+            len(to_rewrite_blobs),
+        )
+
+        # Copy missing objects to the destination bucket
+        if to_copy_blobs:
+            for blob in to_copy_blobs:
+                dst_object = (
+                    path.join(destination_object, blob.name[source_object_prefix_len:])
+                    if destination_object
+                    else blob.name[source_object_prefix_len:]
+                )
+                self.copy(
+                    source_bucket=source_bucket_obj.name,
+                    source_object=blob.name,
+                    destination_bucket=destination_bucket_obj.name,
+                    destination_object=dst_object,
+                )
+            self.log.info("Blobs copied.")
+        # Delete redundant files
+        if to_delete_blobs:
+            if delete_extra_files:
+                # TODO: Add batch. I tried to do it, but the Google library is not stable at the moment.
+                for blob in to_delete_blobs:
+                    self.delete(blob.bucket.name, blob.name)
+                self.log.info("Blobs deleted.")
+            else:
+                self.log.info("Skipped blobs deletion.")
+
+        # Overwrite files that are different
+        if to_rewrite_blobs:
+            if allow_overwrite:

Review comment:
   Oh I see, but I still think this should be binary information: rewriting / skipping. Otherwise the user sees only two messages that describe three cases.
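
For reference, a minimal usage sketch of the method under review. It assumes the hook class in
airflow/contrib/hooks/gcs_hook.py is GoogleCloudStorageHook; the bucket names, prefixes, and
connection id below are hypothetical, and only the sync() signature is taken from the diff above.

    from airflow.contrib.hooks.gcs_hook import GoogleCloudStorageHook

    # Hypothetical connection id; "google_cloud_default" is the conventional default.
    hook = GoogleCloudStorageHook(google_cloud_storage_conn_id="google_cloud_default")

    # Mirror gs://example-src/data/ into gs://example-dst/backup/, overwriting
    # mismatched files but keeping extra files in the destination.
    hook.sync(
        source_bucket="example-src",
        destination_bucket="example-dst",
        source_object="data/",
        destination_object="backup/",
        recursive=True,
        allow_overwrite=True,
        delete_extra_files=False,
    )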