From: criccomini@apache.org
To: commits@airflow.incubator.apache.org
Reply-To: dev@airflow.incubator.apache.org
Message-Id: <1e8aca12772d46598b94a793c813970a@git.apache.org>
Subject: incubator-airflow git commit: [AIRFLOW-1855][AIRFLOW-1866] Add GCS Copy Operator to copy multiple files
Date: Fri, 1 Dec 2017 23:51:57 +0000 (UTC)
archived-at: Fri, 01 Dec 2017 23:52:03 -0000

Repository: incubator-airflow
Updated Branches:
  refs/heads/master b9c82c040 -> 3e321790d


[AIRFLOW-1855][AIRFLOW-1866] Add GCS Copy Operator to copy multiple files

Closes #2819 from kaxil/master


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/3e321790
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/3e321790
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/3e321790

Branch: refs/heads/master
Commit: 3e321790d537696b8fd1a97dcac2ab7c469fecea
Parents: b9c82c0
Author: Kaxil Naik
Authored: Fri Dec 1 15:51:22 2017 -0800
Committer: Chris Riccomini
Committed: Fri Dec 1 15:51:30 2017 -0800

----------------------------------------------------------------------
 airflow/contrib/hooks/gcs_hook.py              |  34 +++---
 airflow/contrib/operators/gcs_copy_operator.py | 115 ++++++++++++++++++++
 2 files changed, 135 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/3e321790/airflow/contrib/hooks/gcs_hook.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/hooks/gcs_hook.py b/airflow/contrib/hooks/gcs_hook.py
index 546e028..f6ad39f 100644
--- a/airflow/contrib/hooks/gcs_hook.py
+++ b/airflow/contrib/hooks/gcs_hook.py
@@ -38,7 +38,6 @@ class GoogleCloudStorageHook(GoogleCloudBaseHook):
         http_authorized = self._authorize()
         return build('storage', 'v1', http=http_authorized)
 
-    # pylint:disable=redefined-builtin
     def copy(self, source_bucket, source_object, destination_bucket=None,
              destination_object=None):
@@ -48,10 +47,10 @@ class GoogleCloudStorageHook(GoogleCloudBaseHook):
         destination_bucket or destination_object can be omitted, in which case
         source bucket/object is used, but not both.
 
-        :param bucket: The bucket of the object to copy from.
-        :type bucket: string
-        :param object: The object to copy.
-        :type object: string
+        :param source_bucket: The bucket of the object to copy from.
+        :type source_bucket: string
+        :param source_object: The object to copy.
+        :type source_object: string
         :param destination_bucket: The destination of the object to copied to.
             Can be omitted; then the same bucket is used.
         :type destination_bucket: string
@@ -219,7 +218,7 @@ class GoogleCloudStorageHook(GoogleCloudBaseHook):
                 return False
             raise
 
-    def list(self, bucket, versions=None, maxResults=None, prefix=None):
+    def list(self, bucket, versions=None, maxResults=None, prefix=None, delimiter=None):
         """
         List all objects from the bucket with the give string prefix in name
 
@@ -231,6 +230,8 @@ class GoogleCloudStorageHook(GoogleCloudBaseHook):
         :type maxResults: integer
         :param prefix: prefix string which filters objects whose name begin with this prefix
         :type prefix: string
+        :param delimiter: filters objects based on the delimiter (for e.g '.csv')
+        :type delimiter:string
         :return: a stream of object names matching the filtering criteria
         """
         service = self.get_conn()
@@ -243,16 +244,21 @@ class GoogleCloudStorageHook(GoogleCloudBaseHook):
                 versions=versions,
                 maxResults=maxResults,
                 pageToken=pageToken,
-                prefix=prefix
+                prefix=prefix,
+                delimiter=delimiter
             ).execute()
 
-            if 'items' not in response:
-                self.log.info("No items found for prefix: %s", prefix)
-                break
-
-            for item in response['items']:
-                if item and 'name' in item:
-                    ids.append(item['name'])
+            if 'prefixes' not in response:
+                if 'items' not in response:
+                    self.log.info("No items found for prefix: %s", prefix)
+                    break
+
+                for item in response['items']:
+                    if item and 'name' in item:
+                        ids.append(item['name'])
+            else:
+                for item in response['prefixes']:
+                    ids.append(item)
 
             if 'nextPageToken' not in response:
                 # no further pages of results, so stop the loop

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/3e321790/airflow/contrib/operators/gcs_copy_operator.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/gcs_copy_operator.py b/airflow/contrib/operators/gcs_copy_operator.py
new file mode 100644
index 0000000..55d98a3
--- /dev/null
+++ b/airflow/contrib/operators/gcs_copy_operator.py
@@ -0,0 +1,115 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from airflow.contrib.hooks.gcs_hook import GoogleCloudStorageHook
+from airflow.models import BaseOperator
+from airflow.utils.decorators import apply_defaults
+
+
+class GoogleCloudStorageCopyOperator(BaseOperator):
+    """
+    Copies objects (optionally from a directory) filtered by 'delimiter' (file extension for e.g .json) from a bucket
+    to another bucket in a different directory, if required.
+
+    :param source_bucket: The source Google cloud storage bucket where the object is.
+    :type source_bucket: string
+    :param source_object: The source name of the object to copy in the Google cloud
+        storage bucket.
+    :type source_object: string
+    :param source_files_delimiter: The delimiter by which you want to filter the files to copy.
+        For e.g to copy the CSV files from in a directory in GCS you would use source_files_delimiter='.csv'.
+    :type source_files_delimiter: string
+    :param destination_bucket: The destination Google cloud storage bucket where the object should be.
+    :type destination_bucket: string
+    :param destination_directory: The destination name of the directory in the destination Google cloud
+        storage bucket.
+    :type destination_directory: string
+    :param google_cloud_storage_conn_id: The connection ID to use when
+        connecting to Google cloud storage.
+    :type google_cloud_storage_conn_id: string
+    :param delegate_to: The account to impersonate, if any.
+        For this to work, the service account making the request must have domain-wide delegation enabled.
+    :type delegate_to: string
+
+    Example: The following Operator would move all the CSV files from `sales/sales-2017` folder in `data` bucket to
+    `sales` folder in `archive` bucket.
+
+    move_file = GoogleCloudStorageCopyOperator(
+        task_id='move_file',
+        source_bucket='data',
+        source_object='sales/sales-2017/',
+        source_files_delimiter='.csv'
+        destination_bucket='archive',
+        destination_directory='sales',
+        google_cloud_storage_conn_id='airflow-service-account'
+    )
+    """
+    template_fields = ('source_bucket', 'source_object', 'source_files_delimiter',
+                       'destination_bucket', 'destination_directory')
+    ui_color = '#f0eee4'
+
+    @apply_defaults
+    def __init__(self,
+                 source_bucket,
+                 source_object,
+                 source_files_delimiter=None,
+                 destination_bucket=None,
+                 destination_directory='',
+                 google_cloud_storage_conn_id='google_cloud_storage_default',
+                 delegate_to=None,
+                 *args,
+                 **kwargs):
+        super(GoogleCloudStorageCopyOperator, self).__init__(*args, **kwargs)
+        self.source_bucket = source_bucket
+        self.source_object = source_object
+        self.source_files_delimiter = source_files_delimiter
+        self.files_to_copy = list()
+        self.destination_bucket = destination_bucket
+        self.destination_directory = destination_directory
+        self.google_cloud_storage_conn_id = google_cloud_storage_conn_id
+        self.delegate_to = delegate_to
+
+    def execute(self, context):
+
+        self.log.info('Executing copy - Source_Bucket: %s, Source_directory: %s, '
+                      'Destination_bucket: %s, Destination_directory: %s',
+                      self.source_bucket, self.source_object,
+                      self.destination_bucket or self.source_bucket,
+                      self.destination_directory or self.source_object)
+
+        hook = GoogleCloudStorageHook(google_cloud_storage_conn_id=self.google_cloud_storage_conn_id,
+                                      delegate_to=self.delegate_to)
+
+        self.log.info('Getting list of the files to copy. Source Bucket: %s; Source Object: %s',
+                      self.source_bucket, self.source_object)
+
+        # Create a list of objects to copy from Source bucket. The function uses prefix keyword to pass the name of
+        # the object to copy.
+        self.files_to_copy = hook.list(bucket=self.source_bucket, prefix=self.source_object,
+                                       delimiter=self.source_files_delimiter)
+
+        # Log the names of all objects to be copied
+        self.log.info('Files to copy: %s', self.files_to_copy)
+
+        if self.files_to_copy is not None:
+            for file_to_copy in self.files_to_copy:
+                self.log.info('Source_Bucket: %s, Source_Object: %s, '
+                              'Destination_bucket: %s, Destination_Directory: %s',
+                              self.source_bucket, file_to_copy,
+                              self.destination_bucket or self.source_bucket,
+                              self.destination_directory + file_to_copy)
+                hook.copy(self.source_bucket, file_to_copy,
+                          self.destination_bucket, self.destination_directory + file_to_copy)
+        else:
+            self.log.info('No Files to copy.')
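
The listing and naming steps in the diff above can be sketched in isolation as follows. This is a minimal illustration under stated assumptions, not the hook or operator itself: `collect_names`, `destination_names`, and the `pages` stub are hypothetical names, and the response dictionaries are made-up data shaped like the pages the patched loop inspects ('items', 'prefixes', 'nextPageToken'). No Airflow or Google client code is called.

```python
def collect_names(fetch_page):
    """Collect object names the way the patched list() loop in the diff does.

    `fetch_page(page_token)` is an assumed callable that returns one
    response page as a dict; it stands in for the real API request.
    """
    names = []
    page_token = None
    while True:
        response = fetch_page(page_token)
        if 'prefixes' not in response:
            if 'items' not in response:
                break  # nothing matched on this page
            for item in response['items']:
                if item and 'name' in item:
                    names.append(item['name'])
        else:
            # When a page carries 'prefixes', only those entries are
            # collected; 'items' on the same page are not read.
            names.extend(response['prefixes'])
        if 'nextPageToken' not in response:
            break  # no further pages of results
        page_token = response['nextPageToken']
    return names


def destination_names(files_to_copy, destination_directory=''):
    """Build destination object names as execute() does in the diff:
    plain string concatenation, with no separator inserted."""
    return [destination_directory + name for name in files_to_copy]


# Made-up two-page listing: the first page exercises the 'prefixes'
# branch, the second the 'items' branch.
pages = {
    None: {'prefixes': ['sales/sales-2017/january.csv'],
           'nextPageToken': 'p2'},
    'p2': {'items': [{'name': 'sales/sales-2017/february.csv'}]},
}

files = collect_names(pages.get)
targets = destination_names(files, 'archive/')

print(files)
# ['sales/sales-2017/january.csv', 'sales/sales-2017/february.csv']
print(targets)
# ['archive/sales/sales-2017/january.csv', 'archive/sales/sales-2017/february.csv']
print(destination_names(files, 'sales')[0])
# salessales/sales-2017/january.csv
```

As the last line shows, destination_directory is prepended as a plain string, so on this reading a value without a trailing slash (such as 'sales' in the docstring example) runs straight into the object name. The docstring example in the diff also has no comma after source_files_delimiter='.csv', so one would need to be added before the snippet could be pasted into a DAG file.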