airflow-commits mailing list archives

From fo...@apache.org
Subject incubator-airflow git commit: [AIRFLOW-2303] Lists the keys inside an S3 bucket
Date Tue, 10 Apr 2018 07:10:11 GMT
Repository: incubator-airflow
Updated Branches:
  refs/heads/master e2aeebea3 -> 687bd03df


[AIRFLOW-2303] Lists the keys inside an S3 bucket

Lists the keys matching a prefix and a delimiter inside an S3 bucket

Closes #3203 from wileeam/s3-list-operator


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/687bd03d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/687bd03d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/687bd03d

Branch: refs/heads/master
Commit: 687bd03df0d162a1a32623c86b711640ead37c88
Parents: e2aeebe
Author: Guillermo Rodriguez Cano <guillermo.rodriguezcano@bonnierbroadcasting.com>
Authored: Tue Apr 10 09:07:35 2018 +0200
Committer: Fokko Driesprong <fokkodriesprong@godatadriven.com>
Committed: Tue Apr 10 09:07:39 2018 +0200

----------------------------------------------------------------------
 airflow/contrib/operators/s3_list_operator.py   | 79 ++++++++++++++++++++
 docs/code.rst                                   |  1 +
 docs/integration.rst                            |  8 ++
 .../contrib/operators/test_s3_list_operator.py  | 51 +++++++++++++
 4 files changed, 139 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/687bd03d/airflow/contrib/operators/s3_list_operator.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/s3_list_operator.py b/airflow/contrib/operators/s3_list_operator.py
new file mode 100644
index 0000000..9287515
--- /dev/null
+++ b/airflow/contrib/operators/s3_list_operator.py
@@ -0,0 +1,79 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from airflow.hooks.S3_hook import S3Hook
+from airflow.models import BaseOperator
+from airflow.utils.decorators import apply_defaults
+
+
+class S3ListOperator(BaseOperator):
+    """
+    List all objects from the bucket whose names match the given string
+    prefix and delimiter.
+
+    This operator returns a Python list with the names of the matching
+    objects, which downstream tasks can retrieve via ``xcom``.
+
+    :param bucket: The S3 bucket in which to find the objects.
+    :type bucket: string
+    :param prefix: Prefix string used to filter the objects whose names
+        begin with it.
+    :type prefix: string
+    :param delimiter: The delimiter by which you want to filter the objects.
+        For example, to list the CSV files in a directory in S3 you would
+        use delimiter='.csv'.
+    :type delimiter: string
+    :param aws_conn_id: The connection ID to use when connecting to S3 storage.
+    :type aws_conn_id: string
+
+    **Example**:
+        The following operator would list all the CSV files under the
+        ``customers/2018/04/`` prefix in the ``data`` bucket. ::
+
+            s3_file = S3ListOperator(
+                task_id='list_s3_files',
+                bucket='data',
+                prefix='customers/2018/04/',
+                delimiter='.csv',
+                aws_conn_id='aws_customers_conn'
+            )
+    """
+    template_fields = ('bucket', 'prefix', 'delimiter')
+    ui_color = '#ffd700'
+
+    @apply_defaults
+    def __init__(self,
+                 bucket,
+                 prefix='',
+                 delimiter='',
+                 aws_conn_id='aws_default',
+                 *args,
+                 **kwargs):
+        super(S3ListOperator, self).__init__(*args, **kwargs)
+        self.bucket = bucket
+        self.prefix = prefix
+        self.delimiter = delimiter
+        self.aws_conn_id = aws_conn_id
+
+    def execute(self, context):
+        hook = S3Hook(aws_conn_id=self.aws_conn_id)
+
+        self.log.info(
+            'Getting the list of files from bucket: %s in prefix: %s '
+            '(Delimiter: %s)', self.bucket, self.prefix, self.delimiter)
+
+        return hook.list_keys(
+            bucket_name=self.bucket,
+            prefix=self.prefix,
+            delimiter=self.delimiter)

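Since ``execute()`` returns the key list, Airflow records it as the task's
XCom return value, which is how a downstream task picks it up. Below is a
minimal sketch of such a consumer; the DAG, the ``print_s3_keys`` task and
the ``print_keys`` callable are hypothetical and not part of this commit. ::

    from datetime import datetime

    from airflow import DAG
    from airflow.contrib.operators.s3_list_operator import S3ListOperator
    from airflow.operators.python_operator import PythonOperator

    dag = DAG('s3_list_example',
              start_date=datetime(2018, 4, 1),
              schedule_interval=None)

    # Lists every key under customers/2018/04/ in the 'data' bucket.
    list_csv = S3ListOperator(
        task_id='list_s3_files',
        bucket='data',
        prefix='customers/2018/04/',
        delimiter='.csv',
        aws_conn_id='aws_customers_conn',
        dag=dag)

    def print_keys(**context):
        # xcom_pull returns the list that S3ListOperator.execute() returned.
        keys = context['ti'].xcom_pull(task_ids='list_s3_files')
        for key in keys:
            print(key)

    show_keys = PythonOperator(
        task_id='print_s3_keys',
        python_callable=print_keys,
        provide_context=True,
        dag=dag)

    list_csv >> show_keys
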
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/687bd03d/docs/code.rst
----------------------------------------------------------------------
diff --git a/docs/code.rst b/docs/code.rst
index ebb8e0d..a6cb486 100644
--- a/docs/code.rst
+++ b/docs/code.rst
@@ -164,6 +164,7 @@ Operators
 .. autoclass:: airflow.contrib.operators.pubsub_operator.PubSubSubscriptionDeleteOperator
 .. autoclass:: airflow.contrib.operators.pubsub_operator.PubSubPublishOperator
 .. autoclass:: airflow.contrib.operators.qubole_operator.QuboleOperator
+.. autoclass:: airflow.contrib.operators.s3_list_operator.S3ListOperator
 .. autoclass:: airflow.contrib.operators.sftp_operator.SFTPOperator
 .. autoclass:: airflow.contrib.operators.spark_jdbc_operator.SparkJDBCOperator
 .. autoclass:: airflow.contrib.operators.spark_sql_operator.SparkSqlOperator

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/687bd03d/docs/integration.rst
----------------------------------------------------------------------
diff --git a/docs/integration.rst b/docs/integration.rst
index 162b5d8..00d1ba9 100644
--- a/docs/integration.rst
+++ b/docs/integration.rst
@@ -196,10 +196,18 @@ EmrHook
 AWS S3
 '''''''
 
+- :ref:`S3ListOperator` : Lists the files matching a key prefix from an S3 location.
 - :ref:`S3FileTransformOperator` : Copies data from a source S3 location to a temporary location
   on the local filesystem.
 - :ref:`S3ToHiveTransfer` : Moves data from S3 to Hive. The operator downloads a file from
   S3, stores the file locally before loading it into a Hive table.
 - :ref:`S3Hook` : Interact with AWS S3.
 
+.. _S3ListOperator:
+
+S3ListOperator
+""""""""""""""
+
+.. autoclass:: airflow.contrib.operators.s3_list_operator.S3ListOperator
+
 .. _S3FileTransformOperator:
 
 S3FileTransformOperator

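The tests below mock out ``S3Hook.list_keys`` entirely. For orientation, a
prefix/delimiter listing at the raw boto3 level looks roughly like the
following sketch; this illustrates the underlying S3 API, not the hook's
actual implementation. ::

    import boto3

    def list_keys(bucket, prefix='', delimiter=''):
        """Collect the object keys under a prefix, page by page."""
        client = boto3.client('s3')
        paginator = client.get_paginator('list_objects_v2')
        keys = []
        for page in paginator.paginate(
                Bucket=bucket, Prefix=prefix, Delimiter=delimiter):
            # 'Contents' is absent from pages with no matching objects.
            for obj in page.get('Contents', []):
                keys.append(obj['Key'])
        return keys
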
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/687bd03d/tests/contrib/operators/test_s3_list_operator.py
----------------------------------------------------------------------
diff --git a/tests/contrib/operators/test_s3_list_operator.py b/tests/contrib/operators/test_s3_list_operator.py
new file mode 100644
index 0000000..e2e2b74
--- /dev/null
+++ b/tests/contrib/operators/test_s3_list_operator.py
@@ -0,0 +1,51 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import unittest
+
+from airflow.contrib.operators.s3_list_operator import S3ListOperator
+
+try:
+    from unittest import mock
+except ImportError:
+    try:
+        import mock
+    except ImportError:
+        mock = None
+
+TASK_ID = 'test-s3-list-operator'
+BUCKET = 'test-bucket'
+DELIMITER = '.csv'
+PREFIX = 'TEST'
+MOCK_FILES = ["TEST1.csv", "TEST2.csv", "TEST3.csv"]
+
+
+class S3ListOperatorTest(unittest.TestCase):
+    @mock.patch('airflow.contrib.operators.s3_list_operator.S3Hook')
+    def test_execute(self, mock_hook):
+
+        mock_hook.return_value.list_keys.return_value = MOCK_FILES
+
+        operator = S3ListOperator(
+            task_id=TASK_ID, bucket=BUCKET, prefix=PREFIX, delimiter=DELIMITER)
+
+        files = operator.execute(None)
+
+        mock_hook.return_value.list_keys.assert_called_once_with(
+            bucket_name=BUCKET, prefix=PREFIX, delimiter=DELIMITER)
+        self.assertEqual(sorted(files), sorted(MOCK_FILES))
+
+
+if __name__ == '__main__':
+    unittest.main()

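Assuming a source checkout laid out as in this diff, the new tests can be run
with the standard unittest runner, e.g.
``python -m unittest tests.contrib.operators.test_s3_list_operator``; on
Python 2 the ``mock`` backport must be installed for the import fallback
above to succeed.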
