airflow-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From criccom...@apache.org
Subject incubator-airflow git commit: [AIRFLOW-514] hive hook loads data from pandas DataFrame into hive and infers types
Date Thu, 17 Nov 2016 19:02:30 GMT
Repository: incubator-airflow
Updated Branches:
  refs/heads/master e1bc51691 -> 15a81e653


[AIRFLOW-514] hive hook loads data from pandas DataFrame into hive and infers types

Closes #1801 from danfrankj/master


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/15a81e65
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/15a81e65
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/15a81e65

Branch: refs/heads/master
Commit: 15a81e6535613961f07e5c138b3edbbea3d21ad8
Parents: e1bc516
Author: Dan Frank <dan.frank@airbnb.com>
Authored: Thu Nov 17 11:02:22 2016 -0800
Committer: Chris Riccomini <chrisr@wepay.com>
Committed: Thu Nov 17 11:02:22 2016 -0800

----------------------------------------------------------------------
 airflow/hooks/hive_hooks.py | 64 ++++++++++++++++++++++++++++++++++++++++
 1 file changed, 64 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/15a81e65/airflow/hooks/hive_hooks.py
----------------------------------------------------------------------
diff --git a/airflow/hooks/hive_hooks.py b/airflow/hooks/hive_hooks.py
index 0bada7a..d7803b8 100644
--- a/airflow/hooks/hive_hooks.py
+++ b/airflow/hooks/hive_hooks.py
@@ -269,6 +269,68 @@ class HiveCliHook(BaseHook):
                 else:
                     logging.info("SUCCESS")
 
+    def load_df(
+            self,
+            df,
+            table,
+            create=True,
+            recreate=False,
+            field_dict=None,
+            delimiter=',',
+            encoding='utf8',
+            pandas_kwargs=None, **kwargs):
+        """
+        Loads a pandas DataFrame into hive.
+
+        The DataFrame is written to a temporary delimited file which is then
+        handed to ``self.load_file``. Hive data types will be inferred if not
+        passed but column names will not be sanitized.
+
+        :param df: DataFrame to load into the Hive table
+        :type df: pandas.DataFrame
+        :param table: target Hive table, use dot notation to target a
+            specific database
+        :type table: str
+        :param create: whether to create the table if it doesn't exist
+        :type create: bool
+        :param recreate: whether to drop and recreate the table at every
+            execution
+        :type recreate: bool
+        :param field_dict: mapping from column name to hive data type;
+            inferred from the DataFrame dtypes when omitted and the table
+            is being created or recreated
+        :type field_dict: dict
+        :param delimiter: field delimiter used in the temporary file
+        :type delimiter: str
+        :param encoding: string encoding to use when writing DataFrame to file
+        :type encoding: str
+        :param pandas_kwargs: passed to DataFrame.to_csv; entries given here
+            take precedence over the defaults chosen by this method
+        :type pandas_kwargs: dict
+        :param kwargs: passed to self.load_file
+        :return: the value returned by self.load_file
+        """
+        # Local import so this method does not depend on the module header.
+        from collections import OrderedDict
+
+        def _infer_field_types_from_df(df):
+            # Keyed by the one-character numpy ``dtype.kind`` code.
+            DTYPE_KIND_HIVE_TYPE = {
+                'b': 'BOOLEAN',    # boolean
+                'i': 'BIGINT',     # signed integer
+                'u': 'BIGINT',     # unsigned integer
+                'f': 'DOUBLE',     # floating-point
+                'c': 'STRING',     # complex floating-point
+                'M': 'TIMESTAMP',  # datetime
+                'O': 'STRING',     # object
+                'S': 'STRING',     # (byte-)string
+                'U': 'STRING',     # Unicode
+                'V': 'STRING'      # void
+            }
+
+            # Ordered, so that the columns of the created table line up
+            # with the order in which to_csv writes the DataFrame columns.
+            return OrderedDict(
+                (col, DTYPE_KIND_HIVE_TYPE[dtype.kind])
+                for col, dtype in df.dtypes.items())
+
+        # The file must hold data rows only, with exactly the columns named
+        # in field_dict: no header line and no index column. The date format
+        # matches what Hive parses as a TIMESTAMP. Caller-supplied
+        # pandas_kwargs override any of these defaults.
+        csv_kwargs = {
+            'header': False,
+            'index': False,
+            'encoding': encoding,
+            'date_format': '%Y-%m-%d %H:%M:%S',
+        }
+        csv_kwargs.update(pandas_kwargs or {})
+
+        with TemporaryDirectory(prefix='airflow_hiveop_') as tmp_dir:
+            with NamedTemporaryFile(dir=tmp_dir) as f:
+
+                if field_dict is None and (create or recreate):
+                    field_dict = _infer_field_types_from_df(df)
+
+                # Write by path rather than through the open handle: pandas
+                # then opens the file itself with the requested encoding and
+                # closes it, so every row is on disk before load_file reads
+                # the file by name.
+                df.to_csv(f.name, sep=delimiter, **csv_kwargs)
+
+                # create/recreate are named parameters here and therefore
+                # never end up in **kwargs; forward them explicitly so
+                # load_file honours them.
+                return self.load_file(filepath=f.name,
+                                      table=table,
+                                      delimiter=delimiter,
+                                      field_dict=field_dict,
+                                      create=create,
+                                      recreate=recreate,
+                                      **kwargs)
+
     def load_file(
             self,
             filepath,
@@ -307,6 +369,8 @@ class HiveCliHook(BaseHook):
         if recreate:
             hql += "DROP TABLE IF EXISTS {table};\n"
         if create or recreate:
+            if field_dict is None:
+                raise ValueError("Must provide a field dict when creating a table")
             fields = ",\n    ".join(
                 [k + ' ' + v for k, v in field_dict.items()])
             hql += "CREATE TABLE IF NOT EXISTS {table} (\n{fields})\n"


Mime
View raw message