superset-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From b...@apache.org
Subject [incubator-superset] branch master updated: Adding support for Pinot (#6719)
Date Wed, 06 Feb 2019 01:04:26 GMT
This is an automated email from the ASF dual-hosted git repository.

beto pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-superset.git


The following commit(s) were added to refs/heads/master by this push:
     new ff9506f  Adding support for Pinot (#6719)
ff9506f is described below

commit ff9506fec229a170b0ad7cdacb1fccfd8eff6ef3
Author: agrawaldevesh <dagrawal@uber.com>
AuthorDate: Tue Feb 5 17:04:19 2019 -0800

    Adding support for Pinot (#6719)
    
    Summary: Added limited support for visualizations with Pinot via
    SQLAlchemy.
    
    Pinot QL (PQL) is a bit weird and limited, and this patch hacks superset to
    deal with that weirdness:
    
    1. Pinot's grouping by time is best done as a long epoch. Grouping by a
    time string is really slow and times out.
    
    2. Pinot's response does not respect column aliases, so columns are
    not named as expected. We therefore remember the given column aliases
    and stamp them back onto the dataframe.
    
    3. Pinot's JSON REST call does not return the output types; instead,
    everything is cast to string. So when grouping by time, the group key
    is integral and has to be treated specially when casting back to the
    dataframe __timestamp column.
    
    4. Finally, Pinot does support grouping by expressions, but those
    expressions cannot then appear in the select clause; they are
    nevertheless returned in the response. I.e., 'select foo, count(*) from bar group by
    foo' is okay, but 'select expr(foo), count(*) from bar group by
    expr(foo)' is not. One must use 'select count(*) from bar group by
    expr(foo)'.
    
    I also fixed a couple of things that looked like bugs to me: for
    example, the row-ordering limit should always come at the end.
    
    Test Plan: Tested with the modified pinotdb sqlalchemy driver and an
    internal pinot cluster. The pinotdb driver changes are in
    https://github.com/agrawaldevesh/pinot-dbapi.
    
    Pinot does not support orderby-limit for aggregated queries. To annotate
    a query as an aggregate query, this patch adds a hint to the prepared
    select statement that the pinotdb sqlalchemy driver then heeds.
---
 superset/connectors/sqla/models.py | 63 +++++++++++++++++-------------
 superset/db_engine_specs.py        | 80 ++++++++++++++++++++++++++++++++++++++
 superset/viz.py                    | 19 ++++++++-
 3 files changed, 135 insertions(+), 27 deletions(-)

diff --git a/superset/connectors/sqla/models.py b/superset/connectors/sqla/models.py
index 64b38cf..86d9273 100644
--- a/superset/connectors/sqla/models.py
+++ b/superset/connectors/sqla/models.py
@@ -15,7 +15,7 @@
 # specific language governing permissions and limitations
 # under the License.
 # pylint: disable=C,R,W
-from collections import OrderedDict
+from collections import namedtuple, OrderedDict
 from datetime import datetime
 import logging
 
@@ -46,6 +46,9 @@ from superset.utils import core as utils, import_datasource
 config = app.config
 metadata = Model.metadata  # pylint: disable=no-member
 
+SqlaQuery = namedtuple('SqlaQuery', ['sqla_query', 'labels_expected'])
+QueryStringExtended = namedtuple('QueryStringExtended', ['sql', 'labels_expected'])
+
 
 class AnnotationDatasource(BaseDatasource):
     """ Dummy object so we can query annotations using 'Viz' objects just like
@@ -141,23 +144,20 @@ class TableColumn(Model, BaseColumn):
         """Getting the time component of the query"""
         label = self.table.get_label(utils.DTTM_ALIAS)
 
+        db = self.table.database
         pdf = self.python_date_format
         is_epoch = pdf in ('epoch_s', 'epoch_ms')
         if not self.expression and not time_grain and not is_epoch:
             return column(self.column_name, type_=DateTime).label(label)
-
-        expr = self.expression or self.column_name
-        if is_epoch:
-            # if epoch, translate to DATE using db specific conf
-            db_spec = self.table.database.db_engine_spec
-            if pdf == 'epoch_s':
-                expr = db_spec.epoch_to_dttm().format(col=expr)
-            elif pdf == 'epoch_ms':
-                expr = db_spec.epoch_ms_to_dttm().format(col=expr)
+        grain = None
         if time_grain:
-            grain = self.table.database.grains_dict().get(time_grain)
-            if grain:
-                expr = grain.function.format(col=expr)
+            grain = db.grains_dict().get(time_grain)
+            if not grain:
+                raise NotImplementedError(
+                    f'No grain spec for {time_grain} for database {db.database_name}')
+        expr = db.db_engine_spec.get_time_expr(
+            self.expression or self.column_name,
+            pdf, time_grain, grain)
         return literal_column(expr, type_=DateTime).label(label)
 
     @classmethod
@@ -476,15 +476,18 @@ class SqlaTable(Model, BaseDatasource):
         return get_template_processor(
             table=self, database=self.database, **kwargs)
 
-    def get_query_str(self, query_obj):
-        qry = self.get_sqla_query(**query_obj)
-        sql = self.database.compile_sqla_query(qry)
+    def get_query_str_extended(self, query_obj):
+        sqlaq = self.get_sqla_query(**query_obj)
+        sql = self.database.compile_sqla_query(sqlaq.sqla_query)
         logging.info(sql)
         sql = sqlparse.format(sql, reindent=True)
         if query_obj['is_prequery']:
             query_obj['prequeries'].append(sql)
         sql = self.mutate_query_from_config(sql)
-        return sql
+        return QueryStringExtended(labels_expected=sqlaq.labels_expected, sql=sql)
+
+    def get_query_str(self, query_obj):
+        return self.get_query_str_extended(query_obj).sql
 
     def get_sqla_table(self):
         tbl = table(self.table_name)
@@ -517,12 +520,11 @@ class SqlaTable(Model, BaseDatasource):
 
         if expression_type == utils.ADHOC_METRIC_EXPRESSION_TYPES['SIMPLE']:
             column_name = metric.get('column').get('column_name')
-            sqla_column = column(column_name)
             table_column = cols.get(column_name)
-
             if table_column:
                 sqla_column = table_column.get_sqla_col()
-
+            else:
+                sqla_column = column(column_name)
             sqla_metric = self.sqla_aggregations[metric.get('aggregate')](sqla_column)
             sqla_metric = sqla_metric.label(label)
             return sqla_metric
@@ -551,7 +553,7 @@ class SqlaTable(Model, BaseDatasource):
             order_desc=True,
             prequeries=None,
             is_prequery=False,
-        ):
+    ):
         """Querying any sqla table from this common interface"""
         template_kwargs = {
             'from_dttm': from_dttm,
@@ -640,6 +642,12 @@ class SqlaTable(Model, BaseDatasource):
             time_filters.append(dttm_col.get_time_filter(from_dttm, to_dttm))
 
         select_exprs += metrics_exprs
+
+        labels_expected = [str(c.name) for c in select_exprs]
+
+        select_exprs = db_engine_spec.make_select_compatible(
+            groupby_exprs_with_timestamp.values(),
+            select_exprs)
         qry = sa.select(select_exprs)
 
         tbl = self.get_from_clause(template_processor)
@@ -793,7 +801,8 @@ class SqlaTable(Model, BaseDatasource):
                                                   groupby_exprs_sans_timestamp)
                 qry = qry.where(top_groups)
 
-        return qry.select_from(tbl)
+        return SqlaQuery(sqla_query=qry.select_from(tbl),
+                         labels_expected=labels_expected)
 
     def _get_top_groups(self, df, dimensions, groupby_exprs):
         groups = []
@@ -807,19 +816,21 @@ class SqlaTable(Model, BaseDatasource):
 
     def query(self, query_obj):
         qry_start_dttm = datetime.now()
-        sql = self.get_query_str(query_obj)
+        query_str_ext = self.get_query_str_extended(query_obj)
+        sql = query_str_ext.sql
         status = utils.QueryStatus.SUCCESS
         error_message = None
         df = None
+        db_engine_spec = self.database.db_engine_spec
         try:
             df = self.database.get_df(sql, self.schema)
             if self.mutated_labels:
                 df = df.rename(index=str, columns=self.mutated_labels)
+            db_engine_spec.mutate_df_columns(df, sql, query_str_ext.labels_expected)
         except Exception as e:
             status = utils.QueryStatus.FAILED
-            logging.exception(e)
-            error_message = (
-                self.database.db_engine_spec.extract_error_message(e))
+            logging.exception(f'Query {sql} on schema {self.schema} failed')
+            error_message = db_engine_spec.extract_error_message(e)
 
         # if this is a main query with prequeries, combine them together
         if not query_obj['is_prequery']:
diff --git a/superset/db_engine_specs.py b/superset/db_engine_specs.py
index 3ae433c..40d1ea5 100644
--- a/superset/db_engine_specs.py
+++ b/superset/db_engine_specs.py
@@ -115,6 +115,18 @@ class BaseEngineSpec(object):
     arraysize = None
 
     @classmethod
+    def get_time_expr(cls, expr, pdf, time_grain, grain):
+        # if epoch, translate to DATE using db specific conf
+        if pdf == 'epoch_s':
+            expr = cls.epoch_to_dttm().format(col=expr)
+        elif pdf == 'epoch_ms':
+            expr = cls.epoch_ms_to_dttm().format(col=expr)
+
+        if grain:
+            expr = grain.function.format(col=expr)
+        return expr
+
+    @classmethod
     def get_time_grains(cls):
         blacklist = config.get('TIME_GRAIN_BLACKLIST', [])
         grains = builtin_time_grains.copy()
@@ -125,6 +137,16 @@ class BaseEngineSpec(object):
         return _create_time_grains_tuple(grains, grain_functions, blacklist)
 
     @classmethod
+    def make_select_compatible(cls, groupby_exprs, select_exprs):
+        # Some databases will just return the group-by field into the select, but don't
+        # allow the group-by field to be put into the select list.
+        return select_exprs
+
+    @classmethod
+    def mutate_df_columns(cls, df, sql, labels_expected):
+        pass
+
+    @classmethod
     def fetch_data(cls, cursor, limit):
         if cls.arraysize:
             cursor.arraysize = cls.arraysize
@@ -1413,6 +1435,64 @@ class AthenaEngineSpec(BaseEngineSpec):
         return 'from_unixtime({col})'
 
 
+class PinotEngineSpec(BaseEngineSpec):
+    engine = 'pinot'
+    allows_subquery = False
+    inner_joins = False
+
+    _time_grain_to_datetimeconvert = {
+        'PT1S': '1:SECONDS',
+        'PT1M': '1:MINUTES',
+        'PT1H': '1:HOURS',
+        'P1D': '1:DAYS',
+        'P1Y': '1:YEARS',
+        'P1M': '1:MONTHS',
+    }
+
+    # Pinot does its own conversion below
+    time_grain_functions = {k: None for k in _time_grain_to_datetimeconvert.keys()}
+
+    @classmethod
+    def get_time_expr(cls, expr, pdf, time_grain, grain):
+        is_epoch = pdf in ('epoch_s', 'epoch_ms')
+        if not is_epoch:
+            raise NotImplementedError('Pinot currently only supports epochs')
+        # The DATETIMECONVERT pinot udf is documented at
+        # Per https://github.com/apache/incubator-pinot/wiki/dateTimeConvert-UDF
+        # We are not really converting any time units, just bucketing them.
+        seconds_or_ms = 'MILLISECONDS' if pdf == 'epoch_ms' else 'SECONDS'
+        tf = f'1:{seconds_or_ms}:EPOCH'
+        granularity = cls._time_grain_to_datetimeconvert.get(time_grain)
+        if not granularity:
+            raise NotImplementedError('No pinot grain spec for ' + str(time_grain))
+        # In pinot the output is a string since there is no timestamp column like pg
+        return f'DATETIMECONVERT({expr}, "{tf}", "{tf}", "{granularity}")'
+
+    @classmethod
+    def make_select_compatible(cls, groupby_exprs, select_exprs):
+        # Pinot does not want the group by expr's to appear in the select clause
+        select_sans_groupby = []
+        # We want identity and not equality, so doing the filtering manually
+        for s in select_exprs:
+            for gr in groupby_exprs:
+                if s is gr:
+                    break
+            else:
+                select_sans_groupby.append(s)
+        return select_sans_groupby
+
+    @classmethod
+    def mutate_df_columns(cls, df, sql, labels_expected):
+        if df is not None and \
+                not df.empty and \
+                labels_expected is not None:
+            if len(df.columns) != len(labels_expected):
+                raise Exception(f'For {sql}, df.columns: {df.columns}'
+                                f' differs from {labels_expected}')
+            else:
+                df.columns = labels_expected
+
+
 class ClickHouseEngineSpec(BaseEngineSpec):
     """Dialect for ClickHouse analytical DB."""
 
diff --git a/superset/viz.py b/superset/viz.py
index 51fab12..a88860e 100644
--- a/superset/viz.py
+++ b/superset/viz.py
@@ -96,6 +96,8 @@ class BaseViz(object):
         self.time_shift = timedelta()
 
         self.status = None
+        self.error_msg = ''
+        self.results = None
         self.error_message = None
         self.force = force
 
@@ -226,7 +228,22 @@ class BaseViz(object):
             if DTTM_ALIAS in df.columns:
                 if timestamp_format in ('epoch_s', 'epoch_ms'):
                     # Column has already been formatted as a timestamp.
-                    df[DTTM_ALIAS] = df[DTTM_ALIAS].apply(pd.Timestamp)
+                    dttm_col = df[DTTM_ALIAS]
+                    one_ts_val = dttm_col[0]
+
+                    # convert time column to pandas Timestamp, but different
+                    # ways to convert depending on string or int types
+                    try:
+                        int(one_ts_val)
+                        is_integral = True
+                    except ValueError:
+                        is_integral = False
+                    if is_integral:
+                        unit = 's' if timestamp_format == 'epoch_s' else 'ms'
+                        df[DTTM_ALIAS] = pd.to_datetime(dttm_col, utc=False, unit=unit,
+                                                        origin='unix')
+                    else:
+                        df[DTTM_ALIAS] = dttm_col.apply(pd.Timestamp)
                 else:
                     df[DTTM_ALIAS] = pd.to_datetime(
                         df[DTTM_ALIAS], utc=False, format=timestamp_format)


Mime
View raw message