airflow-commits mailing list archives

From "Hila Visan (JIRA)" <j...@apache.org>
Subject [jira] [Created] (AIRFLOW-503) ExternalTask
Date Sun, 11 Sep 2016 09:03:21 GMT
Hila Visan created AIRFLOW-503:
----------------------------------

             Summary: ExternalTask 
                 Key: AIRFLOW-503
                 URL: https://issues.apache.org/jira/browse/AIRFLOW-503
             Project: Apache Airflow
          Issue Type: Bug
          Components: db, operators
    Affects Versions: Airflow 1.7.1
         Environment: airflow 1.7.1.3.
PostgreSQL 9.2.13 (backend DB)
OS: Red Hat Enterprise Linux Server 7.2 (Maipo)
Python 2.7.5
            Reporter: Hila Visan
            Priority: Critical


I just created a new task (named 'services_daily_sensor') in the weekly DAG, using
ExternalTaskSensor to wait on a task in the daily DAG.
When I tried to test it, I ran the command:
'airflow test weekly_agg services_daily_sensor 2016-09-11T06:00:00'.

The command failed with the following error:
 
ervices_daily_sensor> on 2016-09-11 06:00:00
[2016-09-11 08:59:09,602] {sensors.py:195} INFO - Poking for daily_agg.services_daily_task
on 2016-09-11 02:00:00 ...
[2016-09-11 08:59:09,614] {models.py:1286} ERROR - (psycopg2.ProgrammingError) can't adapt
type 'builtin_function_or_method' [SQL: 'SELECT count(*) AS count_1 \nFROM (SELECT task_instance.task_id
AS task_instance_task_id, task_instance.dag_id AS task_instance_dag_id, task_instance.execution_date
AS task_instance_execution_date, task_instance.start_date AS task_instance_start_date, task_instance.end_date
AS task_instance_end_date, task_instance.duration AS task_instance_duration, task_instance.state
AS task_instance_state, task_instance.try_number AS task_instance_try_number, task_instance.hostname
AS task_instance_hostname, task_instance.unixname AS task_instance_unixname, task_instance.job_id
AS task_instance_job_id, task_instance.pool AS task_instance_pool, task_instance.queue AS
task_instance_queue, task_instance.priority_weight AS task_instance_priority_weight, task_instance.operator
AS task_instance_operator, task_instance.queued_dttm AS task_instance_queued_dttm \nFROM task_instance
\nWHERE task_instance.dag_id = %(dag_id_1)s AND task_instance.task_id = %(task_id_1)s AND
task_instance.state IN (%(state_1)s) AND task_instance.execution_date = %(execution_date_1)s)
AS anon_1'] [parameters: {'state_1': <built-in function all>, 'execution_date_1': datetime.datetime(2016,
9, 11, 2, 0), 'dag_id_1': 'daily_agg', 'task_id_1': 'services_daily_task'}]
Traceback (most recent call last):
  File "/usr/lib/python2.7/site-packages/airflow/models.py", line 1242, in run
    result = task_copy.execute(context=context)
  File "/usr/lib/python2.7/site-packages/airflow/operators/sensors.py", line 56, in execute
    while not self.poke(context):
  File "/usr/lib/python2.7/site-packages/airflow/operators/sensors.py", line 203, in poke
    TI.execution_date == dttm,
  File "/usr/lib64/python2.7/site-packages/sqlalchemy/orm/query.py", line 2980, in count
    return self.from_self(col).scalar()
  File "/usr/lib64/python2.7/site-packages/sqlalchemy/orm/query.py", line 2749, in scalar
    ret = self.one()
  File "/usr/lib64/python2.7/site-packages/sqlalchemy/orm/query.py", line 2718, in one
    ret = list(self)
  File "/usr/lib64/python2.7/site-packages/sqlalchemy/orm/query.py", line 2761, in __iter__
    return self._execute_and_instances(context)
  File "/usr/lib64/python2.7/site-packages/sqlalchemy/orm/query.py", line 2776, in _execute_and_instances
    result = conn.execute(querycontext.statement, self._params)
  File "/usr/lib64/python2.7/site-packages/sqlalchemy/engine/base.py", line 914, in execute
    return meth(self, multiparams, params)
  File "/usr/lib64/python2.7/site-packages/sqlalchemy/sql/elements.py", line 323, in _execute_on_connection
    return connection._execute_clauseelement(self, multiparams, params)
  File "/usr/lib64/python2.7/site-packages/sqlalchemy/engine/base.py", line 1010, in _execute_clauseelement
    compiled_sql, distilled_params
  File "/usr/lib64/python2.7/site-packages/sqlalchemy/engine/base.py", line 1146, in _execute_context
    context)
  File "/usr/lib64/python2.7/site-packages/sqlalchemy/engine/base.py", line 1341, in _handle_dbapi_exception
    exc_info
  File "/usr/lib64/python2.7/site-packages/sqlalchemy/util/compat.py", line 202, in raise_from_cause
    reraise(type(exception), exception, tb=exc_tb, cause=cause)
  File "/usr/lib64/python2.7/site-packages/sqlalchemy/engine/base.py", line 1139, in _execute_context
    context)
  File "/usr/lib64/python2.7/site-packages/sqlalchemy/engine/default.py", line 450, in do_execute
    cursor.execute(statement, parameters)
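
Note on the logged parameters: 'state_1' is bound to <built-in function all>, i.e. the Python builtin `all` rather than a state name string, which would explain why the driver cannot adapt it. The sensor definition is not included in this report, so how that value got there is not known. The sketch below is only an illustration of the failure mode, not the Airflow code path: it uses the standard-library sqlite3 driver in place of psycopg2 (an assumption made so that it runs without a Postgres server), and `can_bind` is a hypothetical helper name.

```python
import sqlite3


def can_bind(value):
    """Return True if the DB-API driver accepts `value` as a query parameter.

    Hypothetical helper for illustration only; sqlite3 stands in for psycopg2.
    """
    conn = sqlite3.connect(":memory:")
    try:
        # Bind `value` as the single positional parameter of a trivial query.
        conn.execute("SELECT ?", (value,))
        return True
    except sqlite3.Error:
        # Raised when the driver has no way to convert the Python object
        # into an SQL value (the sqlite3 analogue of "can't adapt type").
        return False
    finally:
        conn.close()


if __name__ == "__main__":
    print(can_bind("success"))  # a state name passed as a plain string
    print(can_bind(all))        # the builtin function `all`, as in the log
```

If the same holds for psycopg2, the query would be expected to go through once the state values handed to the sensor are plain strings (for example 'success') instead of the builtin `all`.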




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
