superset-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From maximebeauche...@apache.org
Subject [incubator-superset] branch master updated: Modernize SQLA pessimistic handling (#3256)
Date Wed, 09 Aug 2017 16:10:15 GMT
This is an automated email from the ASF dual-hosted git repository.

maximebeauchemin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-superset.git


The following commit(s) were added to refs/heads/master by this push:
     new cc36428  Modernize SQLA pessimistic handling (#3256)
cc36428 is described below

commit cc36428260f979d78e69d86dac0e6a4ba6b17780
Author: Maxime Beauchemin <maximebeauchemin@gmail.com>
AuthorDate: Wed Aug 9 09:10:12 2017 -0700

    Modernize SQLA pessimistic handling (#3256)
    
    Looks like SQLAlchemy has redefined the best practice around
    pessimistic connection handling.
---
 superset/__init__.py |  2 +-
 superset/utils.py    | 49 ++++++++++++++++++++++++++++++++++++-------------
 2 files changed, 37 insertions(+), 14 deletions(-)

diff --git a/superset/__init__.py b/superset/__init__.py
index fceff86..b8fccd9 100644
--- a/superset/__init__.py
+++ b/superset/__init__.py
@@ -69,7 +69,7 @@ db = SQLA(app)
 if conf.get('WTF_CSRF_ENABLED'):
     csrf = CSRFProtect(app)
 
-utils.pessimistic_connection_handling(db.engine.pool)
+utils.pessimistic_connection_handling(db.engine)
 
 cache = utils.setup_cache(app, conf.get('CACHE_CONFIG'))
 tables_cache = utils.setup_cache(app, conf.get('TABLE_NAMES_CACHE_CONFIG'))
diff --git a/superset/utils.py b/superset/utils.py
index 49a0f47..ed84186 100644
--- a/superset/utils.py
+++ b/superset/utils.py
@@ -40,7 +40,7 @@ from flask_babel import gettext as __
 import markdown as md
 from past.builtins import basestring
 from pydruid.utils.having import Having
-from sqlalchemy import event, exc
+from sqlalchemy import event, exc, select
 from sqlalchemy.types import TypeDecorator, TEXT
 
 logging.getLogger('MARKDOWN').setLevel(logging.INFO)
@@ -436,19 +436,42 @@ class timeout(object):
             logging.warning("timeout can't be used in the current context")
             logging.exception(e)
 
-def pessimistic_connection_handling(target):
-    @event.listens_for(target, "checkout")
-    def ping_connection(dbapi_connection, connection_record, connection_proxy):
-        """
-        Disconnect Handling - Pessimistic, taken from:
-        http://docs.sqlalchemy.org/en/rel_0_9/core/pooling.html
-        """
-        cursor = dbapi_connection.cursor()
+
+def pessimistic_connection_handling(some_engine):
+    @event.listens_for(some_engine, "engine_connect")
+    def ping_connection(connection, branch):
+        if branch:
+            # "branch" refers to a sub-connection of a connection,
+            # we don't want to bother pinging on these.
+            return
+
+        # turn off "close with result".  This flag is only used with
+        # "connectionless" execution, otherwise will be False in any case
+        save_should_close_with_result = connection.should_close_with_result
+        connection.should_close_with_result = False
+
         try:
-            cursor.execute("SELECT 1")
-        except:
-            raise exc.DisconnectionError()
-        cursor.close()
+            # run a SELECT 1.   use a core select() so that
+            # the SELECT of a scalar value without a table is
+            # appropriately formatted for the backend
+            connection.scalar(select([1]))
+        except exc.DBAPIError as err:
+            # catch SQLAlchemy's DBAPIError, which is a wrapper
+            # for the DBAPI's exception.  It includes a .connection_invalidated
+            # attribute which specifies if this connection is a "disconnect"
+            # condition, which is based on inspection of the original exception
+            # by the dialect in use.
+            if err.connection_invalidated:
+                # run the same SELECT again - the connection will re-validate
+                # itself and establish a new connection.  The disconnect detection
+                # here also causes the whole connection pool to be invalidated
+                # so that all stale connections are discarded.
+                connection.scalar(select([1]))
+            else:
+                raise
+        finally:
+            # restore "close with result"
+            connection.should_close_with_result = save_should_close_with_result
 
 
 class QueryStatus(object):

-- 
To stop receiving notification emails like this one, please contact
['"commits@superset.apache.org" <commits@superset.apache.org>'].

Mime
View raw message