airflow-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From criccom...@apache.org
Subject [1/2] incubator-airflow git commit: [AIRFLOW-125] Add file to GCS operator
Date Wed, 18 May 2016 22:49:33 GMT
Repository: incubator-airflow
Updated Branches:
  refs/heads/master af43db5af -> 20a536cc7


[AIRFLOW-125] Add file to GCS operator

Adds an operator to upload a file to Google Cloud Storage. Used as follows:
```py
import os

from airflow.contrib.operators.file_to_gcs import FileToGoogleCloudStorageOperator
gcs = FileToGoogleCloudStorageOperator(
        bucket='a-bucket-i-have-access-to-on-gcs',
        dag=dag,
        task_id='upload_stuff',
        google_cloud_storage_conn_id='an-airflow-bigquery-connection',
        src=os.path.join(os.path.dirname(__file__), 'csv/some_file.csv'),
        dst='project/some_file.csv')
```


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/4b25a7d3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/4b25a7d3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/4b25a7d3

Branch: refs/heads/master
Commit: 4b25a7d34ea17c86c4b40a09f85898ea4769b22b
Parents: abc43c1
Author: Eric Stern <eric@ericstern.com>
Authored: Tue May 17 11:19:42 2016 -0700
Committer: Eric Stern <eric@ericstern.com>
Committed: Wed May 18 15:32:57 2016 -0700

----------------------------------------------------------------------
 airflow/contrib/operators/file_to_gcs.py | 69 +++++++++++++++++++++++++++
 1 file changed, 69 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/4b25a7d3/airflow/contrib/operators/file_to_gcs.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/file_to_gcs.py b/airflow/contrib/operators/file_to_gcs.py
new file mode 100644
index 0000000..2024db8
--- /dev/null
+++ b/airflow/contrib/operators/file_to_gcs.py
@@ -0,0 +1,69 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+from airflow.contrib.hooks.gcs_hook import GoogleCloudStorageHook
+from airflow.models import BaseOperator
+import os
+
+
+class FileToGoogleCloudStorageOperator(BaseOperator):
+    """
+    Operator that uploads one local file to a Google Cloud Storage bucket.
+    """
+
+    def __init__(self,
+                 src,
+                 dst,
+                 bucket,
+                 google_cloud_storage_conn_id='google_cloud_storage_default',
+                 mime_type='application/octet-stream',
+                 delegate_to=None,
+                 *args,
+                 **kwargs):
+        """
+        :param src: Path to the local file; passed to the hook as ``filename``
+        :type src: string
+        :param dst: Destination path within the bucket; passed as ``object``
+        :type dst: string
+        :param bucket: The bucket to upload to
+        :type bucket: string
+        :param google_cloud_storage_conn_id: Airflow connection ID given to the hook
+        :type google_cloud_storage_conn_id: string
+        :param mime_type: MIME type string passed to the hook's ``upload`` call
+        :type mime_type: string
+        :param delegate_to: The account to impersonate, if any; passed to the hook
+        :type delegate_to: string
+        """
+        super(FileToGoogleCloudStorageOperator, self).__init__(*args, **kwargs)
+        self.src = src
+        self.dst = dst
+        self.bucket = bucket
+        self.google_cloud_storage_conn_id = google_cloud_storage_conn_id
+        self.mime_type = mime_type
+        self.delegate_to = delegate_to
+
+    def execute(self, context):
+        """
+        Builds a GoogleCloudStorageHook and uploads ``src`` to ``dst`` in ``bucket``
+        """
+        hook = GoogleCloudStorageHook(
+                google_cloud_storage_conn_id=self.google_cloud_storage_conn_id,
+                delegate_to=self.delegate_to)
+
+        hook.upload(
+            bucket=self.bucket,
+            object=self.dst,
+            mime_type=self.mime_type,
+            filename=self.src)


Mime
View raw message