superset-commits mailing list archives

From maximebeauche...@apache.org
Subject [incubator-superset] branch master updated: Feature/Fix: Get a full times_series for your filter instead of Topn for each point in time (#3434)
Date Mon, 18 Sep 2017 17:48:07 GMT
This is an automated email from the ASF dual-hosted git repository.

maximebeauchemin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-superset.git


The following commit(s) were added to refs/heads/master by this push:
     new c3c9ceb  Feature/Fix: Get a full times_series for your filter instead of Topn for each point in time (#3434)
c3c9ceb is described below

commit c3c9ceb1cc3fc70670257f8f652f083cde2ea14d
Author: fabianmenges <fabianmenges@users.noreply.github.com>
AuthorDate: Mon Sep 18 13:48:05 2017 -0400

    Feature/Fix: Get a full times_series for your filter instead of Topn for each point in time (#3434)
---
 superset/connectors/druid/models.py | 86 +++++++++++++++++++++++++------------
 1 file changed, 59 insertions(+), 27 deletions(-)

diff --git a/superset/connectors/druid/models.py b/superset/connectors/druid/models.py
index 4aadf06..cc40b83 100644
--- a/superset/connectors/druid/models.py
+++ b/superset/connectors/druid/models.py
@@ -1,4 +1,4 @@
-# pylint: disable=invalid-unary-operand-type
+ # pylint: disable=invalid-unary-operand-type
 from collections import OrderedDict
 import json
 import logging
@@ -798,6 +798,28 @@ class DruidDatasource(Model, BaseDatasource):
     def get_query_str(self, query_obj, phase=1, client=None):
         return self.run_query(client=client, phase=phase, **query_obj)
 
+    def _add_filter_from_pre_query_data(self, df, dimensions, dim_filter):
+        ret = dim_filter
+        if df is not None and not df.empty:
+            new_filters = []
+            for unused, row in df.iterrows():
+                fields = []
+                for dim in dimensions:
+                    f = Dimension(dim) == row[dim]
+                    fields.append(f)
+                if len(fields) > 1:
+                    term = Filter(type="and", fields=fields)
+                    new_filters.append(term)
+                elif fields:
+                    new_filters.append(fields[0])
+            if new_filters:
+                ff = Filter(type="or", fields=new_filters)
+                if not dim_filter:
+                    ret = ff
+                else:
+                    ret = Filter(type="and", fields=[ff, dim_filter])
+        return ret
+
     def run_query(  # noqa / druid
             self,
             groupby, metrics,
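
For readers skimming the diff, here is a minimal standalone sketch (not part
of the commit; the dimension names are invented, pydruid and pandas are
assumed) of the filter shape the new _add_filter_from_pre_query_data helper
builds: one selector per dimension value in each phase-one row, AND-ed within
a row and OR-ed across rows.

    import pandas as pd
    from pydruid.utils.filters import Dimension, Filter

    # Pretend the phase-one query returned the top two (country, device) pairs.
    df = pd.DataFrame([
        {'country': 'US', 'device': 'mobile'},
        {'country': 'FR', 'device': 'desktop'},
    ])

    per_row = []
    for _, row in df.iterrows():
        fields = [Dimension(d) == row[d] for d in ['country', 'device']]
        per_row.append(Filter(type='and', fields=fields))

    combined = Filter(type='or', fields=per_row)
    # build_filter serializes a pydruid Filter into the dict sent to Druid.
    print(Filter.build_filter(combined))

The helper then ANDs this combined filter with any pre-existing dim_filter,
so the phase-two query only sees the rows phase one selected.
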
@@ -834,7 +856,9 @@ class DruidDatasource(Model, BaseDatasource):
 
         columns_dict = {c.column_name: c for c in self.columns}
 
-        all_metrics, post_aggs = self._metrics_and_post_aggs(metrics, metrics_dict)
+        all_metrics, post_aggs = self._metrics_and_post_aggs(
+                                      metrics,
+                                      metrics_dict)
 
         aggregations = OrderedDict()
         for m in self.metrics:
@@ -884,15 +908,41 @@ class DruidDatasource(Model, BaseDatasource):
         if having_filters:
             qry['having'] = having_filters
         order_direction = "descending" if order_desc else "ascending"
-        orig_filters = filters
         if len(groupby) == 0 and not having_filters:
             del qry['dimensions']
             client.timeseries(**qry)
         if not having_filters and len(groupby) == 1 and order_desc:
+            dim = list(qry.get('dimensions'))[0]
+            if timeseries_limit_metric:
+                order_by = timeseries_limit_metric
+            else:
+                order_by = list(qry['aggregations'].keys())[0]
+            # Limit on the number of timeseries, doing a two-phases query
+            pre_qry = deepcopy(qry)
+            pre_qry['granularity'] = "all"
+            pre_qry['threshold'] = min(row_limit,
+                                       timeseries_limit or row_limit)
+            pre_qry['metric'] = order_by
+            pre_qry['dimension'] = dim
+            del pre_qry['dimensions']
+            client.topn(**pre_qry)
+            query_str += "// Two phase query\n// Phase 1\n"
+            query_str += json.dumps(
+              client.query_builder.last_query.query_dict, indent=2)
+            query_str += "\n"
+            if phase == 1:
+                return query_str
+            query_str += (
+              "//\nPhase 2 (built based on phase one's results)\n")
+            df = client.export_pandas()
+            qry['filter'] = self._add_filter_from_pre_query_data(
+                                df,
+                                qry['dimensions'], filters)
             qry['threshold'] = timeseries_limit or 1000
             if row_limit and granularity == 'all':
                 qry['threshold'] = row_limit
             qry['dimension'] = list(qry.get('dimensions'))[0]
+            qry['dimension'] = dim
             del qry['dimensions']
             qry['metric'] = list(qry['aggregations'].keys())[0]
             client.topn(**qry)
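
The block above is the heart of the change for single-dimension group-bys:
instead of a topN per time bucket, Druid is asked once (granularity "all")
for the overall top values, and the real query is then filtered to those
values. A hand-rolled sketch of the idea (illustrative values, not Superset's
internal API):

    from copy import deepcopy

    qry = {
        'datasource': 'wikipedia',
        'granularity': 'day',
        'dimensions': ['page'],
        'aggregations': {'edits': {'type': 'longSum', 'fieldName': 'count'}},
        'intervals': '2017-09-01/2017-09-18',
    }

    # Phase 1: collapse the time axis so the topN ranks each dimension
    # value over the whole interval rather than per day.
    pre_qry = deepcopy(qry)
    pre_qry['granularity'] = 'all'
    pre_qry['dimension'] = pre_qry.pop('dimensions')[0]
    pre_qry['metric'] = 'edits'   # order by the first aggregation
    pre_qry['threshold'] = 10     # the timeseries limit

    # Phase 2 (after running phase 1) adds a filter restricting 'page' to
    # the values phase 1 returned, then runs qry at the original daily
    # granularity, yielding a complete series for each of the top 10 pages.
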
@@ -908,7 +958,7 @@ class DruidDatasource(Model, BaseDatasource):
                 pre_qry['granularity'] = "all"
                 pre_qry['limit_spec'] = {
                     "type": "default",
-                    "limit": timeseries_limit,
+                    "limit": min(timeseries_limit, row_limit),
                     'intervals': (
                         inner_from_dttm.isoformat() + '/' +
                         inner_to_dttm.isoformat()),
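
The min() above is the actual fix in this hunk: the phase-one group-by must
not request more groups than the final query is allowed to return. A toy
illustration (values invented):

    timeseries_limit, row_limit = 500, 100

    limit_spec = {
        'type': 'default',
        'limit': min(timeseries_limit, row_limit),  # 100 rather than 500
        'columns': [{'dimension': 'page', 'direction': 'descending'}],
    }
    assert limit_spec['limit'] == 100
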
@@ -927,29 +977,10 @@ class DruidDatasource(Model, BaseDatasource):
                 query_str += (
                     "//\nPhase 2 (built based on phase one's results)\n")
                 df = client.export_pandas()
-                if df is not None and not df.empty:
-                    dims = qry['dimensions']
-                    filters = []
-                    for unused, row in df.iterrows():
-                        fields = []
-                        for dim in dims:
-                            f = Dimension(dim) == row[dim]
-                            fields.append(f)
-                        if len(fields) > 1:
-                            filt = Filter(type="and", fields=fields)
-                            filters.append(filt)
-                        elif fields:
-                            filters.append(fields[0])
-
-                    if filters:
-                        ff = Filter(type="or", fields=filters)
-                        if not orig_filters:
-                            qry['filter'] = ff
-                        else:
-                            qry['filter'] = Filter(type="and", fields=[
-                                ff,
-                                orig_filters])
-                    qry['limit_spec'] = None
+                qry['filter'] = self._add_filter_from_pre_query_data(
+                                    df,
+                                    qry['dimensions'], filters)
+                qry['limit_spec'] = None
             if row_limit:
                 qry['limit_spec'] = {
                     "type": "default",
@@ -1111,5 +1142,6 @@ class DruidDatasource(Model, BaseDatasource):
             .all()
         )
 
+
 sa.event.listen(DruidDatasource, 'after_insert', set_perm)
 sa.event.listen(DruidDatasource, 'after_update', set_perm)

-- 
To stop receiving notification emails like this one, please contact
['"commits@superset.apache.org" <commits@superset.apache.org>'].
