From: criccomini@apache.org
To: commits@airflow.incubator.apache.org
Date: Wed, 18 May 2016 22:49:33 -0000
Subject: [1/2] incubator-airflow git commit: [AIRFLOW-125] Add file to GCS operator

Repository: incubator-airflow
Updated Branches:
  refs/heads/master af43db5af -> 20a536cc7


[AIRFLOW-125] Add file to GCS operator

Adds an operator to upload a file to Google Cloud Storage.
Used as follows:

```py
import os

from airflow.contrib.operators.file_to_gcs import FileToGoogleCloudStorageOperator

# `dag` is assumed to be a DAG object defined earlier in the same DAG file.
gcs = FileToGoogleCloudStorageOperator(
    bucket='a-bucket-i-have-access-to-on-gcs',
    dag=dag,
    task_id='upload_stuff',
    google_cloud_storage_conn_id='an-airflow-bigquery-connection',
    src=os.path.join(os.path.dirname(__file__), 'csv/some_file.csv'),
    dst='project/some_file.csv')
```


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/4b25a7d3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/4b25a7d3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/4b25a7d3

Branch: refs/heads/master
Commit: 4b25a7d34ea17c86c4b40a09f85898ea4769b22b
Parents: abc43c1
Author: Eric Stern
Authored: Tue May 17 11:19:42 2016 -0700
Committer: Eric Stern
Committed: Wed May 18 15:32:57 2016 -0700

----------------------------------------------------------------------
 airflow/contrib/operators/file_to_gcs.py | 69 +++++++++++++++++++++++++++
 1 file changed, 69 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/4b25a7d3/airflow/contrib/operators/file_to_gcs.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/file_to_gcs.py b/airflow/contrib/operators/file_to_gcs.py
new file mode 100644
index 0000000..2024db8
--- /dev/null
+++ b/airflow/contrib/operators/file_to_gcs.py
@@ -0,0 +1,69 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+from airflow.contrib.hooks.gcs_hook import GoogleCloudStorageHook
+from airflow.models import BaseOperator
+import os
+
+
+class FileToGoogleCloudStorageOperator(BaseOperator):
+    """
+    Uploads a file to Google Cloud Storage
+    """
+
+    def __init__(self,
+                 src,
+                 dst,
+                 bucket,
+                 google_cloud_storage_conn_id='google_cloud_storage_default',
+                 mime_type='application/octet-stream',
+                 delegate_to=None,
+                 *args,
+                 **kwargs):
+        """
+        :param src: Path to the local file
+        :type src: string
+        :param dst: Destination path within the specified bucket
+        :type dst: string
+        :param bucket: The bucket to upload to
+        :type bucket: string
+        :param google_cloud_storage_conn_id: The Airflow connection ID to upload with
+        :type google_cloud_storage_conn_id: string
+        :param mime_type: The mime-type string
+        :type mime_type: string
+        :param delegate_to: The account to impersonate, if any
+        :type delegate_to: string
+        """
+        super(FileToGoogleCloudStorageOperator, self).__init__(*args, **kwargs)
+        self.src = src
+        self.dst = dst
+        self.bucket = bucket
+        self.google_cloud_storage_conn_id = google_cloud_storage_conn_id
+        self.mime_type = mime_type
+        self.delegate_to = delegate_to
+
+    def execute(self, context):
+        """
+        Uploads the file to Google cloud storage
+        """
+        hook = GoogleCloudStorageHook(
+            google_cloud_storage_conn_id=self.google_cloud_storage_conn_id,
+            delegate_to=self.delegate_to)
+
+        hook.upload(
+            bucket=self.bucket,
+            object=self.dst,
+            mime_type=self.mime_type,
+            filename=self.src)
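The operator's `execute` method does one thing: it translates its arguments into a single `hook.upload(...)` call, with `src` becoming the local `filename` and `dst` becoming the GCS `object` name. The following is a minimal sketch of that mapping that runs without Airflow or GCP credentials. `RecordingHook`, `execute_upload` and the `/tmp/some_file.csv` path are hypothetical stand-ins introduced here for illustration. They are not part of this commit or of Airflow's API, and the stub only mirrors the keyword arguments used in the diff.

```python
from dataclasses import dataclass, field
from typing import Any, Dict, List


@dataclass
class RecordingHook:
    """Hypothetical stand-in for GoogleCloudStorageHook that records uploads."""
    google_cloud_storage_conn_id: str
    delegate_to: Any = None
    calls: List[Dict[str, str]] = field(default_factory=list)

    def upload(self, bucket, object, mime_type, filename):
        # Record the keyword arguments instead of talking to GCS.
        self.calls.append({"bucket": bucket, "object": object,
                           "mime_type": mime_type, "filename": filename})


def execute_upload(hook, src, dst, bucket,
                   mime_type="application/octet-stream"):
    """Hypothetical mirror of execute(): src -> filename, dst -> object."""
    hook.upload(bucket=bucket, object=dst, mime_type=mime_type, filename=src)
    return hook


hook = RecordingHook(google_cloud_storage_conn_id="google_cloud_storage_default")
execute_upload(hook, src="/tmp/some_file.csv", dst="project/some_file.csv",
               bucket="a-bucket-i-have-access-to-on-gcs")
print(hook.calls[0])
```

Because the stub records calls instead of performing I/O, the same approach can be used to check how arguments are wired before pointing the real operator at a bucket.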