superset-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From b...@apache.org
Subject [incubator-superset] branch master updated: [fix] Isolate and improve performance on tagging system (#7858)
Date Wed, 31 Jul 2019 16:19:54 GMT
This is an automated email from the ASF dual-hosted git repository.

beto pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-superset.git


The following commit(s) were added to refs/heads/master by this push:
     new 10f00cd  [fix] Isolate and improve performance on tagging system (#7858)
10f00cd is described below

commit 10f00cdde329916e94897f2d32d04ca2d7745372
Author: Beto Dealmeida <roberto@dealmeida.net>
AuthorDate: Wed Jul 31 09:19:39 2019 -0700

    [fix] Isolate and improve performance on tagging system (#7858)
    
    * Fix tag perf
    
    * Add ASF header
    
    * Make script idempotent
    
    * Add CLI to sync tags
    
    * Add missing file
    
    * Merge heads
    
    * Fix lint
    
    * Remove script
---
 superset/cli.py                                    |  12 +
 superset/common/tags.py                            | 385 +++++++++++++++++++++
 .../def97f26fdfb_add_index_to_tagged_object.py     |  39 +++
 superset/models/core.py                            |  19 +-
 4 files changed, 446 insertions(+), 9 deletions(-)

diff --git a/superset/cli.py b/superset/cli.py
index 65b08bf..5721220 100755
--- a/superset/cli.py
+++ b/superset/cli.py
@@ -24,10 +24,12 @@ from sys import stdout
 import click
 from colorama import Fore, Style
 from flask import g
+from flask_appbuilder import Model
 from pathlib2 import Path
 import yaml
 
 from superset import app, appbuilder, db, examples, security_manager
+from superset.common.tags import add_favorites, add_owners, add_types
 from superset.utils import core as utils, dashboard_import_export, dict_import_export
 
 config = app.config
@@ -497,3 +499,13 @@ def load_test_users_run():
                 password="general",
             )
         security_manager.get_session.commit()
+
+
+@app.cli.command()
+def sync_tags():
+    """Rebuilds special tags (owner, type, favorited by)."""
+    # pylint: disable=no-member
+    metadata = Model.metadata
+    add_types(db.engine, metadata)
+    add_owners(db.engine, metadata)
+    add_favorites(db.engine, metadata)
diff --git a/superset/common/tags.py b/superset/common/tags.py
new file mode 100644
index 0000000..657611c
--- /dev/null
+++ b/superset/common/tags.py
@@ -0,0 +1,385 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from sqlalchemy.exc import IntegrityError
+from sqlalchemy.sql import and_, func, functions, join, literal, select
+
+from superset.models.tags import ObjectTypes, TagTypes
+
+
+def add_types(engine, metadata):
+    """
+    Tag every object according to its type:
+
+      INSERT INTO tagged_object (tag_id, object_id, object_type)
+      SELECT
+        tag.id AS tag_id,
+        slices.id AS object_id,
+        'chart' AS object_type
+      FROM slices
+      JOIN tag
+        ON tag.name = 'type:chart'
+      LEFT OUTER JOIN tagged_object
+        ON tagged_object.tag_id = tag.id
+        AND tagged_object.object_id = slices.id
+        AND tagged_object.object_type = 'chart'
+      WHERE tagged_object.tag_id IS NULL;
+
+      INSERT INTO tagged_object (tag_id, object_id, object_type)
+      SELECT
+        tag.id AS tag_id,
+        dashboards.id AS object_id,
+        'dashboard' AS object_type
+      FROM dashboards
+      JOIN tag
+      ON tag.name = 'type:dashboard'
+      LEFT OUTER JOIN tagged_object
+        ON tagged_object.tag_id = tag.id
+        AND tagged_object.object_id = dashboards.id
+        AND tagged_object.object_type = 'dashboard'
+      WHERE tagged_object.tag_id IS NULL;
+
+      INSERT INTO tagged_object (tag_id, object_id, object_type)
+      SELECT
+        tag.id AS tag_id,
+        saved_query.id AS object_id,
+        'query' AS object_type
+      FROM saved_query
+      JOIN tag
+      ON tag.name = 'type:query'
+      LEFT OUTER JOIN tagged_object
+        ON tagged_object.tag_id = tag.id
+        AND tagged_object.object_id = saved_query.id
+        AND tagged_object.object_type = 'query'
+      WHERE tagged_object.tag_id IS NULL;
+
+    """
+
+    tag = metadata.tables["tag"]
+    tagged_object = metadata.tables["tagged_object"]
+    slices = metadata.tables["slices"]
+    dashboards = metadata.tables["dashboards"]
+    saved_query = metadata.tables["saved_query"]
+    columns = ["tag_id", "object_id", "object_type"]
+
+    # add a tag for each object type
+    insert = tag.insert()
+    for type_ in ObjectTypes.__members__:
+        try:
+            engine.execute(insert, name=f"type:{type_}", type=TagTypes.type)
+        except IntegrityError:
+            pass  # already exists
+
+    charts = (
+        select(
+            [
+                tag.c.id.label("tag_id"),
+                slices.c.id.label("object_id"),
+                literal(ObjectTypes.chart.name).label("object_type"),
+            ]
+        )
+        .select_from(
+            join(
+                join(slices, tag, tag.c.name == "type:chart"),
+                tagged_object,
+                and_(
+                    tagged_object.c.tag_id == tag.c.id,
+                    tagged_object.c.object_id == slices.c.id,
+                    tagged_object.c.object_type == "chart",
+                ),
+                isouter=True,
+                full=False,
+            )
+        )
+        .where(tagged_object.c.tag_id.is_(None))
+    )
+    query = tagged_object.insert().from_select(columns, charts)
+    engine.execute(query)
+
+    dashboards = (
+        select(
+            [
+                tag.c.id.label("tag_id"),
+                dashboards.c.id.label("object_id"),
+                literal(ObjectTypes.dashboard.name).label("object_type"),
+            ]
+        )
+        .select_from(
+            join(
+                join(dashboards, tag, tag.c.name == "type:dashboard"),
+                tagged_object,
+                and_(
+                    tagged_object.c.tag_id == tag.c.id,
+                    tagged_object.c.object_id == dashboards.c.id,
+                    tagged_object.c.object_type == "dashboard",
+                ),
+                isouter=True,
+                full=False,
+            )
+        )
+        .where(tagged_object.c.tag_id.is_(None))
+    )
+    query = tagged_object.insert().from_select(columns, dashboards)
+    engine.execute(query)
+
+    saved_queries = (
+        select(
+            [
+                tag.c.id.label("tag_id"),
+                saved_query.c.id.label("object_id"),
+                literal(ObjectTypes.query.name).label("object_type"),
+            ]
+        )
+        .select_from(
+            join(
+                join(saved_query, tag, tag.c.name == "type:query"),
+                tagged_object,
+                and_(
+                    tagged_object.c.tag_id == tag.c.id,
+                    tagged_object.c.object_id == saved_query.c.id,
+                    tagged_object.c.object_type == "query",
+                ),
+                isouter=True,
+                full=False,
+            )
+        )
+        .where(tagged_object.c.tag_id.is_(None))
+    )
+    query = tagged_object.insert().from_select(columns, saved_queries)
+    engine.execute(query)
+
+
+def add_owners(engine, metadata):
+    """
+    Tag every object according to its owner:
+
+      INSERT INTO tagged_object (tag_id, object_id, object_type)
+      SELECT
+        tag.id AS tag_id,
+        slices.id AS object_id,
+        'chart' AS object_type
+      FROM slices
+      JOIN tag
+      ON tag.name = CONCAT('owner:', slices.created_by_fk)
+      LEFT OUTER JOIN tagged_object
+        ON tagged_object.tag_id = tag.id
+        AND tagged_object.object_id = slices.id
+        AND tagged_object.object_type = 'chart'
+      WHERE tagged_object.tag_id IS NULL;
+
+      SELECT
+        tag.id AS tag_id,
+        dashboards.id AS object_id,
+        'dashboard' AS object_type
+      FROM dashboards
+      JOIN tag
+      ON tag.name = CONCAT('owner:', dashboards.created_by_fk)
+      LEFT OUTER JOIN tagged_object
+        ON tagged_object.tag_id = tag.id
+        AND tagged_object.object_id = dashboards.id
+        AND tagged_object.object_type = 'dashboard'
+      WHERE tagged_object.tag_id IS NULL;
+
+      SELECT
+        tag.id AS tag_id,
+        saved_query.id AS object_id,
+        'query' AS object_type
+      FROM saved_query
+      JOIN tag
+      ON tag.name = CONCAT('owner:', saved_query.created_by_fk)
+      LEFT OUTER JOIN tagged_object
+        ON tagged_object.tag_id = tag.id
+        AND tagged_object.object_id = saved_query.id
+        AND tagged_object.object_type = 'query'
+      WHERE tagged_object.tag_id IS NULL;
+
+    """
+
+    tag = metadata.tables["tag"]
+    tagged_object = metadata.tables["tagged_object"]
+    users = metadata.tables["ab_user"]
+    slices = metadata.tables["slices"]
+    dashboards = metadata.tables["dashboards"]
+    saved_query = metadata.tables["saved_query"]
+    columns = ["tag_id", "object_id", "object_type"]
+
+    # create a custom tag for each user
+    ids = select([users.c.id])
+    insert = tag.insert()
+    for (id_,) in engine.execute(ids):
+        try:
+            engine.execute(insert, name=f"owner:{id_}", type=TagTypes.owner)
+        except IntegrityError:
+            pass  # already exists
+
+    charts = (
+        select(
+            [
+                tag.c.id.label("tag_id"),
+                slices.c.id.label("object_id"),
+                literal(ObjectTypes.chart.name).label("object_type"),
+            ]
+        )
+        .select_from(
+            join(
+                join(
+                    slices,
+                    tag,
+                    tag.c.name == functions.concat("owner:", slices.c.created_by_fk),
+                ),
+                tagged_object,
+                and_(
+                    tagged_object.c.tag_id == tag.c.id,
+                    tagged_object.c.object_id == slices.c.id,
+                    tagged_object.c.object_type == "chart",
+                ),
+                isouter=True,
+                full=False,
+            )
+        )
+        .where(tagged_object.c.tag_id.is_(None))
+    )
+    query = tagged_object.insert().from_select(columns, charts)
+    engine.execute(query)
+
+    dashboards = (
+        select(
+            [
+                tag.c.id.label("tag_id"),
+                dashboards.c.id.label("object_id"),
+                literal(ObjectTypes.dashboard.name).label("object_type"),
+            ]
+        )
+        .select_from(
+            join(
+                join(
+                    dashboards,
+                    tag,
+                    tag.c.name
+                    == functions.concat("owner:", dashboards.c.created_by_fk),
+                ),
+                tagged_object,
+                and_(
+                    tagged_object.c.tag_id == tag.c.id,
+                    tagged_object.c.object_id == dashboards.c.id,
+                    tagged_object.c.object_type == "dashboard",
+                ),
+                isouter=True,
+                full=False,
+            )
+        )
+        .where(tagged_object.c.tag_id.is_(None))
+    )
+    query = tagged_object.insert().from_select(columns, dashboards)
+    engine.execute(query)
+
+    saved_queries = (
+        select(
+            [
+                tag.c.id.label("tag_id"),
+                saved_query.c.id.label("object_id"),
+                literal(ObjectTypes.query.name).label("object_type"),
+            ]
+        )
+        .select_from(
+            join(
+                join(
+                    saved_query,
+                    tag,
+                    tag.c.name
+                    == functions.concat("owner:", saved_query.c.created_by_fk),
+                ),
+                tagged_object,
+                and_(
+                    tagged_object.c.tag_id == tag.c.id,
+                    tagged_object.c.object_id == saved_query.c.id,
+                    tagged_object.c.object_type == "query",
+                ),
+                isouter=True,
+                full=False,
+            )
+        )
+        .where(tagged_object.c.tag_id.is_(None))
+    )
+    query = tagged_object.insert().from_select(columns, saved_queries)
+    engine.execute(query)
+
+
+def add_favorites(engine, metadata):
+    """
+    Tag every object that was favorited:
+
+      INSERT INTO tagged_object (tag_id, object_id, object_type)
+      SELECT
+        tag.id AS tag_id,
+        favstar.obj_id AS object_id,
+        LOWER(favstar.class_name) AS object_type
+      FROM favstar
+      JOIN tag
+      ON tag.name = CONCAT('favorited_by:', favstar.user_id)
+      LEFT OUTER JOIN tagged_object
+        ON tagged_object.tag_id = tag.id
+        AND tagged_object.object_id = favstar.obj_id
+        AND tagged_object.object_type = LOWER(favstar.class_name)
+      WHERE tagged_object.tag_id IS NULL;
+
+    """
+
+    tag = metadata.tables["tag"]
+    tagged_object = metadata.tables["tagged_object"]
+    users = metadata.tables["ab_user"]
+    favstar = metadata.tables["favstar"]
+    columns = ["tag_id", "object_id", "object_type"]
+
+    # create a favorited_by tag for each user
+    insert = tag.insert()
+    for (id_,) in engine.execute(select([users.c.id])):
+        name = f"favorited_by:{id_}"
+        try:
+            engine.execute(insert, name=name, type=TagTypes.favorited_by)
+        except IntegrityError:
+            pass  # already exists
+
+    favstars = (
+        select(
+            [
+                tag.c.id.label("tag_id"),
+                favstar.c.obj_id.label("object_id"),
+                func.lower(favstar.c.class_name).label("object_type"),
+            ]
+        )
+        .select_from(
+            join(
+                join(
+                    favstar,
+                    tag,
+                    tag.c.name == functions.concat("favorited_by:", favstar.c.user_id),
+                ),
+                tagged_object,
+                and_(
+                    tagged_object.c.tag_id == tag.c.id,
+                    tagged_object.c.object_id == favstar.c.obj_id,
+                    tagged_object.c.object_type == func.lower(favstar.c.class_name),
+                ),
+                isouter=True,
+                full=False,
+            )
+        )
+        .where(tagged_object.c.tag_id.is_(None))
+    )
+    query = tagged_object.insert().from_select(columns, favstars)
+    engine.execute(query)
diff --git a/superset/migrations/versions/def97f26fdfb_add_index_to_tagged_object.py b/superset/migrations/versions/def97f26fdfb_add_index_to_tagged_object.py
new file mode 100644
index 0000000..b3af2ed
--- /dev/null
+++ b/superset/migrations/versions/def97f26fdfb_add_index_to_tagged_object.py
@@ -0,0 +1,39 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""Add index to tagged_object
+
+Revision ID: def97f26fdfb
+Revises: 190188938582
+Create Date: 2019-07-11 19:02:38.768324
+
+"""
+
+# revision identifiers, used by Alembic.
+revision = "def97f26fdfb"
+down_revision = "190188938582"
+
+from alembic import op
+
+
+def upgrade():
+    op.create_index(
+        op.f("ix_tagged_object_object_id"), "tagged_object", ["object_id"], unique=False
+    )
+
+
+def downgrade():
+    op.drop_index(op.f("ix_tagged_object_object_id"), table_name="tagged_object")
diff --git a/superset/models/core.py b/superset/models/core.py
index 5a98d5e..e3dcafa 100755
--- a/superset/models/core.py
+++ b/superset/models/core.py
@@ -52,7 +52,7 @@ from sqlalchemy.schema import UniqueConstraint
 from sqlalchemy_utils import EncryptedType
 import sqlparse
 
-from superset import app, db, db_engine_specs, security_manager
+from superset import app, db, db_engine_specs, is_feature_enabled, security_manager
 from superset.connectors.connector_registry import ConnectorRegistry
 from superset.legacy import update_time_range
 from superset.models.helpers import AuditMixinNullable, ImportMixin
@@ -1299,11 +1299,12 @@ class DatasourceAccessRequest(Model, AuditMixinNullable):
 
 
 # events for updating tags
-sqla.event.listen(Slice, "after_insert", ChartUpdater.after_insert)
-sqla.event.listen(Slice, "after_update", ChartUpdater.after_update)
-sqla.event.listen(Slice, "after_delete", ChartUpdater.after_delete)
-sqla.event.listen(Dashboard, "after_insert", DashboardUpdater.after_insert)
-sqla.event.listen(Dashboard, "after_update", DashboardUpdater.after_update)
-sqla.event.listen(Dashboard, "after_delete", DashboardUpdater.after_delete)
-sqla.event.listen(FavStar, "after_insert", FavStarUpdater.after_insert)
-sqla.event.listen(FavStar, "after_delete", FavStarUpdater.after_delete)
+if is_feature_enabled("TAGGING_SYSTEM"):
+    sqla.event.listen(Slice, "after_insert", ChartUpdater.after_insert)
+    sqla.event.listen(Slice, "after_update", ChartUpdater.after_update)
+    sqla.event.listen(Slice, "after_delete", ChartUpdater.after_delete)
+    sqla.event.listen(Dashboard, "after_insert", DashboardUpdater.after_insert)
+    sqla.event.listen(Dashboard, "after_update", DashboardUpdater.after_update)
+    sqla.event.listen(Dashboard, "after_delete", DashboardUpdater.after_delete)
+    sqla.event.listen(FavStar, "after_insert", FavStarUpdater.after_insert)
+    sqla.event.listen(FavStar, "after_delete", FavStarUpdater.after_delete)


Mime
View raw message