airflow-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From GitBox <...@apache.org>
Subject [GitHub] [airflow] nuclearpinguin commented on a change in pull request #6011: [AIRFLOW-2842] Add GoogleCloudStorageSynchronizeBuckets operator
Date Thu, 05 Sep 2019 11:23:27 GMT
nuclearpinguin commented on a change in pull request #6011: [AIRFLOW-2842] Add GoogleCloudStorageSynchronizeBuckets
operator
URL: https://github.com/apache/airflow/pull/6011#discussion_r321205821
 
 

 ##########
 File path: airflow/contrib/hooks/gcs_hook.py
 ##########
 @@ -564,6 +564,167 @@ def compose(self, bucket_name, source_objects, destination_object):
 
         self.log.info("Completed successfully.")
 
+    def sync(
+        self,
+        source_bucket: str,
+        destination_bucket: str,
+        source_object: Optional[str] = None,
+        destination_object: Optional[str] = None,
+        recursive: bool = True,
+        allow_overwrite: bool = False,
+        delete_extra_files: bool = False
+    ):
+        """
+        Synchronizes the contents of the buckets.
+
+        Parameters ``source_object`` and ``destination_object`` describe the root sync directory.
If they are
+        not passed, the entire bucket will be synchronized. They should point to directories.
+
+        .. note::
+            The synchronization of individual files is not supported. Only entire directories
can be
+            synchronized.
+
+        :param source_bucket: The name of the bucket containing the source objects.
+        :type source_bucket: str
+        :param destination_bucket: The name of the bucket containing the destination objects.
+        :type destination_bucket: str
+        :param source_object: The root sync directory in the source bucket.
+        :type source_object: Optional[str]
+        :param destination_object: The root sync directory in the destination bucket.
+        :type destination_object: Optional[str]
 +        :param recursive: If True, subdirectories will be considered
 +        :type recursive: bool
+        :param allow_overwrite: if True, the files will be overwritten if a mismatched file
is found.
+            By default, overwriting files is not allowed
+        :type allow_overwrite: bool
 +        :param delete_extra_files: if True, deletes additional files from the source that are
 +            not found in the destination. By default, extra files are not deleted.
+
+            .. note::
+                This option can delete data quickly if you specify the wrong source/destination
combination.
+
+        :type delete_extra_files: bool
+        :return: none
+        """
+        client = self.get_conn()
+        # Create bucket object
+        source_bucket_obj = client.bucket(source_bucket)
+        destination_bucket_obj = client.bucket(destination_bucket)
+        # Normalize parameters when they are passed
+        source_object = (
+            source_object + "/" if source_object and not source_object.endswith("/") else
source_object
+        )
+        destination_object = (
+            destination_object + "/"
+            if destination_object and not destination_object.endswith("/")
+            else destination_object
+        )
 +        # Calculate the number of characters to remove from the name, because they contain
 +        # information about the parent's path
+        source_object_prefix_len = len(source_object) if source_object else 0
+        # Prepare synchronization plan
+        to_copy_blobs, to_delete_blobs, to_rewrite_blobs = self._prepare_sync_plan(
+            source_bucket=source_bucket_obj,
+            destination_bucket=destination_bucket_obj,
+            source_object=source_object,
+            destination_object=destination_object,
+            recursive=recursive
+        )
+        self.log.info(
+            "Planned synchronization. To delete blobs count: %s, to upload blobs count: %s,
"
+            "to rewrite blobs count: %s",
+            len(to_delete_blobs),
+            len(to_copy_blobs),
+            len(to_rewrite_blobs),
+        )
+
+        # Copy missing object to new bucket
+        if to_copy_blobs:
+            for blob in to_copy_blobs:
+                dst_object = (
+                    path.join(destination_object, blob.name[source_object_prefix_len:])
+                    if destination_object
+                    else blob.name[source_object_prefix_len:]
+                )
+                self.copy(
+                    source_bucket=source_bucket_obj.name,
+                    source_object=blob.name,
+                    destination_bucket=destination_bucket_obj.name,
+                    destination_object=dst_object,
+                )
+            self.log.info("Blobs copied.")
+        # Delete redundant files
+        if to_delete_blobs:
+            if delete_extra_files:
+                # TODO: Add batch. I tried to do it, but the Google library is not stable
at the moment.
+                for blob in to_delete_blobs:
+                    self.delete(blob.bucket.name, blob.name)
+                self.log.info("Blobs deleted.")
+            else:
+                self.log.info("Skipped blobs deleting")
+
+        # Overwrite files that are different
+        if to_rewrite_blobs:
+            if allow_overwrite:
 
 Review comment:
   Oh I see, but I still think this should be binary information: rewriting / skipping.
Because otherwise the user sees only 2 messages that describe 3 cases.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

Mime
View raw message