airflow-commits mailing list archives

From fo...@apache.org
Subject incubator-airflow git commit: [AIRFLOW-2293] Fix S3FileTransformOperator to work with boto3
Date Thu, 12 Apr 2018 07:28:24 GMT
Repository: incubator-airflow
Updated Branches:
  refs/heads/master 1fa71f5a5 -> 5cb530b45


[AIRFLOW-2293] Fix S3FileTransformOperator to work with boto3

S3FileTransformOperator is currently broken because
it calls a function that is no longer supported by
boto3. This PR replaces it with a valid boto3 call
and adds a unit test for this operator.

Closes #3200 from sekikn/AIRFLOW-2293
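
For context: source_s3.get_key() evidently returns a boto3 S3.Object,
which lacks boto2's get_contents_to_file(); the commit below swaps in
download_fileobj() and drops the explicit connection.close() calls,
which boto3 does not require. A minimal sketch of the replacement
call, assuming a hypothetical bucket "my-bucket" and key "data.csv":

    import boto3
    from tempfile import NamedTemporaryFile

    obj = boto3.resource("s3").Object("my-bucket", "data.csv")

    with NamedTemporaryFile("wb") as f:
        # boto2's obj.get_contents_to_file(f) no longer exists;
        # download_fileobj() is the boto3 managed-transfer equivalent.
        obj.download_fileobj(Fileobj=f)
        f.flush()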


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/5cb530b4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/5cb530b4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/5cb530b4

Branch: refs/heads/master
Commit: 5cb530b455be54e6b58eae19c8c10ef8f5cf955d
Parents: 1fa71f5
Author: Kengo Seki <sekikn@apache.org>
Authored: Thu Apr 12 09:28:18 2018 +0200
Committer: Fokko Driesprong <fokkodriesprong@godatadriven.com>
Committed: Thu Apr 12 09:28:18 2018 +0200

----------------------------------------------------------------------
 airflow/operators/s3_file_transform_operator.py |  6 +-
 setup.py                                        |  2 +-
 .../test_s3_file_transform_operator.py          | 67 ++++++++++++++++++++
 3 files changed, 70 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/5cb530b4/airflow/operators/s3_file_transform_operator.py
----------------------------------------------------------------------
diff --git a/airflow/operators/s3_file_transform_operator.py b/airflow/operators/s3_file_transform_operator.py
index a27a782..f5dbc38 100644
--- a/airflow/operators/s3_file_transform_operator.py
+++ b/airflow/operators/s3_file_transform_operator.py
@@ -78,14 +78,13 @@ class S3FileTransformOperator(BaseOperator):
         if not source_s3.check_for_key(self.source_s3_key):
             raise AirflowException("The source key {0} does not exist".format(self.source_s3_key))
         source_s3_key_object = source_s3.get_key(self.source_s3_key)
-        with NamedTemporaryFile("w") as f_source, NamedTemporaryFile("w") as f_dest:
+        with NamedTemporaryFile("wb") as f_source, NamedTemporaryFile("wb") as f_dest:
             self.log.info(
                 "Dumping S3 file %s contents to local file %s",
                 self.source_s3_key, f_source.name
             )
-            source_s3_key_object.get_contents_to_file(f_source)
+            source_s3_key_object.download_fileobj(Fileobj=f_source)
             f_source.flush()
-            source_s3.connection.close()
             transform_script_process = subprocess.Popen(
                 [self.transform_script, f_source.name, f_dest.name],
                 stdout=subprocess.PIPE, stderr=subprocess.PIPE, close_fds=True)
@@ -106,4 +105,3 @@ class S3FileTransformOperator(BaseOperator):
                 replace=self.replace
             )
             self.log.info("Upload successful")
-            dest_s3.connection.close()

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/5cb530b4/setup.py
----------------------------------------------------------------------
diff --git a/setup.py b/setup.py
index 0c0b237..71a9113 100644
--- a/setup.py
+++ b/setup.py
@@ -146,7 +146,7 @@ oracle = ['cx_Oracle>=5.1.2']
 postgres = ['psycopg2-binary>=2.7.4']
 ssh = ['paramiko>=2.1.1', 'pysftp>=0.2.9']
 salesforce = ['simple-salesforce>=0.72']
-s3 = ['boto3>=1.0.0']
+s3 = ['boto3>=1.4.0']
 samba = ['pysmbclient>=0.1.3']
 slack = ['slackclient>=1.0.0']
 statsd = ['statsd>=3.0.1, <4.0']

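The floor on boto3 moves to 1.4.0 because, as far as I know, the
managed transfer methods used in the operator (download_fileobj() and
the matching upload helpers) only appeared in the 1.4 series, when
s3transfer was folded into boto3.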
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/5cb530b4/tests/operators/test_s3_file_transform_operator.py
----------------------------------------------------------------------
diff --git a/tests/operators/test_s3_file_transform_operator.py b/tests/operators/test_s3_file_transform_operator.py
new file mode 100644
index 0000000..7e480d5
--- /dev/null
+++ b/tests/operators/test_s3_file_transform_operator.py
@@ -0,0 +1,67 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import errno
+import io
+import os
+import shutil
+import unittest
+from tempfile import mkdtemp
+
+import boto3
+import mock
+from moto import mock_s3
+
+from airflow.operators.s3_file_transform_operator import S3FileTransformOperator
+
+
+class TestS3FileTransformOperator(unittest.TestCase):
+
+    def setUp(self):
+        self.tmp_dir = mkdtemp(prefix='test_tmpS3FileTransform_')
+        self.transform_script = os.path.join(self.tmp_dir, "transform.py")
+        os.mknod(self.transform_script)
+
+    def tearDown(self):
+        try:
+            shutil.rmtree(self.tmp_dir)
+        except OSError as e:
+            # ENOENT - no such file or directory
+            if e.errno != errno.ENOENT:
+                raise e
+
+    @mock.patch('subprocess.Popen')
+    @mock_s3
+    def test_execute(self, mock_Popen):
+        transform_script_process = mock_Popen.return_value
+        transform_script_process.communicate.return_value = [None, None]
+        transform_script_process.returncode = 0
+
+        bucket = "bucket"
+        input_key = "foo"
+        output_key = "bar"
+        bio = io.BytesIO(b"input")
+
+        conn = boto3.client('s3')
+        conn.create_bucket(Bucket=bucket)
+        conn.upload_fileobj(Bucket=bucket, Key=input_key, Fileobj=bio)
+
+        s3_url = "s3://{0}/{1}"
+        t = S3FileTransformOperator(
+            source_s3_key=s3_url.format(bucket, input_key),
+            dest_s3_key=s3_url.format(bucket, output_key),
+            transform_script=self.transform_script,
+            task_id="task_id")
+        t.execute(None)

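
Note that the test mocks subprocess.Popen, so the empty transform
script created in setUp() is never actually executed. For reference,
the operator invokes the script with the local source and destination
paths as its two arguments (see the Popen call in the first hunk
above); a minimal, purely hypothetical script would be:

    #!/usr/bin/env python
    # Invoked by the operator as: transform.py <source_path> <dest_path>
    import sys

    source_path, dest_path = sys.argv[1], sys.argv[2]

    with open(source_path, "rb") as src, open(dest_path, "wb") as dst:
        dst.write(src.read().upper())  # trivial transform: upper-case bytes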
