Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 774C2200CF6 for ; Mon, 18 Sep 2017 19:48:11 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 75EF91609DE; Mon, 18 Sep 2017 17:48:11 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id BB24D1609D4 for ; Mon, 18 Sep 2017 19:48:10 +0200 (CEST) Received: (qmail 25162 invoked by uid 500); 18 Sep 2017 17:48:10 -0000 Mailing-List: contact commits-help@superset.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@superset.incubator.apache.org Delivered-To: mailing list commits@superset.incubator.apache.org Received: (qmail 25153 invoked by uid 99); 18 Sep 2017 17:48:09 -0000 Received: from ec2-52-202-80-70.compute-1.amazonaws.com (HELO gitbox.apache.org) (52.202.80.70) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 18 Sep 2017 17:48:09 +0000 Received: by gitbox.apache.org (ASF Mail Server at gitbox.apache.org, from userid 33) id 5A2DC84F87; Mon, 18 Sep 2017 17:48:07 +0000 (UTC) Date: Mon, 18 Sep 2017 17:48:07 +0000 To: "commits@superset.apache.org" Subject: [incubator-superset] branch master updated: Feature/Fix: Get a full times_series for your filter instead of Topn for each point in time (#3434) MIME-Version: 1.0 Content-Type: text/plain; charset=utf-8 Content-Transfer-Encoding: 8bit Message-ID: <150575688780.17588.10534948343443408986@gitbox.apache.org> From: maximebeauchemin@apache.org Reply-To: "commits@superset.apache.org" X-Git-Host: gitbox.apache.org X-Git-Repo: incubator-superset X-Git-Refname: refs/heads/master X-Git-Reftype: branch X-Git-Oldrev: 6fe93e18c79eeab470f0297014ad9453552e31aa X-Git-Newrev: c3c9ceb1cc3fc70670257f8f652f083cde2ea14d X-Git-Rev: c3c9ceb1cc3fc70670257f8f652f083cde2ea14d X-Git-NotificationType: ref_changed_plus_diff X-Git-Multimail-Version: 1.5.dev Auto-Submitted: auto-generated archived-at: Mon, 18 Sep 2017 17:48:11 -0000 This is an automated email from the ASF dual-hosted git repository. maximebeauchemin pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-superset.git The following commit(s) were added to refs/heads/master by this push: new c3c9ceb Feature/Fix: Get a full times_series for your filter instead of Topn for each point in time (#3434) c3c9ceb is described below commit c3c9ceb1cc3fc70670257f8f652f083cde2ea14d Author: fabianmenges AuthorDate: Mon Sep 18 13:48:05 2017 -0400 Feature/Fix: Get a full times_series for your filter instead of Topn for each point in time (#3434) --- superset/connectors/druid/models.py | 86 +++++++++++++++++++++++++------------ 1 file changed, 59 insertions(+), 27 deletions(-) diff --git a/superset/connectors/druid/models.py b/superset/connectors/druid/models.py index 4aadf06..cc40b83 100644 --- a/superset/connectors/druid/models.py +++ b/superset/connectors/druid/models.py @@ -1,4 +1,4 @@ -# pylint: disable=invalid-unary-operand-type + # pylint: disable=invalid-unary-operand-type from collections import OrderedDict import json import logging @@ -798,6 +798,28 @@ class DruidDatasource(Model, BaseDatasource): def get_query_str(self, query_obj, phase=1, client=None): return self.run_query(client=client, phase=phase, **query_obj) + def _add_filter_from_pre_query_data(self, df, dimensions, dim_filter): + ret = dim_filter + if df is not None and not df.empty: + new_filters = [] + for unused, row in df.iterrows(): + fields = [] + for dim in dimensions: + f = Dimension(dim) == row[dim] + fields.append(f) + if len(fields) > 1: + term = Filter(type="and", fields=fields) + new_filters.append(term) + elif fields: + new_filters.append(fields[0]) + if new_filters: + ff = Filter(type="or", fields=new_filters) + if not dim_filter: + ret = ff + else: + ret = Filter(type="and", fields=[ff, dim_filter]) + return ret + def run_query( # noqa / druid self, groupby, metrics, @@ -834,7 +856,9 @@ class DruidDatasource(Model, BaseDatasource): columns_dict = {c.column_name: c for c in self.columns} - all_metrics, post_aggs = self._metrics_and_post_aggs(metrics, metrics_dict) + all_metrics, post_aggs = self._metrics_and_post_aggs( + metrics, + metrics_dict) aggregations = OrderedDict() for m in self.metrics: @@ -884,15 +908,41 @@ class DruidDatasource(Model, BaseDatasource): if having_filters: qry['having'] = having_filters order_direction = "descending" if order_desc else "ascending" - orig_filters = filters if len(groupby) == 0 and not having_filters: del qry['dimensions'] client.timeseries(**qry) if not having_filters and len(groupby) == 1 and order_desc: + dim = list(qry.get('dimensions'))[0] + if timeseries_limit_metric: + order_by = timeseries_limit_metric + else: + order_by = list(qry['aggregations'].keys())[0] + # Limit on the number of timeseries, doing a two-phases query + pre_qry = deepcopy(qry) + pre_qry['granularity'] = "all" + pre_qry['threshold'] = min(row_limit, + timeseries_limit or row_limit) + pre_qry['metric'] = order_by + pre_qry['dimension'] = dim + del pre_qry['dimensions'] + client.topn(**pre_qry) + query_str += "// Two phase query\n// Phase 1\n" + query_str += json.dumps( + client.query_builder.last_query.query_dict, indent=2) + query_str += "\n" + if phase == 1: + return query_str + query_str += ( + "//\nPhase 2 (built based on phase one's results)\n") + df = client.export_pandas() + qry['filter'] = self._add_filter_from_pre_query_data( + df, + qry['dimensions'], filters) qry['threshold'] = timeseries_limit or 1000 if row_limit and granularity == 'all': qry['threshold'] = row_limit qry['dimension'] = list(qry.get('dimensions'))[0] + qry['dimension'] = dim del qry['dimensions'] qry['metric'] = list(qry['aggregations'].keys())[0] client.topn(**qry) @@ -908,7 +958,7 @@ class DruidDatasource(Model, BaseDatasource): pre_qry['granularity'] = "all" pre_qry['limit_spec'] = { "type": "default", - "limit": timeseries_limit, + "limit": min(timeseries_limit, row_limit), 'intervals': ( inner_from_dttm.isoformat() + '/' + inner_to_dttm.isoformat()), @@ -927,29 +977,10 @@ class DruidDatasource(Model, BaseDatasource): query_str += ( "//\nPhase 2 (built based on phase one's results)\n") df = client.export_pandas() - if df is not None and not df.empty: - dims = qry['dimensions'] - filters = [] - for unused, row in df.iterrows(): - fields = [] - for dim in dims: - f = Dimension(dim) == row[dim] - fields.append(f) - if len(fields) > 1: - filt = Filter(type="and", fields=fields) - filters.append(filt) - elif fields: - filters.append(fields[0]) - - if filters: - ff = Filter(type="or", fields=filters) - if not orig_filters: - qry['filter'] = ff - else: - qry['filter'] = Filter(type="and", fields=[ - ff, - orig_filters]) - qry['limit_spec'] = None + qry['filter'] = self._add_filter_from_pre_query_data( + df, + qry['dimensions'], filters) + qry['limit_spec'] = None if row_limit: qry['limit_spec'] = { "type": "default", @@ -1111,5 +1142,6 @@ class DruidDatasource(Model, BaseDatasource): .all() ) + sa.event.listen(DruidDatasource, 'after_insert', set_perm) sa.event.listen(DruidDatasource, 'after_update', set_perm) -- To stop receiving notification emails like this one, please contact ['"commits@superset.apache.org" '].