superset-commits mailing list archives

From johnbod...@apache.org
Subject [incubator-superset] 01/43: added get_data ;
Date Fri, 11 Jan 2019 02:22:05 GMT
This is an automated email from the ASF dual-hosted git repository.

johnbodley pushed a commit to branch feature--embeddable-charts-pilot
in repository https://gitbox.apache.org/repos/asf/incubator-superset.git

commit 3f400831d680b77f996c194f1311aa560b0706fe
Author: Conglei Shi <conglei.shi@airbnb.com>
AuthorDate: Mon Nov 12 12:34:15 2018 -0800

    added get_data
    ;
---
 superset/common/query_context.py | 222 ++++++++++++++++++++++++++++++++++++++-
 superset/common/query_object.py  |   8 ++
 2 files changed, 227 insertions(+), 3 deletions(-)

diff --git a/superset/common/query_context.py b/superset/common/query_context.py
index 21b0dac..f17f39c 100644
--- a/superset/common/query_context.py
+++ b/superset/common/query_context.py
@@ -1,9 +1,26 @@
 # pylint: disable=R
-from typing import Dict, List
+import logging
+import pickle as pkl
+import traceback
+from datetime import datetime, timedelta
+from typing import Dict, List
+
+import numpy as np
+import pandas as pd
 
 from superset import db
 from superset.connectors.connector_registry import ConnectorRegistry
+from superset import app, cache
+from superset.utils import core as utils
 from .query_object import QueryObject
+
+from superset.utils.core import (
+    DTTM_ALIAS,
+    JS_MAX_INTEGER,
+)
+
+config = app.config
+stats_logger = config.get('STATS_LOGGER')
 
 
 class QueryContext:
@@ -11,17 +28,216 @@ class QueryContext:
     The query context contains the query object and additional fields necessary
     to retrieve the data payload for a given viz.
     """
+
+    default_fillna = 0
+    cache_type = 'df'
+    enforce_numerical_metrics = True
+
     # TODO: Type datasource and query_object dictionary with TypedDict when it becomes
     # a vanilla python type https://github.com/python/mypy/issues/5288
     def __init__(
             self,
             datasource: Dict,
             queries: List[Dict],
+            force: bool = False,
     ):
         self.datasource = ConnectorRegistry.get_datasource(datasource.get('type'),
                                                            int(datasource.get('id')),
                                                            db.session)
         self.queries = list(map(lambda query_obj: QueryObject(**query_obj), queries))
 
-    def get_data(self):
-        raise NotImplementedError()
+        self.force = force
+
+        self.query_details = []
+
+        # State accumulated while the payload is being built.
+        self.status = utils.QueryStatus.PENDING
+        self.error_message = None
+        self.query = ''
+        self.form_data = None  # not wired up for QueryContext yet
+        self.cache_timeout = config.get('CACHE_DEFAULT_TIMEOUT')
+        self._any_cache_key = None
+        self._any_cached_dttm = None
+
+    def get_df(self, query_object):
+        """Returns a pandas dataframe based on the query object"""
+
+        # Here we assume that all the queries will use the same datasource, which is
+        # a valid assumption for the current setting. In the long term, we may or may
+        # not support multiple queries against different datasources.
+
+        timestamp_format = None
+        if self.datasource.type == 'table':
+            dttm_col = self.datasource.get_col(query_object.granularity)
+            if dttm_col:
+                timestamp_format = dttm_col.python_date_format
+
+        # The datasource here can be different backend but the interface is common
+        result = self.datasource.query(query_object.to_dict())
+        query_detail = {
+            'raw_query': result.query,
+            'status': result.status,
+            'error_message': result.error_message,
+        }
+        self.query_details.append(query_detail)
+
+        # Keep the latest query text / status / error around for the payload metadata.
+        self.query = result.query
+        self.status = result.status
+        self.error_message = result.error_message
+
+        df = result.df
+        # Transform the timestamps received from the database into a pandas-
+        # supported datetime format. If no python_date_format is specified,
+        # the pattern is assumed to be the default ISO date format.
+        # If the datetime format is unix, the corresponding parsing logic
+        # is used instead.
+        if df is not None and not df.empty:
+            if DTTM_ALIAS in df.columns:
+                if timestamp_format in ('epoch_s', 'epoch_ms'):
+                    # Column has already been formatted as a timestamp.
+                    df[DTTM_ALIAS] = df[DTTM_ALIAS].apply(pd.Timestamp)
+                else:
+                    df[DTTM_ALIAS] = pd.to_datetime(
+                        df[DTTM_ALIAS], utc=False, format=timestamp_format)
+                if self.datasource.offset:
+                    df[DTTM_ALIAS] += timedelta(hours=self.datasource.offset)
+                df[DTTM_ALIAS] += query_object.time_shift
+
+            if self.enforce_numerical_metrics:
+                self.df_metrics_to_num(df, query_object)
+
+            df = df.replace([np.inf, -np.inf], np.nan)
+            df = self.handle_nulls(df)
+        return df
+
+    def df_metrics_to_num(self, df, query_object):
+        """Converting metrics to numeric when pandas.read_sql cannot"""
+        metrics = query_object.get_metric_labels()
+        for col, dtype in df.dtypes.items():
+            if dtype.type == np.object_ and col in metrics:
+                df[col] = pd.to_numeric(df[col], errors='coerce')
+
+    def handle_nulls(self, df):
+        fillna = self.get_fillna_for_columns(df.columns)
+        return df.fillna(fillna)
+
+    def get_fillna_for_col(self, col):
+        """Returns the value to use as filler for a specific Column.type"""
+        if col:
+            if col.is_string:
+                return ' NULL'
+        return self.default_fillna
+
+    def get_fillna_for_columns(self, columns=None):
+        """Returns a dict or scalar that can be passed to DataFrame.fillna"""
+        if columns is None:
+            return self.default_fillna
+        columns_dict = {col.column_name: col for col in self.datasource.columns}
+        fillna = {
+            c: self.get_fillna_for_col(columns_dict.get(c))
+            for c in columns
+        }
+        return fillna
+
+    def get_data(self, df):
+        return df.to_dict(orient='records')
+
+    def get_payload(self, query_obj):
+        """Returns a payload of metadata and data"""
+        payload = self.get_df_payload(query_obj)
+        df = payload.get('df')
+        if self.status != utils.QueryStatus.FAILED:
+            if df is not None and df.empty:
+                payload['error'] = 'No data'
+            else:
+                payload['data'] = self.get_data(df)
+        if 'df' in payload:
+            del payload['df']
+        return payload
+
+    def get_payloads(self):
+        """Get all the paylaods from the arrays"""
+        return [self.get_payload(query_ojbect) for query_ojbect in self.queries]
+
+    def get_df_payload(self, query_obj):
+        """Handles caching around the df paylod retrieval"""
+        cache_key = query_obj.cache_key() if query_obj else None
+        logging.info('Cache key: {}'.format(cache_key))
+        is_loaded = False
+        stacktrace = None
+        df = None
+        cached_dttm = datetime.utcnow().isoformat().split('.')[0]
+        if cache_key and cache and not self.force:
+            cache_value = cache.get(cache_key)
+            if cache_value:
+                stats_logger.incr('loaded_from_cache')
+                try:
+                    cache_value = pkl.loads(cache_value)
+                    df = cache_value['df']
+                    self.query = cache_value['query']
+                    self._any_cached_dttm = cache_value['dttm']
+                    self._any_cache_key = cache_key
+                    self.status = utils.QueryStatus.SUCCESS
+                    is_loaded = True
+                except Exception as e:
+                    logging.exception(e)
+                    logging.error('Error reading cache: ' +
+                                  utils.error_msg_from_exception(e))
+                logging.info('Serving from cache')
+
+        if query_obj and not is_loaded:
+            try:
+                df = self.get_df(query_obj)
+                if self.status != utils.QueryStatus.FAILED:
+                    stats_logger.incr('loaded_from_source')
+                    is_loaded = True
+            except Exception as e:
+                logging.exception(e)
+                if not self.error_message:
+                    self.error_message = '{}'.format(e)
+                self.status = utils.QueryStatus.FAILED
+                stacktrace = traceback.format_exc()
+
+            if (
+                    is_loaded and
+                    cache_key and
+                    cache and
+                    self.status != utils.QueryStatus.FAILED):
+                try:
+                    cache_value = dict(
+                        dttm=cached_dttm,
+                        df=df if df is not None else None,
+                        query=self.query,
+                    )
+                    cache_value = pkl.dumps(
+                        cache_value, protocol=pkl.HIGHEST_PROTOCOL)
+
+                    logging.info('Caching {} chars at key {}'.format(
+                        len(cache_value), cache_key))
+
+                    stats_logger.incr('set_cache_key')
+                    cache.set(
+                        cache_key,
+                        cache_value,
+                        timeout=self.cache_timeout)
+                except Exception as e:
+                    # cache.set call can fail if the backend is down or if
+                    # the key is too large or whatever other reasons
+                    logging.warning('Could not cache key {}'.format(cache_key))
+                    logging.exception(e)
+                    cache.delete(cache_key)
+        return {
+            'cache_key': self._any_cache_key,
+            'cached_dttm': self._any_cached_dttm,
+            'cache_timeout': self.cache_timeout,
+            'df': df,
+            'error': self.error_message,
+            'form_data': self.form_data,
+            'is_cached': self._any_cache_key is not None,
+            'query': self.query,
+            'status': self.status,
+            'stacktrace': stacktrace,
+            'rowcount': len(df.index) if df is not None else 0,
+        }
+
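
For context, a minimal usage sketch of the new QueryContext API (not part of
this commit): the field names inside the datasource/queries dictionaries below
are illustrative assumptions, since QueryObject's constructor is not shown here.

# Hypothetical usage sketch -- the dict contents are assumptions for illustration.
from superset.common.query_context import QueryContext

datasource = {'type': 'table', 'id': 1}
queries = [{
    'granularity': 'ds',
    'metrics': ['count'],
    'groupby': ['gender'],
}]

query_context = QueryContext(datasource=datasource, queries=queries)

# get_payloads() returns one payload per query object, carrying the data rows
# plus the cache/status metadata assembled in get_df_payload().
for payload in query_context.get_payloads():
    print(payload['status'], payload['rowcount'])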
diff --git a/superset/common/query_object.py b/superset/common/query_object.py
index 8116d26..dfac02c 100644
--- a/superset/common/query_object.py
+++ b/superset/common/query_object.py
@@ -45,3 +45,11 @@ class QueryObject:
 
     def to_dict(self):
         raise NotImplementedError()
+
+
+    def get_metric_labels(self):
+        raise NotImplementedError()
+
+
+    def cache_key(self):
+        raise NotImplementedError()
\ No newline at end of file
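
The two new QueryObject hooks are left as stubs above. Purely as a hedged
sketch of one way they might later be filled in (the `metrics` attribute and
the md5-of-JSON cache key are assumptions, not part of this commit):

import hashlib
import json


class QueryObject:  # illustrative subset only
    def __init__(self, metrics=None, **kwargs):
        self.metrics = metrics or []
        self.extras = kwargs

    def to_dict(self):
        return {'metrics': self.metrics, **self.extras}

    def get_metric_labels(self):
        # Metrics may be plain names or adhoc-metric dicts carrying a 'label'.
        return [m if isinstance(m, str) else m.get('label') for m in self.metrics]

    def cache_key(self):
        # Hash a deterministic serialization of the query definition.
        raw = json.dumps(self.to_dict(), sort_keys=True, default=str)
        return hashlib.md5(raw.encode('utf-8')).hexdigest()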

