superset-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From maximebeauche...@apache.org
Subject [incubator-superset] branch master updated: Druid support via SQLAlchemy (#4163)
Date Fri, 05 Jan 2018 21:53:01 GMT
This is an automated email from the ASF dual-hosted git repository.

maximebeauchemin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-superset.git


The following commit(s) were added to refs/heads/master by this push:
     new 686023c  Druid support via SQLAlchemy (#4163)
686023c is described below

commit 686023c8ddd58d4d562128c794d48524bc635217
Author: Beto Dealmeida <roberto@dealmeida.net>
AuthorDate: Fri Jan 5 13:52:58 2018 -0800

    Druid support via SQLAlchemy (#4163)
    
    * Use druiddb
    
    * Remove auto formatting
    
    * Show prequeries
    
    * Fix subtle bug with lists
    
    * Move arguments to query object
    
    * Fix druid run_query
---
 superset/connectors/druid/models.py |   5 +-
 superset/connectors/sqla/models.py  | 105 ++++++++++++++++++++++++++----------
 superset/db_engine_specs.py         |   2 +
 superset/views/core.py              |   6 +++
 superset/viz.py                     |   2 +
 5 files changed, 90 insertions(+), 30 deletions(-)

diff --git a/superset/connectors/druid/models.py b/superset/connectors/druid/models.py
index 45d5d25..4689ef2 100644
--- a/superset/connectors/druid/models.py
+++ b/superset/connectors/druid/models.py
@@ -1022,7 +1022,10 @@ class DruidDatasource(Model, BaseDatasource):
             orderby=None,
             extras=None,  # noqa
             columns=None, phase=2, client=None, form_data=None,
-            order_desc=True):
+            order_desc=True,
+            prequeries=None,
+            is_prequery=False,
+        ):
         """Runs a query against Druid and returns a dataframe.
         """
         # TODO refactor into using a TBD Query object
diff --git a/superset/connectors/sqla/models.py b/superset/connectors/sqla/models.py
index e13a8df..889aea3 100644
--- a/superset/connectors/sqla/models.py
+++ b/superset/connectors/sqla/models.py
@@ -370,6 +370,8 @@ class SqlaTable(Model, BaseDatasource):
         )
         logging.info(sql)
         sql = sqlparse.format(sql, reindent=True)
+        if query_obj['is_prequery']:
+            query_obj['prequeries'].append(sql)
         return sql
 
     def get_sqla_table(self):
@@ -405,7 +407,10 @@ class SqlaTable(Model, BaseDatasource):
             extras=None,
             columns=None,
             form_data=None,
