airflow-commits mailing list archives

From san...@apache.org
Subject incubator-airflow git commit: [AIRFLOW-2420] Azure Data Lake Hook
Date Tue, 15 May 2018 17:31:00 GMT
Repository: incubator-airflow
Updated Branches:
  refs/heads/master e3bbc314e -> 7c233179e


[AIRFLOW-2420] Azure Data Lake Hook

Add AzureDataLakeHook as a first step to enable Airflow to connect to
Azure Data Lake.

The hook has a simple interface to upload and download files, exposing
all the parameters available in the Azure Data Lake SDK, plus a
check_for_file method to query whether a file exists in the data lake.
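
A minimal usage sketch (assuming an `azure_data_lake_default` connection
has been configured; all paths below are illustrative):

```python
from airflow.contrib.hooks.azure_data_lake_hook import AzureDataLakeHook

# Credentials come from the Airflow connection: login = client id,
# password = client secret, extra = {"tenant": ..., "account_name": ...}.
hook = AzureDataLakeHook(azure_data_lake_conn_id='azure_data_lake_default')

# check_for_file globs the remote path and reports whether it exists.
if not hook.check_for_file('raw/2018-05-15/events.csv'):
    # upload_file wraps multithread.ADLUploader; the keyword arguments
    # mirror the azure-datalake-store SDK parameters.
    hook.upload_file(local_path='/tmp/events.csv',
                     remote_path='raw/2018-05-15/events.csv',
                     nthreads=64, overwrite=True)

# download_file wraps multithread.ADLDownloader in the same way.
hook.download_file(local_path='/tmp/events_copy.csv',
                   remote_path='raw/2018-05-15/events.csv')
```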

[AIRFLOW-2420] Add functionality for Azure Data
Lake

Make sure you have checked _all_ steps below.

### JIRA
- [x] My PR addresses the following [Airflow JIRA](https://issues.apache.org/jira/browse/AIRFLOW-2420)
  issues and references them in the PR title.
    - https://issues.apache.org/jira/browse/AIRFLOW-2420

### Description
- [x] Here are some details about my PR, including screenshots of any UI changes:
       This PR creates the Azure Data Lake hook
       (azure_data_lake_hook.AzureDataLakeHook) and all the setup required to
       create a new Azure Data Lake connection (see the sketch below).
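
A sketch of the connection setup, mirroring what `airflow/utils/db.py` and the
unit tests do (the credential values below are placeholders):

```python
import json

from airflow import models
from airflow.utils import db

# Register an 'azure_data_lake' connection: login holds the client id,
# password the client secret, and the extra JSON carries the tenant and
# account name used by lib.auth() and AzureDLFileSystem.
db.merge_conn(
    models.Connection(
        conn_id='azure_data_lake_default',
        conn_type='azure_data_lake',
        login='<CLIENT_ID>',
        password='<CLIENT_SECRET>',
        extra=json.dumps({"tenant": "<TENANT>",
                          "account_name": "<ACCOUNT_NAME>"})))
```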

### Tests
- [x] My PR adds the following unit tests __OR__ does not need testing for
  this extremely good reason:
       Adds tests for airflow.contrib.hooks.azure_data_lake_hook in
       tests/contrib/hooks/test_azure_data_lake_hook.py

### Commits
- [x] My commits all reference JIRA issues in their subject lines, and I
  have squashed multiple commits if they address the same issue. In
  addition, my commits follow the guidelines from "[How to write a good
  git commit message](http://chris.beams.io/posts/git-commit/)":
    1. Subject is separated from body by a blank line
    2. Subject is limited to 50 characters
    3. Subject does not end with a period
    4. Subject uses the imperative mood ("add", not "adding")
    5. Body wraps at 72 characters
    6. Body explains "what" and "why", not "how"

### Documentation
- [x] In case of new functionality, my PR adds documentation that
  describes how to use it.
    - When adding new operators/hooks/sensors, the autoclass
      documentation generation needs to be added.

### Code Quality
- [x] Passes `git diff upstream/master -u -- "*.py" | flake8 --diff`

Closes #3333 from marcusrehm/master


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/7c233179
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/7c233179
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/7c233179

Branch: refs/heads/master
Commit: 7c233179e91818bd641b283934a73cc84a51ca03
Parents: e3bbc31
Author: Marcus Rehm <marcus.rehm@gmail.com>
Authored: Tue May 15 10:30:54 2018 -0700
Committer: r39132 <siddharthanand@yahoo.com>
Committed: Tue May 15 10:30:54 2018 -0700

----------------------------------------------------------------------
 airflow/contrib/hooks/__init__.py               |   3 +-
 airflow/contrib/hooks/azure_data_lake_hook.py   | 141 +++++++++++++++++++
 airflow/models.py                               |   4 +
 airflow/utils/db.py                             |   6 +-
 docs/integration.rst                            |  19 ++-
 setup.py                                        |  16 ++-
 .../contrib/hooks/test_azure_data_lake_hook.py  | 105 ++++++++++++++
 7 files changed, 286 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/7c233179/airflow/contrib/hooks/__init__.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/hooks/__init__.py b/airflow/contrib/hooks/__init__.py
index 81ce8f0..1c92deb 100644
--- a/airflow/contrib/hooks/__init__.py
+++ b/airflow/contrib/hooks/__init__.py
@@ -58,7 +58,8 @@ _hooks = {
     'wasb_hook': ['WasbHook'],
     'gcp_pubsub_hook': ['PubSubHook'],
     'jenkins_hook': ['JenkinsHook'],
-    'aws_dynamodb_hook': ['AwsDynamoDBHook']
+    'aws_dynamodb_hook': ['AwsDynamoDBHook'],
+    'azure_data_lake_hook': ['AzureDataLakeHook'],
 }
 
 import os as _os

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/7c233179/airflow/contrib/hooks/azure_data_lake_hook.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/hooks/azure_data_lake_hook.py b/airflow/contrib/hooks/azure_data_lake_hook.py
new file mode 100644
index 0000000..1a02d78
--- /dev/null
+++ b/airflow/contrib/hooks/azure_data_lake_hook.py
@@ -0,0 +1,141 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+from airflow.hooks.base_hook import BaseHook
+from azure.datalake.store import core, lib, multithread
+
+
+class AzureDataLakeHook(BaseHook):
+    """
+    Interacts with Azure Data Lake.
+
+    Client ID and client secret should be in user and password parameters.
+    Tenant and account name should be in the extra field as
+    {"tenant": "<TENANT>", "account_name": "<ACCOUNT_NAME>"}.
+
+    :param azure_data_lake_conn_id: Reference to the Azure Data Lake connection.
+    :type azure_data_lake_conn_id: str
+    """
+
+    def __init__(self, azure_data_lake_conn_id='azure_data_lake_default'):
+        self.conn_id = azure_data_lake_conn_id
+        self.connection = self.get_conn()
+
+    def get_conn(self):
+        """Return a AzureDLFileSystem object."""
+        conn = self.get_connection(self.conn_id)
+        service_options = conn.extra_dejson
+        self.account_name = service_options.get('account_name')
+
+        adlCreds = lib.auth(tenant_id=service_options.get('tenant'),
+                            client_secret=conn.password,
+                            client_id=conn.login)
+        adlsFileSystemClient = core.AzureDLFileSystem(adlCreds,
+                                                      store_name=self.account_name)
+        adlsFileSystemClient.connect()
+        return adlsFileSystemClient
+
+    def check_for_file(self, file_path):
+        """
+        Check if a file exists on Azure Data Lake.
+
+        :param file_path: Path and name of the file.
+        :type file_path: str
+        :return: True if the file exists, False otherwise.
+        :rtype: bool
+        """
+        try:
+            files = self.connection.glob(file_path, details=False, invalidate_cache=True)
+            return len(files) == 1
+        except FileNotFoundError:
+            return False
+
+    def upload_file(self, local_path, remote_path, nthreads=64, overwrite=True,
+                    buffersize=4194304, blocksize=4194304):
+        """
+        Upload a file to Azure Data Lake.
+
+        :param local_path: local path. Can be single file, directory (in which case,
+            upload recursively) or glob pattern. Recursive glob patterns using `**`
+            are not supported.
+        :type local_path: str
+        :param remote_path: Remote path to upload to; if multiple files, this is the
+            directory root to write within.
+        :type remote_path: str
+        :param nthreads: Number of threads to use. If None, uses the number of cores.
+        :type nthreads: int
+        :param overwrite: Whether to forcibly overwrite existing files/directories.
+            If False and the remote path is a directory, will quit regardless of
+            whether any files would be overwritten or not. If True, only matching
+            filenames are actually overwritten.
+        :type overwrite: bool
+        :param buffersize: int [2**22]
+            Number of bytes for internal buffer. This block cannot be bigger than
+            a chunk and cannot be smaller than a block.
+        :type buffersize: int
+        :param blocksize: int [2**22]
+            Number of bytes for a block. Within each chunk, we write a smaller
+            block for each API call. This block cannot be bigger than a chunk.
+        :type blocksize: int
+        """
+        multithread.ADLUploader(self.connection,
+                                lpath=local_path,
+                                rpath=remote_path,
+                                nthreads=nthreads,
+                                overwrite=overwrite,
+                                buffersize=buffersize,
+                                blocksize=blocksize)
+
+    def download_file(self, local_path, remote_path, nthreads=64, overwrite=True,
+                      buffersize=4194304, blocksize=4194304):
+        """
+        Download a file from Azure Data Lake.
+
+        :param local_path: local path. If downloading a single file, will write to this
+            specific file, unless it is an existing directory, in which case a file is
+            created within it. If downloading multiple files, this is the root
+            directory to write within. Will create directories as required.
+        :type local_path: str
+        :param remote_path: remote path/globstring to use to find remote files.
+            Recursive glob patterns using `**` are not supported.
+        :type remote_path: str
+        :param nthreads: Number of threads to use. If None, uses the number of cores.
+        :type nthreads: int
+        :param overwrite: Whether to forcibly overwrite existing files/directories.
+            If False and the remote path is a directory, will quit regardless of
+            whether any files would be overwritten or not. If True, only matching
+            filenames are actually overwritten.
+        :type overwrite: bool
+        :param buffersize: int [2**22]
+            Number of bytes for internal buffer. This block cannot be bigger than
+            a chunk and cannot be smaller than a block.
+        :type buffersize: int
+        :param blocksize: int [2**22]
+            Number of bytes for a block. Within each chunk, we write a smaller
+            block for each API call. This block cannot be bigger than a chunk.
+        :type blocksize: int
+        """
+        multithread.ADLDownloader(self.connection,
+                                  lpath=local_path,
+                                  rpath=remote_path,
+                                  nthreads=nthreads,
+                                  overwrite=overwrite,
+                                  buffersize=buffersize,
+                                  blocksize=blocksize)

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/7c233179/airflow/models.py
----------------------------------------------------------------------
diff --git a/airflow/models.py b/airflow/models.py
index ec4d2bb..2322860 100755
--- a/airflow/models.py
+++ b/airflow/models.py
@@ -602,6 +602,7 @@ class Connection(Base, LoggingMixin):
         ('emr', 'Elastic MapReduce',),
         ('snowflake', 'Snowflake',),
         ('segment', 'Segment',),
+        ('azure_data_lake', 'Azure Data Lake'),
     ]
 
     def __init__(
@@ -747,6 +748,9 @@ class Connection(Base, LoggingMixin):
             elif self.conn_type == 'docker':
                 from airflow.hooks.docker_hook import DockerHook
                 return DockerHook(docker_conn_id=self.conn_id)
+            elif self.conn_type == 'azure_data_lake':
+                from airflow.contrib.hooks.azure_data_lake_hook import AzureDataLakeHook
+                return AzureDataLakeHook(azure_data_lake_conn_id=self.conn_id)
         except:
             pass
 

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/7c233179/airflow/utils/db.py
----------------------------------------------------------------------
diff --git a/airflow/utils/db.py b/airflow/utils/db.py
index 7bbda93..adda6fd 100644
--- a/airflow/utils/db.py
+++ b/airflow/utils/db.py
@@ -271,7 +271,11 @@ def initdb(rbac=False):
     merge_conn(
         models.Connection(
             conn_id='segment_default', conn_type='segment',
-            extra='{"write_key": "my-segment-write-key"}'))
+            extra='{"write_key": "my-segment-write-key"}')),
+    merge_conn(
+        models.Connection(
+            conn_id='azure_data_lake_default', conn_type='azure_data_lake',
+            extra='{"tenant": "<TENANT>", "account_name": "<ACCOUNTNAME>" }'))
 
     # Known event types
     KET = models.KnownEventType

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/7c233179/docs/integration.rst
----------------------------------------------------------------------
diff --git a/docs/integration.rst b/docs/integration.rst
index 5e54eb1..d488737 100644
--- a/docs/integration.rst
+++ b/docs/integration.rst
@@ -70,7 +70,8 @@ Azure: Microsoft Azure
 ----------------------
 
 Airflow has limited support for Microsoft Azure: interfaces exist only for Azure Blob
-Storage. Note that the Hook, Sensor and Operator are in the contrib section.
+Storage and Azure Data Lake. Note that the Hook, Sensor and Operator for Blob
+Storage and the Azure Data Lake Hook are in the contrib section.
 
 Azure Blob Storage
 ''''''''''''''''''
@@ -146,6 +147,22 @@ Follow the steps below to enable Azure Blob Storage logging.
 #. Restart the Airflow webserver and scheduler, and trigger (or wait for) a new task execution.
 #. Verify that logs are showing up for newly executed tasks in the bucket you've defined.
 
+Azure Data Lake
+''''''''''''''''''
+
+AzureDataLakeHook communicates via a REST API compatible with WebHDFS. Make sure that an
+Airflow connection of type `azure_data_lake` exists. Authorization can be done by supplying a
+login (=Client ID), password (=Client Secret) and extra fields tenant (Tenant) and
+account_name (Account Name) (see connection `azure_data_lake_default` for an example).
+
+- :ref:`AzureDataLakeHook`: Interface with Azure Data Lake.
+
+.. _AzureDataLakeHook:
+
+AzureDataLakeHook
+"""""""""
+
+.. autoclass:: airflow.contrib.hooks.azure_data_lake_hook.AzureDataLakeHook
 
 .. _AWS:
 

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/7c233179/setup.py
----------------------------------------------------------------------
diff --git a/setup.py b/setup.py
index 8907c00..9813ea2 100644
--- a/setup.py
+++ b/setup.py
@@ -108,7 +108,12 @@ async = [
     'gevent>=0.13'
 ]
 atlas = ['atlasclient>=0.1.2']
-azure = ['azure-storage>=0.34.0']
+azure_blob_storage = ['azure-storage>=0.34.0']
+azure_data_lake = [
+    'azure-mgmt-resource==1.2.2',
+    'azure-mgmt-datalake-store==0.4.0',
+    'azure-datalake-store==0.0.19'
+]
 sendgrid = ['sendgrid>=5.2.0']
 celery = [
     'celery>=4.0.2',
@@ -211,9 +216,9 @@ devel = [
 devel_minreq = devel + kubernetes + mysql + doc + password + s3 + cgroups
 devel_hadoop = devel_minreq + hive + hdfs + webhdfs + kerberos
 devel_all = (sendgrid + devel + all_dbs + doc + samba + s3 + slack + crypto + oracle +
-             docker + ssh + kubernetes + celery + azure + redis + gcp_api + datadog +
-             zendesk + jdbc + ldap + kerberos + password + webhdfs + jenkins +
-             druid + pinot + segment + snowflake + elasticsearch + atlas)
+             docker + ssh + kubernetes + celery + azure_blob_storage + redis + gcp_api +
+             datadog + zendesk + jdbc + ldap + kerberos + password + webhdfs + jenkins +
+             druid + pinot + segment + snowflake + elasticsearch + azure_data_lake + atlas)
 
 # Snakebite & Google Cloud Dataflow are not Python 3 compatible :'(
 if PY3:
@@ -283,7 +288,8 @@ def do_setup():
             'all_dbs': all_dbs,
             'atlas': atlas,
             'async': async,
-            'azure': azure,
+            'azure_blob_storage': azure_blob_storage,
+            'azure_data_lake': azure_data_lake,
             'celery': celery,
             'cgroups': cgroups,
             'cloudant': cloudant,

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/7c233179/tests/contrib/hooks/test_azure_data_lake_hook.py
----------------------------------------------------------------------
diff --git a/tests/contrib/hooks/test_azure_data_lake_hook.py b/tests/contrib/hooks/test_azure_data_lake_hook.py
new file mode 100644
index 0000000..d99581b
--- /dev/null
+++ b/tests/contrib/hooks/test_azure_data_lake_hook.py
@@ -0,0 +1,105 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+
+import json
+import unittest
+
+from airflow import configuration
+from airflow import models
+from airflow.utils import db
+
+try:
+    from unittest import mock
+except ImportError:
+    try:
+        import mock
+    except ImportError:
+        mock = None
+
+
+class TestAzureDataLakeHook(unittest.TestCase):
+
+    def setUp(self):
+        configuration.load_test_config()
+        db.merge_conn(
+            models.Connection(
+                conn_id='adl_test_key',
+                conn_type='azure_data_lake',
+                login='client_id',
+                password='client secret',
+                extra=json.dumps({"tenant": "tenant",
+                                  "account_name": "accountname"})
+            )
+        )
+
+    @mock.patch('airflow.contrib.hooks.azure_data_lake_hook.lib', autospec=True)
+    def test_conn(self, mock_lib):
+        from airflow.contrib.hooks.azure_data_lake_hook import AzureDataLakeHook
+        from azure.datalake.store import core
+        hook = AzureDataLakeHook(azure_data_lake_conn_id='adl_test_key')
+        self.assertEqual(hook.conn_id, 'adl_test_key')
+        self.assertIsInstance(hook.connection, core.AzureDLFileSystem)
+        assert mock_lib.auth.called
+
+    @mock.patch('airflow.contrib.hooks.azure_data_lake_hook.core.AzureDLFileSystem',
+                autospec=True)
+    @mock.patch('airflow.contrib.hooks.azure_data_lake_hook.lib', autospec=True)
+    def test_check_for_file(self, mock_lib, mock_filesystem):
+        from airflow.contrib.hooks.azure_data_lake_hook import AzureDataLakeHook
+        hook = AzureDataLakeHook(azure_data_lake_conn_id='adl_test_key')
+        hook.check_for_file('file_path')
+        assert mock_filesystem.return_value.glob.called
+
+    @mock.patch('airflow.contrib.hooks.azure_data_lake_hook.multithread.ADLUploader',
+                autospec=True)
+    @mock.patch('airflow.contrib.hooks.azure_data_lake_hook.lib', autospec=True)
+    def test_upload_file(self, mock_lib, mock_uploader):
+        from airflow.contrib.hooks.azure_data_lake_hook import AzureDataLakeHook
+        hook = AzureDataLakeHook(azure_data_lake_conn_id='adl_test_key')
+        hook.upload_file(local_path='tests/hooks/test_adl_hook.py',
+                         remote_path='/test_adl_hook.py',
+                         nthreads=64, overwrite=True,
+                         buffersize=4194304, blocksize=4194304)
+        mock_uploader.assert_called_once_with(hook.connection,
+                                              lpath='tests/hooks/test_adl_hook.py',
+                                              rpath='/test_adl_hook.py',
+                                              nthreads=64, overwrite=True,
+                                              buffersize=4194304, blocksize=4194304)
+
+    @mock.patch('airflow.contrib.hooks.azure_data_lake_hook.multithread.ADLDownloader',
+                autospec=True)
+    @mock.patch('airflow.contrib.hooks.azure_data_lake_hook.lib', autospec=True)
+    def test_download_file(self, mock_lib, mock_downloader):
+        from airflow.contrib.hooks.azure_data_lake_hook import AzureDataLakeHook
+        hook = AzureDataLakeHook(azure_data_lake_conn_id='adl_test_key')
+        hook.download_file(local_path='test_adl_hook.py',
+                           remote_path='/test_adl_hook.py',
+                           nthreads=64, overwrite=True,
+                           buffersize=4194304, blocksize=4194304)
+        mock_downloader.assert_called_once_with(hook.connection,
+                                                lpath='test_adl_hook.py',
+                                                rpath='/test_adl_hook.py',
+                                                nthreads=64, overwrite=True,
+                                                buffersize=4194304, blocksize=4194304)
+
+
+if __name__ == '__main__':
+    unittest.main()

