airflow-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From bo...@apache.org
Subject [2/2] incubator-airflow git commit: [AIRFLOW-504] Store fractional seconds in MySQL tables
Date Sun, 13 Nov 2016 22:02:24 GMT
[AIRFLOW-504] Store fractional seconds in MySQL tables

Both utcnow() and now() return fractional seconds. These
are sometimes used in primary keys (e.g. in task_instance).
If MySQL is not configured to store these fractional seconds,
a primary key match might fail (e.g. at session.merge), resulting
in a duplicate entry being added, or worse.

Postgres stores fractional seconds even if left unconfigured;
SQLite still needs to be examined.


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/910c0ddd
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/910c0ddd
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/910c0ddd

Branch: refs/heads/master
Commit: 910c0ddd78d45db1dc613359f3bb1d3ad37bdf74
Parents: d12ef6f
Author: Bolke de Bruin <bolke@xs4all.nl>
Authored: Mon Sep 12 15:19:17 2016 +0200
Committer: Bolke de Bruin <bolke@xs4all.nl>
Committed: Sun Nov 13 22:43:17 2016 +0100

----------------------------------------------------------------------
 ...f1_add_fractional_seconds_to_mysql_tables.py | 111 +++++++++++++++++++
 scripts/ci/load_data.sh                         |   2 +-
 tests/core.py                                   |  28 +++++
 3 files changed, 140 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/910c0ddd/airflow/migrations/versions/4addfa1236f1_add_fractional_seconds_to_mysql_tables.py
