airflow-commits mailing list archives

From fo...@apache.org
Subject incubator-airflow git commit: [AIRFLOW-2300] Add S3 Select functionality to S3ToHiveTransfer
Date Mon, 23 Apr 2018 06:57:28 GMT
Repository: incubator-airflow
Updated Branches:
  refs/heads/master a15b7c5b7 -> 49826af10


[AIRFLOW-2300] Add S3 Select functionality to S3ToHiveTransfer

To improve efficiency and usability, this PR adds
S3 Select functionality to S3ToHiveTransfer.
It also contains some minor fixes to the
documentation and comments.

Closes #3243 from sekikn/AIRFLOW-2300

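For context, S3 Select lets Amazon S3 evaluate a SQL expression against an
object server-side, so only the matching subset of a CSV (or JSON) object
crosses the network. A minimal sketch of the underlying boto3 call this
feature builds on (bucket and key names are hypothetical):

    import boto3

    s3 = boto3.client('s3')

    # S3 Select returns an event stream; 'Records' events carry the data.
    response = s3.select_object_content(
        Bucket='my-bucket',
        Key='data/input.csv',
        ExpressionType='SQL',
        Expression="SELECT * FROM S3Object s",
        InputSerialization={'CSV': {'FileHeaderInfo': 'USE',
                                    'FieldDelimiter': ','}},
        OutputSerialization={'CSV': {}},
    )
    for event in response['Payload']:
        if 'Records' in event:
            print(event['Records']['Payload'].decode('utf-8'))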

Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/49826af1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/49826af1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/49826af1

Branch: refs/heads/master
Commit: 49826af108d2e245ca921944296f24cc73120461
Parents: a15b7c5
Author: Kengo Seki <sekikn@apache.org>
Authored: Mon Apr 23 08:57:23 2018 +0200
Committer: Fokko Driesprong <fokkodriesprong@godatadriven.com>
Committed: Mon Apr 23 08:57:23 2018 +0200

----------------------------------------------------------------------
 airflow/hooks/S3_hook.py                 |  6 ++-
 airflow/operators/s3_to_hive_operator.py | 34 ++++++++++++++-
 tests/operators/s3_to_hive_operator.py   | 61 ++++++++++++++++++++++++---
 3 files changed, 90 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/49826af1/airflow/hooks/S3_hook.py
----------------------------------------------------------------------
diff --git a/airflow/hooks/S3_hook.py b/airflow/hooks/S3_hook.py
index 7a4b8b0..edde6ea 100644
--- a/airflow/hooks/S3_hook.py
+++ b/airflow/hooks/S3_hook.py
@@ -194,9 +194,11 @@ class S3Hook(AwsHook):
         :param expression_type: S3 Select expression type
         :type expression_type: str
         :param input_serialization: S3 Select input data serialization format
-        :type input_serialization: str
+        :type input_serialization: dict
         :param output_serialization: S3 Select output data serialization format
-        :type output_serialization: str
+        :type output_serialization: dict
+        :return: retrieved subset of original data by S3 Select
+        :rtype: str
 
         .. seealso::
             For more details about S3 Select parameters:

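Given the corrected docstring above (both serialization parameters are
dicts, and the call returns the selected data as a string), invoking the
hook directly might look like the sketch below; the keyword arguments
mirror the call the operator makes further down, and the bucket and key
are placeholders:

    from airflow.hooks.S3_hook import S3Hook

    hook = S3Hook(aws_conn_id='aws_default')

    # output_serialization is omitted here, matching the operator's call.
    content = hook.select_key(
        bucket_name='my-bucket',
        key='data/input.csv',
        expression="SELECT * FROM S3Object s",
        input_serialization={'CSV': {'FileHeaderInfo': 'USE'}},
    )
    print(content)  # the retrieved subset, as a str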
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/49826af1/airflow/operators/s3_to_hive_operator.py
----------------------------------------------------------------------
diff --git a/airflow/operators/s3_to_hive_operator.py b/airflow/operators/s3_to_hive_operator.py
index 129dd92..e9a979d 100644
--- a/airflow/operators/s3_to_hive_operator.py
+++ b/airflow/operators/s3_to_hive_operator.py
@@ -85,6 +85,8 @@ class S3ToHiveTransfer(BaseOperator):
     :type input_compressed: bool
     :param tblproperties: TBLPROPERTIES of the hive table being created
     :type tblproperties: dict
+    :param select_expression: S3 Select expression
+    :type select_expression: str
     """
 
     template_fields = ('s3_key', 'partition', 'hive_table')
@@ -108,6 +110,7 @@ class S3ToHiveTransfer(BaseOperator):
             hive_cli_conn_id='hive_cli_default',
             input_compressed=False,
             tblproperties=None,
+            select_expression=None,
             *args, **kwargs):
         super(S3ToHiveTransfer, self).__init__(*args, **kwargs)
         self.s3_key = s3_key
@@ -124,6 +127,7 @@ class S3ToHiveTransfer(BaseOperator):
         self.aws_conn_id = aws_conn_id
         self.input_compressed = input_compressed
         self.tblproperties = tblproperties
+        self.select_expression = select_expression
 
         if (self.check_headers and
                 not (self.field_dict is not None and self.headers)):
@@ -146,16 +150,42 @@ class S3ToHiveTransfer(BaseOperator):
                 raise AirflowException(
                     "The key {0} does not exists".format(self.s3_key))
             s3_key_object = self.s3.get_key(self.s3_key)
+
         root, file_ext = os.path.splitext(s3_key_object.key)
+        if (self.select_expression and self.input_compressed and
+                file_ext != '.gz'):
+            raise AirflowException("GZIP is the only compression " +
+                                   "format Amazon S3 Select supports")
+
         with TemporaryDirectory(prefix='tmps32hive_') as tmp_dir,\
                 NamedTemporaryFile(mode="wb",
                                    dir=tmp_dir,
                                    suffix=file_ext) as f:
             self.log.info("Dumping S3 key {0} contents to local file {1}"
                           .format(s3_key_object.key, f.name))
-            s3_key_object.download_fileobj(f)
+            if self.select_expression:
+                option = {}
+                if self.headers:
+                    option['FileHeaderInfo'] = 'USE'
+                if self.delimiter:
+                    option['FieldDelimiter'] = self.delimiter
+
+                input_serialization = {'CSV': option}
+                if self.input_compressed:
+                    input_serialization['CompressionType'] = 'GZIP'
+
+                content = self.s3.select_key(
+                    bucket_name=s3_key_object.bucket_name,
+                    key=s3_key_object.key,
+                    expression=self.select_expression,
+                    input_serialization=input_serialization
+                )
+                f.write(content.encode("utf-8"))
+            else:
+                s3_key_object.download_fileobj(f)
             f.flush()
-            if not self.headers:
+
+            if self.select_expression or not self.headers:
                 self.log.info("Loading file %s into Hive", f.name)
                 self.hive.load_file(
                     f.name,

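Putting the new parameter together, a task definition using it could look
like this sketch (DAG, task, bucket, and table names are made up, and
field_dict follows the operator's existing signature):

    from datetime import datetime

    from airflow import DAG
    from airflow.operators.s3_to_hive_operator import S3ToHiveTransfer

    dag = DAG('s3_select_to_hive_example',
              start_date=datetime(2018, 4, 23),
              schedule_interval=None)

    # With select_expression set, only the selected columns are
    # downloaded; headers=True maps to FileHeaderInfo='USE' above, so
    # the header row is consumed by S3 Select rather than checked
    # locally (hence check_headers=False).
    transfer = S3ToHiveTransfer(
        task_id='s3_select_to_hive',
        s3_key='s3://my-bucket/data/input.csv',
        field_dict={'id': 'INT', 'name': 'STRING'},
        hive_table='example_table',
        delimiter=',',
        headers=True,
        check_headers=False,
        select_expression="SELECT s.id, s.name FROM S3Object s",
        dag=dag,
    )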
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/49826af1/tests/operators/s3_to_hive_operator.py
----------------------------------------------------------------------
diff --git a/tests/operators/s3_to_hive_operator.py b/tests/operators/s3_to_hive_operator.py
index cb8df85..482e7fe 100644
--- a/tests/operators/s3_to_hive_operator.py
+++ b/tests/operators/s3_to_hive_operator.py
@@ -227,7 +227,7 @@ class S3ToHiveTransferTest(unittest.TestCase):
                          format(ext,
                                 ('with' if has_header else 'without'))
                          )
-            self.kwargs['input_compressed'] = (False if ext == '.txt' else True)
+            self.kwargs['input_compressed'] = ext != '.txt'
             self.kwargs['s3_key'] = 's3://bucket/' + self.s3_key + ext
             ip_fn = self._get_fn(ext, self.kwargs['headers'])
             op_fn = self._get_fn(ext, False)
@@ -235,20 +235,67 @@ class S3ToHiveTransferTest(unittest.TestCase):
             # Upload the file into the Mocked S3 bucket
             conn.upload_file(ip_fn, 'bucket', self.s3_key + ext)
 
-            # file paramter to HiveCliHook.load_file is compared
-            # against expected file oputput
+            # file parameter to HiveCliHook.load_file is compared
+            # against expected file output
             mock_hiveclihook().load_file.side_effect = \
                 lambda *args, **kwargs: \
                 self.assertTrue(
-                    self._check_file_equality(args[0],
-                                              op_fn,
-                                              ext
-                                              ),
+                    self._check_file_equality(args[0], op_fn, ext),
                     msg='{0} output file not as expected'.format(ext))
             # Execute S3ToHiveTransfer
             s32hive = S3ToHiveTransfer(**self.kwargs)
             s32hive.execute(None)
 
+    @unittest.skipIf(mock is None, 'mock package not present')
+    @unittest.skipIf(mock_s3 is None, 'moto package not present')
+    @mock.patch('airflow.operators.s3_to_hive_operator.HiveCliHook')
+    @mock_s3
+    def test_execute_with_select_expression(self, mock_hiveclihook):
+        conn = boto3.client('s3')
+        conn.create_bucket(Bucket='bucket')
+
+        select_expression = "SELECT * FROM S3Object s"
+        bucket = 'bucket'
+
+        # Only testing S3ToHiveTransfer calls S3Hook.select_key with
+        # the right parameters and its execute method succeeds here,
+        # since Moto doesn't support select_object_content as of 1.3.2.
+        for (ext, has_header) in product(['.txt', '.gz'], [True, False]):
+            input_compressed = ext != '.txt'
+            key = self.s3_key + ext
+
+            self.kwargs['check_headers'] = False
+            self.kwargs['headers'] = has_header
+            self.kwargs['input_compressed'] = input_compressed
+            self.kwargs['select_expression'] = select_expression
+            self.kwargs['s3_key'] = 's3://{0}/{1}'.format(bucket, key)
+
+            ip_fn = self._get_fn(ext, has_header)
+
+            # Upload the file into the Mocked S3 bucket
+            conn.upload_file(ip_fn, bucket, key)
+
+            input_serialization = {
+                'CSV': {'FieldDelimiter': self.delimiter}
+            }
+            if input_compressed:
+                input_serialization['CompressionType'] = 'GZIP'
+            if has_header:
+                input_serialization['CSV']['FileHeaderInfo'] = 'USE'
+
+            # Confirm that select_key was called with the right params
+            with mock.patch('airflow.hooks.S3_hook.S3Hook.select_key',
+                            return_value="") as mock_select_key:
+                # Execute S3ToHiveTransfer
+                s32hive = S3ToHiveTransfer(**self.kwargs)
+                s32hive.execute(None)
+
+                mock_select_key.assert_called_once_with(
+                    bucket_name=bucket, key=key,
+                    expression=select_expression,
+                    input_serialization=input_serialization
+                )
+
 
 if __name__ == '__main__':
     unittest.main()

