airflow-commits mailing list archives

From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (AIRFLOW-2932) GoogleCloudStorageHook - allow compression of file
Date Sat, 13 Oct 2018 06:03:00 GMT

    [ https://issues.apache.org/jira/browse/AIRFLOW-2932?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16648765#comment-16648765 ]

ASF GitHub Bot commented on AIRFLOW-2932:
-----------------------------------------

Fokko closed pull request #3893: [AIRFLOW-2932] GoogleCloudStorageHook - allow compression of file
URL: https://github.com/apache/incubator-airflow/pull/3893

This is a PR merged from a forked repository. As GitHub hides the
original diff on merge, it is displayed below for the sake of provenance:

diff --git a/airflow/contrib/hooks/gcs_hook.py b/airflow/contrib/hooks/gcs_hook.py
index 6cfa1cf565..ca4b2a7754 100644
--- a/airflow/contrib/hooks/gcs_hook.py
+++ b/airflow/contrib/hooks/gcs_hook.py
@@ -24,7 +24,10 @@
 from airflow.contrib.hooks.gcp_api_base_hook import GoogleCloudBaseHook
 from airflow.exceptions import AirflowException
 
+import gzip as gz
+import shutil
 import re
+import os
 
 
 class GoogleCloudStorageHook(GoogleCloudBaseHook):
@@ -172,7 +175,8 @@ def download(self, bucket, object, filename=None):
         return downloaded_file_bytes
 
     # pylint:disable=redefined-builtin
-    def upload(self, bucket, object, filename, mime_type='application/octet-stream'):
+    def upload(self, bucket, object, filename,
+               mime_type='application/octet-stream', gzip=False):
         """
         Uploads a local file to Google Cloud Storage.
 
@@ -184,14 +188,30 @@ def upload(self, bucket, object, filename, mime_type='application/octet-stream')
         :type filename: str
         :param mime_type: The MIME type to set when uploading the file.
         :type mime_type: str
+        :param gzip: Option to compress file for upload
+        :type gzip: bool
         """
         service = self.get_conn()
+
+        if gzip:
+            filename_gz = filename + '.gz'
+
+            with open(filename, 'rb') as f_in:
+                with gz.open(filename_gz, 'wb') as f_out:
+                    shutil.copyfileobj(f_in, f_out)
+                    filename = filename_gz
+
         media = MediaFileUpload(filename, mime_type)
+
         try:
             service \
                 .objects() \
                 .insert(bucket=bucket, name=object, media_body=media) \
                 .execute()
+
+            # Clean up gzip file
+            if gzip:
+                os.remove(filename)
             return True
         except errors.HttpError as ex:
             if ex.resp['status'] == '404':
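
For reference, a minimal sketch of how the new flag can be called on a
hook instance (the bucket, object name, and local path below are
illustrative assumptions, not part of the PR):

    from airflow.contrib.hooks.gcs_hook import GoogleCloudStorageHook

    hook = GoogleCloudStorageHook(
        google_cloud_storage_conn_id='google_cloud_default')

    # Compresses /tmp/data.csv to /tmp/data.csv.gz locally, uploads the
    # compressed file, then removes the temporary .gz file on success.
    hook.upload(
        bucket='my-bucket',
        object='data/data.csv.gz',
        filename='/tmp/data.csv',
        gzip=True,
    )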
diff --git a/airflow/contrib/operators/file_to_gcs.py b/airflow/contrib/operators/file_to_gcs.py
index a392a16891..de40abd473 100644
--- a/airflow/contrib/operators/file_to_gcs.py
+++ b/airflow/contrib/operators/file_to_gcs.py
@@ -25,7 +25,8 @@
 
 class FileToGoogleCloudStorageOperator(BaseOperator):
     """
-    Uploads a file to Google Cloud Storage
+    Uploads a file to Google Cloud Storage.
+    Optionally can compress the file for upload.
 
     :param src: Path to the local file. (templated)
     :type src: str
@@ -39,6 +40,8 @@ class FileToGoogleCloudStorageOperator(BaseOperator):
     :type mime_type: str
     :param delegate_to: The account to impersonate, if any
     :type delegate_to: str
+    :param gzip: Allows for file to be compressed and uploaded as gzip
+    :type gzip: bool
     """
     template_fields = ('src', 'dst', 'bucket')
 
@@ -50,6 +53,7 @@ def __init__(self,
                  google_cloud_storage_conn_id='google_cloud_default',
                  mime_type='application/octet-stream',
                  delegate_to=None,
+                 gzip=False,
                  *args,
                  **kwargs):
         super(FileToGoogleCloudStorageOperator, self).__init__(*args, **kwargs)
@@ -59,6 +63,7 @@ def __init__(self,
         self.google_cloud_storage_conn_id = google_cloud_storage_conn_id
         self.mime_type = mime_type
         self.delegate_to = delegate_to
+        self.gzip = gzip
 
     def execute(self, context):
         """
@@ -72,4 +77,6 @@ def execute(self, context):
             bucket=self.bucket,
             object=self.dst,
             mime_type=self.mime_type,
-            filename=self.src)
+            filename=self.src,
+            gzip=self.gzip,
+        )
diff --git a/tests/contrib/operators/test_file_to_gcs.py b/tests/contrib/operators/test_file_to_gcs.py
new file mode 100644
index 0000000000..590267a319
--- /dev/null
+++ b/tests/contrib/operators/test_file_to_gcs.py
@@ -0,0 +1,66 @@
+import datetime
+import unittest
+
+from airflow import DAG, configuration
+from airflow.contrib.operators.file_to_gcs import FileToGoogleCloudStorageOperator
+
+try:
+    from unittest import mock
+except ImportError:
+    try:
+        import mock
+    except ImportError:
+        mock = None
+
+
+class TestFileToGcsOperator(unittest.TestCase):
+
+    _config = {
+        'src': '/tmp/fake.csv',
+        'dst': 'fake.csv',
+        'bucket': 'dummy',
+        'mime_type': 'application/octet-stream',
+        'gzip': False
+    }
+
+    def setUp(self):
+        configuration.load_test_config()
+        args = {
+            'owner': 'airflow',
+            'start_date': datetime.datetime(2017, 1, 1)
+        }
+        self.dag = DAG('test_dag_id', default_args=args)
+
+    def test_init(self):
+        operator = FileToGoogleCloudStorageOperator(
+            task_id='file_to_gcs_operator',
+            dag=self.dag,
+            **self._config
+        )
+        self.assertEqual(operator.src, self._config['src'])
+        self.assertEqual(operator.dst, self._config['dst'])
+        self.assertEqual(operator.bucket, self._config['bucket'])
+        self.assertEqual(operator.mime_type, self._config['mime_type'])
+        self.assertEqual(operator.gzip, self._config['gzip'])
+
+    @mock.patch('airflow.contrib.operators.file_to_gcs.GoogleCloudStorageHook',
+                autospec=True)
+    def test_execute(self, mock_hook):
+        mock_instance = mock_hook.return_value
+        operator = FileToGoogleCloudStorageOperator(
+            task_id='gcs_to_file_sensor',
+            dag=self.dag,
+            **self._config
+        )
+        operator.execute(None)
+        mock_instance.upload.assert_called_once_with(
+            bucket=self._config['bucket'],
+            filename=self._config['src'],
+            gzip=self._config['gzip'],
+            mime_type=self._config['mime_type'],
+            object=self._config['dst']
+        )
+
+
+if __name__ == '__main__':
+    unittest.main()
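
Assuming the standard layout of the Airflow test tree, the new tests can
be run in isolation with: python -m unittest tests.contrib.operators.test_file_to_gcs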


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


> GoogleCloudStorageHook - allow compression of file
> --------------------------------------------------
>
>                 Key: AIRFLOW-2932
>                 URL: https://issues.apache.org/jira/browse/AIRFLOW-2932
>             Project: Apache Airflow
>          Issue Type: Improvement
>    Affects Versions: 1.10.0
>            Reporter: jack
>            Assignee: neil90
>            Priority: Major
>
> The *upload*(bucket, object, filename, mime_type='application/octet-stream')
> function allows uploading a file from local disk.
> Google Cloud Storage supports GZIP, and BigQuery can read GZIP files, so
> most people upload compressed files in order to save space.
> It would be nice if the upload function could perform the compression on
> its own (when asked by the user). This would save users the trouble of
> compressing the file themselves.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