-            order_desc=True):
+            order_desc=True,
+            prequeries=None,
+            is_prequery=False,
+        ):
         """Querying any sqla table from this common interface"""
         template_kwargs = {
             'from_dttm': from_dttm,
@@ -564,37 +569,73 @@ class SqlaTable(Model, BaseDatasource):
 
         if is_timeseries and \
                 timeseries_limit and groupby and not time_groupby_inline:
-            # some sql dialects require for order by expressions
-            # to also be in the select clause -- others, e.g. vertica,
-            # require a unique inner alias
-            inner_main_metric_expr = main_metric_expr.label('mme_inner__')
-            inner_select_exprs += [inner_main_metric_expr]
-            subq = select(inner_select_exprs)
-            subq = subq.select_from(tbl)
-            inner_time_filter = dttm_col.get_time_filter(
-                inner_from_dttm or from_dttm,
-                inner_to_dttm or to_dttm,
-            )
-            subq = subq.where(and_(*(where_clause_and + [inner_time_filter])))
-            subq = subq.group_by(*inner_groupby_exprs)
-
-            ob = inner_main_metric_expr
-            if timeseries_limit_metric:
-                timeseries_limit_metric = metrics_dict.get(timeseries_limit_metric)
-                ob = timeseries_limit_metric.sqla_col
-            direction = desc if order_desc else asc
-            subq = subq.order_by(direction(ob))
-            subq = subq.limit(timeseries_limit)
-
-            on_clause = []
-            for i, gb in enumerate(groupby):
-                on_clause.append(
-                    groupby_exprs[i] == column(gb + '__'))
-
-            tbl = tbl.join(subq.alias(), and_(*on_clause))
+            if self.database.db_engine_spec.inner_joins:
+                # some sql dialects require for order by expressions
+                # to also be in the select clause -- others, e.g. vertica,
+                # require a unique inner alias
+                inner_main_metric_expr = main_metric_expr.label('mme_inner__')
+                inner_select_exprs += [inner_main_metric_expr]
+                subq = select(inner_select_exprs)
+                subq = subq.select_from(tbl)
+                inner_time_filter = dttm_col.get_time_filter(
+                    inner_from_dttm or from_dttm,
+                    inner_to_dttm or to_dttm,
+                )
+                subq = subq.where(and_(*(where_clause_and + [inner_time_filter])))
+                subq = subq.group_by(*inner_groupby_exprs)
+
+                ob = inner_main_metric_expr
+                if timeseries_limit_metric:
+                    timeseries_limit_metric = metrics_dict.get(timeseries_limit_metric)
+                    ob = timeseries_limit_metric.sqla_col
+                direction = desc if order_desc else asc
+                subq = subq.order_by(direction(ob))
+                subq = subq.limit(timeseries_limit)
+
+                on_clause = []
+                for i, gb in enumerate(groupby):
+                    on_clause.append(
+                        groupby_exprs[i] == column(gb + '__'))
+
+                tbl = tbl.join(subq.alias(), and_(*on_clause))
+            else:
+                # run subquery to get top groups
+                subquery_obj = {
+                    'prequeries': prequeries,
+                    'is_prequery': True,
+                    'is_timeseries': False,
+                    'row_limit': timeseries_limit,
+                    'groupby': groupby,
+                    'metrics': metrics,
+                    'granularity': granularity,
+                    'from_dttm': inner_from_dttm or from_dttm,
+                    'to_dttm': inner_to_dttm or to_dttm,
+                    'filter': filter,
+                    'orderby': orderby,
+                    'extras': extras,
+                    'columns': columns,
+                    'form_data': form_data,
+                    'order_desc': True,
+                }
+                result = self.query(subquery_obj)
+                dimensions = [c for c in result.df.columns if c not in metrics]
+                top_groups = self._get_top_groups(result.df, dimensions)
+                qry = qry.where(top_groups)
 
         return qry.select_from(tbl)
 
+    def _get_top_groups(self, df, dimensions):
+        cols = {col.column_name: col for col in self.columns}
+        groups = []
+        for unused, row in df.iterrows():
+            group = []
+            for dimension in dimensions:
+                col_obj = cols.get(dimension)
+                group.append(col_obj.sqla_col == row[dimension])
+            groups.append(and_(*group))
+
+        return or_(*groups)
+
     def query(self, query_obj):
         qry_start_dttm = datetime.now()
         sql = self.get_query_str(query_obj)
@@ -609,6 +650,12 @@ class SqlaTable(Model, BaseDatasource):
             error_message = (
                 self.database.db_engine_spec.extract_error_message(e))
 
+        # if this is a main query with prequeries, combine them together
+        if not query_obj['is_prequery']:
+            query_obj['prequeries'].append(sql)
+            sql = ';\n\n'.join(query_obj['prequeries'])
+        sql += ';'
+
         return QueryResult(
             status=status,
             df=df,
diff --git a/superset/db_engine_specs.py b/superset/db_engine_specs.py
index e02d477..19f541d 100644
--- a/superset/db_engine_specs.py
+++ b/superset/db_engine_specs.py
@@ -62,6 +62,7 @@ class BaseEngineSpec(object):
     time_groupby_inline = False
     limit_method = LimitMethod.FETCH_MANY
     time_secondary_columns = False
+    inner_joins = True
 
     @classmethod
     def fetch_data(cls, cursor, limit):
@@ -1229,6 +1230,7 @@ class DruidEngineSpec(BaseEngineSpec):
     """Engine spec for Druid.io"""
     engine = 'druid'
     limit_method = LimitMethod.FETCH_MANY
+    inner_joins = False
 
 
 engines = {
diff --git a/superset/views/core.py b/superset/views/core.py
index 2f0c0e5..1f5cfd0 100755
--- a/superset/views/core.py
+++ b/superset/views/core.py
@@ -991,6 +991,12 @@ class Superset(BaseSupersetView):
             query = viz_obj.datasource.get_query_str(query_obj)
         except Exception as e:
             return json_error_response(e)
+
+        if query_obj['prequeries']:
+            query_obj['prequeries'].append(query)
+            query = ';\n\n'.join(query_obj['prequeries'])
+        query += ';'
+
         return Response(
             json.dumps({
                 'query': query,
diff --git a/superset/viz.py b/superset/viz.py
index e649456..a97b04e 100644
--- a/superset/viz.py
+++ b/superset/viz.py
@@ -205,6 +205,8 @@ class BaseViz(object):
             'timeseries_limit_metric': timeseries_limit_metric,
             'form_data': form_data,
             'order_desc': order_desc,
+            'prequeries': [],
+            'is_prequery': False,
         }
         return d
 

-- 
To stop receiving notification emails like this one, please contact
commits@superset.apache.org.

Mime
View raw message