----------------------------------------------------------------------
diff --git a/airflow/migrations/versions/4addfa1236f1_add_fractional_seconds_to_mysql_tables.py b/airflow/migrations/versions/4addfa1236f1_add_fractional_seconds_to_mysql_tables.py
new file mode 100644
index 0000000..c1c6de3
--- /dev/null
+++ b/airflow/migrations/versions/4addfa1236f1_add_fractional_seconds_to_mysql_tables.py
@@ -0,0 +1,111 @@
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""Add fractional seconds to mysql tables
+
+Revision ID: 4addfa1236f1
+Revises: f2ca10b85618
+Create Date: 2016-09-11 13:39:18.592072
+
+"""
+
+# revision identifiers, used by Alembic.
+revision = '4addfa1236f1'
+down_revision = 'f2ca10b85618'
+branch_labels = None
+depends_on = None
+
+from alembic import op
+import sqlalchemy as sa
+from sqlalchemy.dialects import mysql
+from alembic import context
+
+
+def upgrade():
+    if context.config.get_main_option('sqlalchemy.url').startswith('mysql'):
+        op.alter_column(table_name='dag', column_name='last_scheduler_run', type_=mysql.DATETIME(fsp=6))
+        op.alter_column(table_name='dag', column_name='last_pickled', type_=mysql.DATETIME(fsp=6))
+        op.alter_column(table_name='dag', column_name='last_expired', type_=mysql.DATETIME(fsp=6))
+
+        op.alter_column(table_name='dag_pickle', column_name='created_dttm', type_=mysql.DATETIME(fsp=6))
+
+        op.alter_column(table_name='dag_run', column_name='execution_date', type_=mysql.DATETIME(fsp=6))
+        op.alter_column(table_name='dag_run', column_name='start_date', type_=mysql.DATETIME(fsp=6))
+        op.alter_column(table_name='dag_run', column_name='end_date', type_=mysql.DATETIME(fsp=6))
+
+        op.alter_column(table_name='import_error', column_name='timestamp', type_=mysql.DATETIME(fsp=6))
+
+        op.alter_column(table_name='job', column_name='start_date', type_=mysql.DATETIME(fsp=6))
+        op.alter_column(table_name='job', column_name='end_date', type_=mysql.DATETIME(fsp=6))
+        op.alter_column(table_name='job', column_name='latest_heartbeat', type_=mysql.DATETIME(fsp=6))
+
+        op.alter_column(table_name='known_event', column_name='start_date', type_=mysql.DATETIME(fsp=6))
+        op.alter_column(table_name='known_event', column_name='end_date', type_=mysql.DATETIME(fsp=6))
+
+        op.alter_column(table_name='log', column_name='dttm', type_=mysql.DATETIME(fsp=6))
+        op.alter_column(table_name='log', column_name='execution_date', type_=mysql.DATETIME(fsp=6))
+
+        op.alter_column(table_name='sla_miss', column_name='execution_date', type_=mysql.DATETIME(fsp=6), nullable=False)
+        op.alter_column(table_name='sla_miss', column_name='timestamp', type_=mysql.DATETIME(fsp=6))
+
+        op.alter_column(table_name='task_fail', column_name='execution_date', type_=mysql.DATETIME(fsp=6))
+        op.alter_column(table_name='task_fail', column_name='start_date', type_=mysql.DATETIME(fsp=6))
+        op.alter_column(table_name='task_fail', column_name='end_date', type_=mysql.DATETIME(fsp=6))
+
+        op.alter_column(table_name='task_instance', column_name='execution_date', type_=mysql.DATETIME(fsp=6), nullable=False)
+        op.alter_column(table_name='task_instance', column_name='start_date', type_=mysql.DATETIME(fsp=6))
+        op.alter_column(table_name='task_instance', column_name='end_date', type_=mysql.DATETIME(fsp=6))
+        op.alter_column(table_name='task_instance', column_name='queued_dttm', type_=mysql.DATETIME(fsp=6))
+
+        op.alter_column(table_name='xcom', column_name='timestamp', type_=mysql.DATETIME(fsp=6))
+        op.alter_column(table_name='xcom', column_name='execution_date', type_=mysql.DATETIME(fsp=6))
+
+
+def downgrade():
+    if context.config.get_main_option('sqlalchemy.url').startswith('mysql'):
+        op.alter_column(table_name='dag', column_name='last_scheduler_run', type_=mysql.DATETIME())
+        op.alter_column(table_name='dag', column_name='last_pickled', type_=mysql.DATETIME())
+        op.alter_column(table_name='dag', column_name='last_expired', type_=mysql.DATETIME())
+
+        op.alter_column(table_name='dag_pickle', column_name='created_dttm', type_=mysql.DATETIME())
+
+        op.alter_column(table_name='dag_run', column_name='execution_date', type_=mysql.DATETIME())
+        op.alter_column(table_name='dag_run', column_name='start_date', type_=mysql.DATETIME())
+        op.alter_column(table_name='dag_run', column_name='end_date', type_=mysql.DATETIME())
+
+        op.alter_column(table_name='import_error', column_name='timestamp', type_=mysql.DATETIME())
+
+        op.alter_column(table_name='job', column_name='start_date', type_=mysql.DATETIME())
+        op.alter_column(table_name='job', column_name='end_date', type_=mysql.DATETIME())
+        op.alter_column(table_name='job', column_name='latest_heartbeat', type_=mysql.DATETIME())
+
+        op.alter_column(table_name='known_event', column_name='start_date', type_=mysql.DATETIME())
+        op.alter_column(table_name='known_event', column_name='end_date', type_=mysql.DATETIME())
+
+        op.alter_column(table_name='log', column_name='dttm', type_=mysql.DATETIME())
+        op.alter_column(table_name='log', column_name='execution_date', type_=mysql.DATETIME())
+
+        op.alter_column(table_name='sla_miss', column_name='execution_date', type_=mysql.DATETIME(), nullable=False)
+        op.alter_column(table_name='sla_miss', column_name='timestamp', type_=mysql.DATETIME())
+
+        op.alter_column(table_name='task_fail', column_name='execution_date', type_=mysql.DATETIME())
+        op.alter_column(table_name='task_fail', column_name='start_date', type_=mysql.DATETIME())
+        op.alter_column(table_name='task_fail', column_name='end_date', type_=mysql.DATETIME())
+
+        op.alter_column(table_name='task_instance', column_name='execution_date', type_=mysql.DATETIME(), nullable=False)
+        op.alter_column(table_name='task_instance', column_name='start_date', type_=mysql.DATETIME())
+        op.alter_column(table_name='task_instance', column_name='end_date', type_=mysql.DATETIME())
+        op.alter_column(table_name='task_instance', column_name='queued_dttm', type_=mysql.DATETIME())
+
+        op.alter_column(table_name='xcom', column_name='timestamp', type_=mysql.DATETIME())
+        op.alter_column(table_name='xcom', column_name='execution_date', type_=mysql.DATETIME())

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/910c0ddd/scripts/ci/load_data.sh
----------------------------------------------------------------------
diff --git a/scripts/ci/load_data.sh b/scripts/ci/load_data.sh
index 0863d26..3422b07 100755
--- a/scripts/ci/load_data.sh
+++ b/scripts/ci/load_data.sh
@@ -24,5 +24,5 @@ DATABASE=airflow_ci
 
 mysqladmin -u root create ${DATABASE}
 mysql -u root < ${DATA_DIR}/mysql_schema.sql
-mysqlimport -u root --fields-optionally-enclosed-by="\"" --fields-terminated-by=, --ignore-lines=1 ${DATABASE} ${DATA_FILE}
+mysqlimport --local -u root --fields-optionally-enclosed-by="\"" --fields-terminated-by=, --ignore-lines=1 ${DATABASE} ${DATA_FILE}
 

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/910c0ddd/tests/core.py
----------------------------------------------------------------------
diff --git a/tests/core.py b/tests/core.py
index 912ee9f..effc63d 100644
--- a/tests/core.py
+++ b/tests/core.py
@@ -193,6 +193,34 @@ class CoreTest(unittest.TestCase):
         assert dag_run2 is None
         dag.clear()
 
+    def test_fractional_seconds(self):
+        """
+        Tests if fractional seconds are stored in the database
+        """
+        dag = DAG(TEST_DAG_ID + 'test_fractional_seconds')
+        dag.schedule_interval = '@once'
+        dag.add_task(models.BaseOperator(
+            task_id="faketastic",
+            owner='Also fake',
+            start_date=datetime(2015, 1, 2, 0, 0)))
+
+        start_date = datetime.now()
+
+        run = dag.create_dagrun(
+            run_id='test_' + start_date.isoformat(),
+            execution_date=start_date,
+            start_date=start_date,
+            state=State.RUNNING,
+            external_trigger=False
+        )
+
+        run.refresh_from_db()
+
+        self.assertEqual(start_date, run.execution_date,
+                         "dag run execution_date loses precision")
+        self.assertEqual(start_date, run.start_date,
+                         "dag run start_date loses precision ")
+
     def test_schedule_dag_start_end_dates(self):
         """
         Tests that an attempt to schedule a task after the Dag's end_date


Mime
View raw message