airflow-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From bo...@apache.org
Subject incubator-airflow git commit: [AIRFLOW-139] Let psycopg2 handle autocommit for PostgresHook
Date Tue, 24 Jan 2017 14:51:48 GMT
Repository: incubator-airflow
Updated Branches:
  refs/heads/master a2b0ea322 -> ac9167f37


[AIRFLOW-139] Let psycopg2 handle autocommit for PostgresHook

The server-side autocommit setting was removed and reimplemented
in client applications and languages. Server-side autocommit was
causing too many problems with languages and applications that
wanted to control their own autocommit behavior,
so autocommit was removed from the server and added to individual client APIs as appropriate

Closes #1821 from danielzohar/AIRFLOW-
139_vacuum_operator


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/ac9167f3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/ac9167f3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/ac9167f3

Branch: refs/heads/master
Commit: ac9167f37b586f9ece381763b91a0ee25d736f38
Parents: a2b0ea3
Author: Daniel Zohar <i@danielzohar.com>
Authored: Tue Jan 24 15:45:39 2017 +0100
Committer: Bolke de Bruin <bolke@xs4all.nl>
Committed: Tue Jan 24 15:45:48 2017 +0100

----------------------------------------------------------------------
 airflow/hooks/postgres_hook.py |  4 +---
 tests/operators/operators.py   | 24 ++++++++++++++++++------
 2 files changed, 19 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/ac9167f3/airflow/hooks/postgres_hook.py
----------------------------------------------------------------------
diff --git a/airflow/hooks/postgres_hook.py b/airflow/hooks/postgres_hook.py
index d096d75..75c8226 100644
--- a/airflow/hooks/postgres_hook.py
+++ b/airflow/hooks/postgres_hook.py
@@ -26,7 +26,7 @@ class PostgresHook(DbApiHook):
     '''
     conn_name_attr = 'postgres_conn_id'
     default_conn_name = 'postgres_default'
-    supports_autocommit = False
+    supports_autocommit = True
 
     def get_conn(self):
         conn = self.get_connection(self.postgres_conn_id)
@@ -41,8 +41,6 @@ class PostgresHook(DbApiHook):
             if arg_name in ['sslmode', 'sslcert', 'sslkey', 'sslrootcert', 'sslcrl', 'application_name']:
                 conn_args[arg_name] = arg_val
         psycopg2_conn = psycopg2.connect(**conn_args)
-        if psycopg2_conn.server_version < 70400:
-            self.supports_autocommit = True
         return psycopg2_conn
 
     @staticmethod

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/ac9167f3/tests/operators/operators.py
----------------------------------------------------------------------
diff --git a/tests/operators/operators.py b/tests/operators/operators.py
index 7458827..7aaf12e 100644
--- a/tests/operators/operators.py
+++ b/tests/operators/operators.py
@@ -15,18 +15,14 @@
 from __future__ import print_function
 
 import datetime
-import os
-import unittest
-import six
 
-from airflow import DAG, configuration, operators, utils
+from airflow import DAG, configuration, operators
 from airflow.utils.tests import skipUnlessImported
+
 configuration.load_test_config()
 
-import os
 import unittest
 
-
 DEFAULT_DATE = datetime.datetime(2015, 1, 1)
 DEFAULT_DATE_ISO = DEFAULT_DATE.isoformat()
 DEFAULT_DATE_DS = DEFAULT_DATE_ISO[:10]
@@ -118,6 +114,7 @@ class MySqlTest(unittest.TestCase):
             dag=self.dag)
         t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)
 
+
 @skipUnlessImported('airflow.operators.postgres_operator', 'PostgresOperator')
 class PostgresTest(unittest.TestCase):
     def setUp(self):
@@ -182,6 +179,21 @@ class PostgresTest(unittest.TestCase):
             dag=self.dag)
         t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)
 
+    def test_vacuum(self):
+        """
+        Verifies the VACUUM operation runs well with the PostgresOperator
+        """
+        import airflow.operators.postgres_operator
+
+        sql = "VACUUM ANALYZE;"
+        t = operators.postgres_operator.PostgresOperator(
+            task_id='postgres_operator_test_vacuum',
+            sql=sql,
+            dag=self.dag,
+            autocommit=True)
+        t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)
+
+
 @skipUnlessImported('airflow.operators.hive_operator', 'HiveOperator')
 @skipUnlessImported('airflow.operators.postgres_operator', 'PostgresOperator')
 class TransferTests(unittest.TestCase):


Mime
View raw message