airflow-commits mailing list archives

From GitBox <...@apache.org>
Subject [GitHub] Fokko commented on a change in pull request #3560: [AIRFLOW-2697] Drop snakebite in favour of hdfs3
Date Fri, 12 Oct 2018 09:33:47 GMT
Fokko commented on a change in pull request #3560: [AIRFLOW-2697] Drop snakebite in favour of hdfs3
URL: https://github.com/apache/incubator-airflow/pull/3560#discussion_r224726843
 
 

 ##########
 File path: airflow/hooks/hdfs_hook.py
 ##########
 @@ -17,85 +17,95 @@
 # specific language governing permissions and limitations
 # under the License.
 
-from six import PY2
+import warnings
+
+import hdfs3
+from hdfs3.utils import MyNone
 
-from airflow import configuration
-from airflow.exceptions import AirflowException
 from airflow.hooks.base_hook import BaseHook
 
 
-snakebite_imported = False
-if PY2:
-    from snakebite.client import Client, HAClient, Namenode, AutoConfigClient
-    snakebite_imported = True
+class HdfsHook(BaseHook):
+    """Hook for interacting with HDFS using the hdfs3 library.
 
+    By default hdfs3 loads its configuration from `core-site.xml` and
+    `hdfs-site.xml` if these files can be found in any of the typical
+    locations. The hook loads `host` and `port` parameters from the
 +    hdfs connection (if given), and extra configuration parameters can be
 +    supplied using the `pars` key in the connection's extra JSON. See the
 +    hdfs3 documentation for more details.
 
-class HDFSHookException(AirflowException):
-    pass
+    :param str hdfs_conn_id: Connection ID to fetch parameters from.
+    :param bool autoconf: Whether to use autoconfig to discover
+        configuration options from the hdfs XML configuration files.
+    """
 
+    def __init__(self, hdfs_conn_id=None, autoconf=True):
+        super().__init__(None)
 
-class HDFSHook(BaseHook):
-    """
-    Interact with HDFS. This class is a wrapper around the snakebite library.
-
-    :param hdfs_conn_id: Connection id to fetch connection info
-    :type conn_id: string
-    :param proxy_user: effective user for HDFS operations
-    :type proxy_user: string
-    :param autoconfig: use snakebite's automatically configured client
-    :type autoconfig: bool
-    """
-    def __init__(self, hdfs_conn_id='hdfs_default', proxy_user=None,
-                 autoconfig=False):
-        if not snakebite_imported:
-            raise ImportError(
-                'This HDFSHook implementation requires snakebite, but '
-                'snakebite is not compatible with Python 3 '
-                '(as of August 2015). Please use Python 2 if you require '
-                'this hook  -- or help by submitting a PR!')
         self.hdfs_conn_id = hdfs_conn_id
-        self.proxy_user = proxy_user
-        self.autoconfig = autoconfig
+        self._autoconf = autoconf
+
+        self._conn = None
 
     def get_conn(self):
-        """
-        Returns a snakebite HDFSClient object.
-        """
-        # When using HAClient, proxy_user must be the same, so is ok to always
-        # take the first.
-        effective_user = self.proxy_user
-        autoconfig = self.autoconfig
-        use_sasl = configuration.conf.get('core', 'security') == 'kerberos'
-
-        try:
-            connections = self.get_connections(self.hdfs_conn_id)
-
-            if not effective_user:
-                effective_user = connections[0].login
-            if not autoconfig:
-                autoconfig = connections[0].extra_dejson.get('autoconfig',
-                                                             False)
-            hdfs_namenode_principal = connections[0].extra_dejson.get(
-                'hdfs_namenode_principal')
-        except AirflowException:
-            if not autoconfig:
-                raise
-
-        if autoconfig:
-            # will read config info from $HADOOP_HOME conf files
-            client = AutoConfigClient(effective_user=effective_user,
-                                      use_sasl=use_sasl)
-        elif len(connections) == 1:
-            client = Client(connections[0].host, connections[0].port,
-                            effective_user=effective_user, use_sasl=use_sasl,
-                            hdfs_namenode_principal=hdfs_namenode_principal)
-        elif len(connections) > 1:
-            nn = [Namenode(conn.host, conn.port) for conn in connections]
-            client = HAClient(nn, effective_user=effective_user,
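
For context, a minimal sketch of the hdfs3 call that the new hook wraps, using
placeholder connection values (the host, port, and `pars` dict below are
illustrative, not taken from this PR):

    import hdfs3

    # Placeholder values; in the hook these would come from the Airflow
    # connection (host/port) and the `pars` key of its extra JSON.
    pars = {"dfs.client.use.datanode.hostname": "true"}

    # With autoconf=True, HDFileSystem also picks up core-site.xml and
    # hdfs-site.xml from the usual locations; explicitly passed arguments
    # override the discovered values.
    fs = hdfs3.HDFileSystem(host="namenode.example.com", port=8020,
                            pars=pars, autoconf=True)
    print(fs.ls("/"))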
 
 Review comment:
   Also known as poor man's load balancing. This will not really work, since there is no
   fallback: it will pick a random connection, and then you need to pray that the one it
   picked is up.
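
For reference, hdfs3 can also be pointed at an HA nameservice rather than a single
namenode, which gives real failover instead of picking one connection at random; a
sketch along the lines of the HA example in the hdfs3 documentation (all names and
hostnames below are placeholders):

    from hdfs3 import HDFileSystem

    # Address the nameservice, not an individual namenode; libhdfs3 then
    # resolves the active namenode and fails over on its own.
    conf = {
        "dfs.nameservices": "ns1",
        "dfs.ha.namenodes.ns1": "nn1,nn2",
        "dfs.namenode.rpc-address.ns1.nn1": "namenode1.example.com:8020",
        "dfs.namenode.rpc-address.ns1.nn2": "namenode2.example.com:8020",
    }

    fs = HDFileSystem(host="ns1", pars=conf)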

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services
