airflow-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From criccom...@apache.org
Subject incubator-airflow git commit: [AIRFLOW-1189] Fix get a DataFrame using BigQueryHook failing
Date Fri, 12 May 2017 16:09:02 GMT
Repository: incubator-airflow
Updated Branches:
  refs/heads/master 3f546e285 -> 93666f996


[AIRFLOW-1189] Fix get a DataFrame using BigQueryHook failing

Closes #2287 from mremes/master


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/93666f99
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/93666f99
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/93666f99

Branch: refs/heads/master
Commit: 93666f9965b2940c5179c5b6bbfef8cf278c024c
Parents: 3f546e2
Author: Matti Remes <remes@iki.fi>
Authored: Fri May 12 09:08:28 2017 -0700
Committer: Chris Riccomini <criccomini@apache.org>
Committed: Fri May 12 09:08:33 2017 -0700

----------------------------------------------------------------------
 airflow/contrib/hooks/bigquery_hook.py    | 11 +++++++---
 tests/contrib/hooks/test_bigquery_hook.py | 29 ++++++++++++++++++++++++++
 2 files changed, 37 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/93666f99/airflow/contrib/hooks/bigquery_hook.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/hooks/bigquery_hook.py b/airflow/contrib/hooks/bigquery_hook.py
index 59ece43..bf18209 100644
--- a/airflow/contrib/hooks/bigquery_hook.py
+++ b/airflow/contrib/hooks/bigquery_hook.py
@@ -74,7 +74,7 @@ class BigQueryHook(GoogleCloudBaseHook, DbApiHook):
         """
         raise NotImplementedError()
 
-    def get_pandas_df(self, bql, parameters=None):
+    def get_pandas_df(self, bql, parameters=None, dialect='legacy'):
         """
         Returns a Pandas DataFrame for the results produced by a BigQuery
         query. The DbApiHook method must be overridden because Pandas
@@ -85,10 +85,14 @@ class BigQueryHook(GoogleCloudBaseHook, DbApiHook):
 
         :param bql: The BigQuery SQL to execute.
         :type bql: string
+        :param parameters: The parameters to render the SQL query with (not used, leave to override superclass method)
+        :type parameters: mapping or iterable
+        :param dialect: Dialect of BigQuery SQL – legacy SQL or standard SQL
+        :type dialect: string in {'legacy', 'standard'}, default 'legacy'
         """
         service = self.get_service()
         project = self._get_field('project')
-        connector = BigQueryPandasConnector(project, service)
+        connector = BigQueryPandasConnector(project, service, dialect=dialect)
         schema, pages = connector.run_query(bql)
         dataframe_list = []
 
@@ -136,13 +140,14 @@ class BigQueryPandasConnector(GbqConnector):
     without forcing a three legged OAuth connection. Instead, we can inject
     service account credentials into the binding.
     """
-    def __init__(self, project_id, service, reauth=False, verbose=False):
+    def __init__(self, project_id, service, reauth=False, verbose=False, dialect='legacy'):
         gbq_check_google_client_version()
         gbq_test_google_api_imports()
         self.project_id = project_id
         self.reauth = reauth
         self.service = service
         self.verbose = verbose
+        self.dialect = dialect
 
 
 class BigQueryConnection(object):

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/93666f99/tests/contrib/hooks/test_bigquery_hook.py
----------------------------------------------------------------------
diff --git a/tests/contrib/hooks/test_bigquery_hook.py b/tests/contrib/hooks/test_bigquery_hook.py
index 0adffc5..ab946f0 100644
--- a/tests/contrib/hooks/test_bigquery_hook.py
+++ b/tests/contrib/hooks/test_bigquery_hook.py
@@ -18,6 +18,35 @@ import unittest
 from airflow.contrib.hooks import bigquery_hook as hook
 
 
+class TestBigQueryDataframeResults(unittest.TestCase):
+    def setUp(self):
+        self.instance = hook.BigQueryHook()
+
+    def test_output_is_dataframe_with_valid_query(self):
+        import pandas as pd
+        df = self.instance.get_pandas_df('select 1')
+        self.assertIsInstance(df, pd.DataFrame)
+
+    def test_throws_exception_with_invalid_query(self):
+        with self.assertRaises(Exception) as context:
+            self.instance.get_pandas_df('from `1`')
+        self.assertIn('pandas_gbq.gbq.GenericGBQException: Reason: invalidQuery',
+                      str(context.exception), "")
+
+    def test_suceeds_with_explicit_legacy_query(self):
+        df = self.instance.get_pandas_df('select 1', dialect='legacy')
+        self.assertEqual(df.iloc(0)[0][0], 1)
+
+    def test_suceeds_with_explicit_std_query(self):
+        df = self.instance.get_pandas_df('select * except(b) from (select 1 a, 2 b)', dialect='standard')
+        self.assertEqual(df.iloc(0)[0][0], 1)
+
+    def test_throws_exception_with_incompatible_syntax(self):
+        with self.assertRaises(Exception) as context:
+            self.instance.get_pandas_df('select * except(b) from (select 1 a, 2 b)', dialect='legacy')
+        self.assertIn('pandas_gbq.gbq.GenericGBQException: Reason: invalidQuery',
+                      str(context.exception), "")
+
 class TestBigQueryTableSplitter(unittest.TestCase):
     def test_internal_need_default_project(self):
         with self.assertRaises(Exception) as context:


Mime
View raw message