airflow-commits mailing list archives

From fo...@apache.org
Subject incubator-airflow git commit: [AIRFLOW-2299] Add S3 Select functionality to S3FileTransformOperator
Date Tue, 17 Apr 2018 08:53:12 GMT
Repository: incubator-airflow
Updated Branches:
  refs/heads/master a14804310 -> 6e82f1d7c


[AIRFLOW-2299] Add S3 Select functionality to S3FileTransformOperator

Currently, S3FileTransformOperator downloads the whole file from S3
before transforming and uploading it. Adding an extraction feature
using S3 Select to this operator improves its efficiency and
usability.

Closes #3227 from sekikn/AIRFLOW-2299
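
For illustration only (not part of the original commit message), a minimal
sketch of how the new parameter might be used; the bucket names, keys and
task id below are hypothetical:

    from airflow.operators.s3_file_transform_operator import S3FileTransformOperator

    # With select_expression set, transform_script can be omitted; only the rows
    # matched by the expression are downloaded and uploaded to the destination key.
    filter_rows = S3FileTransformOperator(
        task_id="filter_rows",
        source_s3_key="s3://source-bucket/input.csv",   # hypothetical keys
        dest_s3_key="s3://dest-bucket/filtered.csv",
        select_expression="SELECT * FROM S3Object s WHERE s._1 = 'foo'",
        replace=True,
    )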


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/6e82f1d7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/6e82f1d7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/6e82f1d7

Branch: refs/heads/master
Commit: 6e82f1d7c9fa391c636a0155cdb19aa6cbda0821
Parents: a148043
Author: Kengo Seki <sekikn@apache.org>
Authored: Tue Apr 17 10:53:05 2018 +0200
Committer: Fokko Driesprong <fokkodriesprong@godatadriven.com>
Committed: Tue Apr 17 10:53:05 2018 +0200

----------------------------------------------------------------------
 airflow/hooks/S3_hook.py                        | 40 +++++++++++++
 airflow/operators/s3_file_transform_operator.py | 59 ++++++++++++++------
 setup.py                                        |  2 +-
 tests/hooks/test_s3_hook.py                     |  8 +++
 .../test_s3_file_transform_operator.py          | 30 +++++++++-
 5 files changed, 121 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/6e82f1d7/airflow/hooks/S3_hook.py
----------------------------------------------------------------------
diff --git a/airflow/hooks/S3_hook.py b/airflow/hooks/S3_hook.py
index f75f5e6..7a4b8b0 100644
--- a/airflow/hooks/S3_hook.py
+++ b/airflow/hooks/S3_hook.py
@@ -177,6 +177,46 @@ class S3Hook(AwsHook):
         obj = self.get_key(key, bucket_name)
         return obj.get()['Body'].read().decode('utf-8')
 
+    def select_key(self, key, bucket_name=None,
+                   expression='SELECT * FROM S3Object',
+                   expression_type='SQL',
+                   input_serialization={'CSV': {}},
+                   output_serialization={'CSV': {}}):
+        """
+        Reads a key with S3 Select.
+
+        :param key: S3 key that will point to the file
+        :type key: str
+        :param bucket_name: Name of the bucket in which the file is stored
+        :type bucket_name: str
+        :param expression: S3 Select expression
+        :type expression: str
+        :param expression_type: S3 Select expression type
+        :type expression_type: str
+        :param input_serialization: S3 Select input data serialization format
+        :type input_serialization: str
+        :param output_serialization: S3 Select output data serialization format
+        :type output_serialization: str
+
+        .. seealso::
+            For more details about S3 Select parameters:
+            http://boto3.readthedocs.io/en/latest/reference/services/s3.html#S3.Client.select_object_content
+        """
+        if not bucket_name:
+            (bucket_name, key) = self.parse_s3_url(key)
+
+        response = self.get_conn().select_object_content(
+            Bucket=bucket_name,
+            Key=key,
+            Expression=expression,
+            ExpressionType=expression_type,
+            InputSerialization=input_serialization,
+            OutputSerialization=output_serialization)
+
+        return ''.join(event['Records']['Payload']
+                       for event in response['Payload']
+                       if 'Records' in event)
+
     def check_for_wildcard_key(self,
                                wildcard_key, bucket_name=None, delimiter=''):
         """

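A rough usage sketch of the new S3Hook.select_key method added above; the
connection id, bucket and key are made up for illustration:

    from airflow.hooks.S3_hook import S3Hook

    hook = S3Hook(aws_conn_id="aws_default")

    # Since bucket_name is not given, the key is parsed as a full s3:// URL.
    # Only the selected columns are returned as a single string, instead of
    # the whole object (input/output serialization default to CSV).
    rows = hook.select_key(
        key="s3://my-bucket/data.csv",
        expression="SELECT s._1, s._2 FROM S3Object s",
    )
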
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/6e82f1d7/airflow/operators/s3_file_transform_operator.py
----------------------------------------------------------------------
diff --git a/airflow/operators/s3_file_transform_operator.py b/airflow/operators/s3_file_transform_operator.py
index 1d39ace..67286b0 100644
--- a/airflow/operators/s3_file_transform_operator.py
+++ b/airflow/operators/s3_file_transform_operator.py
@@ -36,10 +36,13 @@ class S3FileTransformOperator(BaseOperator):
     The locations of the source and the destination files in the local
     filesystem is provided as an first and second arguments to the
     transformation script. The transformation script is expected to read the
-    data from source , transform it and write the output to the local
+    data from source, transform it and write the output to the local
     destination file. The operator then takes over control and uploads the
     local destination file to S3.
 
+    S3 Select is also available to filter the source contents. Users can
+    omit the transformation script if S3 Select expression is specified.
+
     :param source_s3_key: The key to be retrieved from S3
     :type source_s3_key: str
     :param source_aws_conn_id: source s3 connection
@@ -52,6 +55,8 @@ class S3FileTransformOperator(BaseOperator):
     :type replace: bool
     :param transform_script: location of the executable transformation script
     :type transform_script: str
+    :param select_expression: S3 Select expression
+    :type select_expression: str
     """
 
     template_fields = ('source_s3_key', 'dest_s3_key')
@@ -63,7 +68,8 @@ class S3FileTransformOperator(BaseOperator):
             self,
             source_s3_key,
             dest_s3_key,
-            transform_script,
+            transform_script=None,
+            select_expression=None,
             source_aws_conn_id='aws_default',
             dest_aws_conn_id='aws_default',
             replace=False,
@@ -75,33 +81,54 @@ class S3FileTransformOperator(BaseOperator):
         self.dest_aws_conn_id = dest_aws_conn_id
         self.replace = replace
         self.transform_script = transform_script
+        self.select_expression = select_expression
 
     def execute(self, context):
+        if self.transform_script is None and self.select_expression is None:
+            raise AirflowException(
+                "Either transform_script or select_expression must be specified")
+
         source_s3 = S3Hook(aws_conn_id=self.source_aws_conn_id)
         dest_s3 = S3Hook(aws_conn_id=self.dest_aws_conn_id)
+
         self.log.info("Downloading source S3 file %s", self.source_s3_key)
         if not source_s3.check_for_key(self.source_s3_key):
-            raise AirflowException("The source key {0} does not exist".format(self.source_s3_key))
+            raise AirflowException(
+                "The source key {0} does not exist".format(self.source_s3_key))
         source_s3_key_object = source_s3.get_key(self.source_s3_key)
+
         with NamedTemporaryFile("wb") as f_source, NamedTemporaryFile("wb") as f_dest:
             self.log.info(
                 "Dumping S3 file %s contents to local file %s",
                 self.source_s3_key, f_source.name
             )
-            source_s3_key_object.download_fileobj(Fileobj=f_source)
-            f_source.flush()
-            transform_script_process = subprocess.Popen(
-                [self.transform_script, f_source.name, f_dest.name],
-                stdout=subprocess.PIPE, stderr=subprocess.PIPE, close_fds=True)
-            (transform_script_stdoutdata, transform_script_stderrdata) = transform_script_process.communicate()
-            self.log.info("Transform script stdout %s", transform_script_stdoutdata)
-            if transform_script_process.returncode > 0:
-                raise AirflowException("Transform script failed %s", transform_script_stderrdata)
-            else:
-                self.log.info(
-                    "Transform script successful. Output temporarily located at %s",
-                    f_dest.name
+
+            if self.select_expression is not None:
+                content = source_s3.select_key(
+                    key=self.source_s3_key,
+                    expression=self.select_expression
                 )
+                f_source.write(content.encode("utf-8"))
+            else:
+                source_s3_key_object.download_fileobj(Fileobj=f_source)
+            f_source.flush()
+
+            if self.transform_script is not None:
+                transform_script_process = subprocess.Popen(
+                    [self.transform_script, f_source.name, f_dest.name],
+                    stdout=subprocess.PIPE, stderr=subprocess.PIPE, close_fds=True)
+                (transform_script_stdoutdata, transform_script_stderrdata) = \
+                    transform_script_process.communicate()
+                self.log.info("Transform script stdout %s", transform_script_stdoutdata)
+                if transform_script_process.returncode > 0:
+                    raise AirflowException(
+                        "Transform script failed %s", transform_script_stderrdata)
+                else:
+                    self.log.info(
+                        "Transform script successful. Output temporarily located at %s",
+                        f_dest.name
+                    )
+
             self.log.info("Uploading transformed file to S3")
             f_dest.flush()
             dest_s3.load_file(

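For completeness, a minimal hypothetical transform_script for the
non-S3-Select path: the operator invokes it with the local source and
destination paths as its first and second arguments, as shown in the
subprocess call above.

    #!/usr/bin/env python
    # Hypothetical transform.py: the operator runs it as
    #   transform.py <local source path> <local destination path>
    import sys

    source, dest = sys.argv[1], sys.argv[2]

    with open(source) as f_in, open(dest, "w") as f_out:
        for line in f_in:
            # Trivial example transformation: upper-case every line.
            f_out.write(line.upper())
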
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/6e82f1d7/setup.py
----------------------------------------------------------------------
diff --git a/setup.py b/setup.py
index 8e7b939..700f3ae 100644
--- a/setup.py
+++ b/setup.py
@@ -155,7 +155,7 @@ oracle = ['cx_Oracle>=5.1.2']
 postgres = ['psycopg2-binary>=2.7.4']
 ssh = ['paramiko>=2.1.1', 'pysftp>=0.2.9']
 salesforce = ['simple-salesforce>=0.72']
-s3 = ['boto3>=1.4.0']
+s3 = ['boto3>=1.7.0']
 samba = ['pysmbclient>=0.1.3']
 slack = ['slackclient>=1.0.0']
 statsd = ['statsd>=3.0.1, <4.0']

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/6e82f1d7/tests/hooks/test_s3_hook.py
----------------------------------------------------------------------
diff --git a/tests/hooks/test_s3_hook.py b/tests/hooks/test_s3_hook.py
index e3e0eb5..94d0e36 100644
--- a/tests/hooks/test_s3_hook.py
+++ b/tests/hooks/test_s3_hook.py
@@ -18,6 +18,7 @@
 # under the License.
 #
 
+import mock
 import unittest
 
 from airflow import configuration
@@ -162,6 +163,13 @@ class TestS3Hook(unittest.TestCase):
 
         self.assertEqual(hook.read_key('my_key', 'mybucket'), u'Contént')
 
+    # As of 1.3.2, Moto doesn't support select_object_content yet.
+    @mock.patch('airflow.contrib.hooks.aws_hook.AwsHook.get_client_type')
+    def test_select_key(self, mock_get_client_type):
+        mock_get_client_type.return_value.select_object_content.return_value = \
+            {'Payload': [{'Records': {'Payload': u'Contént'}}]}
+        hook = S3Hook(aws_conn_id=None)
+        self.assertEqual(hook.select_key('my_key', 'mybucket'), u'Contént')
 
     @mock_s3
     def test_check_for_wildcard_key(self):

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/6e82f1d7/tests/operators/test_s3_file_transform_operator.py
----------------------------------------------------------------------
diff --git a/tests/operators/test_s3_file_transform_operator.py b/tests/operators/test_s3_file_transform_operator.py
index 8b1ec86..6b72c58 100644
--- a/tests/operators/test_s3_file_transform_operator.py
+++ b/tests/operators/test_s3_file_transform_operator.py
@@ -49,7 +49,7 @@ class TestS3FileTransformOperator(unittest.TestCase):
 
     @mock.patch('subprocess.Popen')
     @mock_s3
-    def test_execute(self, mock_Popen):
+    def test_execute_with_transform_script(self, mock_Popen):
         transform_script_process = mock_Popen.return_value
         transform_script_process.communicate.return_value = [None, None]
         transform_script_process.returncode = 0
@@ -68,5 +68,33 @@ class TestS3FileTransformOperator(unittest.TestCase):
             source_s3_key=s3_url.format(bucket, input_key),
             dest_s3_key=s3_url.format(bucket, output_key),
             transform_script=self.transform_script,
+            replace=True,
             task_id="task_id")
         t.execute(None)
+
+    @mock.patch('airflow.hooks.S3_hook.S3Hook.select_key', return_value="input")
+    @mock_s3
+    def test_execute_with_select_expression(self, mock_select_key):
+        bucket = "bucket"
+        input_key = "foo"
+        output_key = "bar"
+        bio = io.BytesIO(b"input")
+
+        conn = boto3.client('s3')
+        conn.create_bucket(Bucket=bucket)
+        conn.upload_fileobj(Bucket=bucket, Key=input_key, Fileobj=bio)
+
+        s3_url = "s3://{0}/{1}"
+        select_expression = "SELECT * FROM S3Object s"
+        t = S3FileTransformOperator(
+            source_s3_key=s3_url.format(bucket, input_key),
+            dest_s3_key=s3_url.format(bucket, output_key),
+            select_expression=select_expression,
+            replace=True,
+            task_id="task_id")
+        t.execute(None)
+
+        mock_select_key.assert_called_once_with(
+            key=s3_url.format(bucket, input_key),
+            expression=select_expression
+        )

