airflow-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From san...@apache.org
Subject incubator-airflow git commit: [AIRFLOW-560] Get URI & SQLA engine from Connection
Date Tue, 11 Oct 2016 17:25:15 GMT
Repository: incubator-airflow
Updated Branches:
  refs/heads/master 941500e14 -> c6de16b56


[AIRFLOW-560] Get URI & SQLA engine from Connection

Closes #1256 from gwax/sqlalchemy_conn


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/c6de16b5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/c6de16b5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/c6de16b5

Branch: refs/heads/master
Commit: c6de16b563cb1f681c62696b755a9e2eb3c80341
Parents: 941500e
Author: George Leslie-Waksman <george@cloverhealth.com>
Authored: Tue Oct 11 10:24:04 2016 -0700
Committer: Siddharth Anand <siddharthanand@yahoo.com>
Committed: Tue Oct 11 10:25:24 2016 -0700

----------------------------------------------------------------------
 airflow/hooks/dbapi_hook.py | 18 ++++++++++++++++++
 tests/core.py               | 17 +++++++++++++++++
 2 files changed, 35 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/c6de16b5/airflow/hooks/dbapi_hook.py
----------------------------------------------------------------------
diff --git a/airflow/hooks/dbapi_hook.py b/airflow/hooks/dbapi_hook.py
index 04af16e..9d6ddf0 100644
--- a/airflow/hooks/dbapi_hook.py
+++ b/airflow/hooks/dbapi_hook.py
@@ -19,6 +19,8 @@ import numpy
 import logging
 import sys
 
+from sqlalchemy import create_engine
+
 from airflow.hooks.base_hook import BaseHook
 from airflow.exceptions import AirflowException
 
@@ -56,6 +58,22 @@ class DbApiHook(BaseHook):
             username=db.login,
             schema=db.schema)
 
+    def get_uri(self):
+        conn = self.get_connection(getattr(self, self.conn_name_attr))
+        login = ''
+        if conn.login:
+            login = '{conn.login}:{conn.password}@'.format(conn=conn)
+        host = conn.host
+        if conn.port is not None:
+            host += ':{port}'.format(port=conn.port)
+        return '{conn.conn_type}://{login}{host}/{conn.schema}'.format(
+            conn=conn, login=login, host=host)
+
+    def get_sqlalchemy_engine(self, engine_kwargs=None):
+        if engine_kwargs is None:
+            engine_kwargs = {}
+        return create_engine(self.get_uri(), **engine_kwargs)
+
     def get_pandas_df(self, sql, parameters=None):
         """
         Executes the sql and returns a pandas dataframe

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/c6de16b5/tests/core.py
----------------------------------------------------------------------
diff --git a/tests/core.py b/tests/core.py
index bafcd7e..626c093 100644
--- a/tests/core.py
+++ b/tests/core.py
@@ -30,6 +30,7 @@ from time import sleep
 import warnings
 
 from dateutil.relativedelta import relativedelta
+import sqlalchemy
 
 from airflow import configuration
 from airflow.executors import SequentialExecutor, LocalExecutor
@@ -48,6 +49,7 @@ from airflow.operators.http_operator import SimpleHttpOperator
 from airflow.operators import sensors
 from airflow.hooks.base_hook import BaseHook
 from airflow.hooks.sqlite_hook import SqliteHook
+from airflow.hooks.postgres_hook import PostgresHook
 from airflow.bin import cli
 from airflow.www import app as application
 from airflow.settings import Session
@@ -1730,6 +1732,21 @@ class ConnectionTest(unittest.TestCase):
         assert c.port == 5432
         del os.environ['AIRFLOW_CONN_AIRFLOW_DB']
 
+    def test_dbapi_get_uri(self):
+        conn = BaseHook.get_connection(conn_id='test_uri')
+        hook = conn.get_hook()
+        assert hook.get_uri() == 'postgres://username:password@ec2.compute.com:5432/the_database'
+        conn2 = BaseHook.get_connection(conn_id='test_uri_no_creds')
+        hook2 = conn2.get_hook()
+        assert hook2.get_uri() == 'postgres://ec2.compute.com/the_database'
+
+    def test_dbapi_get_sqlalchemy_engine(self):
+        conn = BaseHook.get_connection(conn_id='test_uri')
+        hook = conn.get_hook()
+        engine = hook.get_sqlalchemy_engine()
+        assert isinstance(engine, sqlalchemy.engine.Engine)
+        assert str(engine.url) == 'postgres://username:password@ec2.compute.com:5432/the_database'
+
 
 class WebHDFSHookTest(unittest.TestCase):
     def setUp(self):


Mime
View raw message