airflow-commits mailing list archives

From "Matthias Huschle (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (AIRFLOW-503) ExternalTaskSensor causes runtime exception
Date Mon, 05 Dec 2016 13:21:59 GMT

    [ https://issues.apache.org/jira/browse/AIRFLOW-503?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15722248#comment-15722248 ]

Matthias Huschle commented on AIRFLOW-503:
------------------------------------------

I guess your problem lies in "allowed_states=[all]". "all" is a built-in function. The argument
should be a list of states, as defined in the "State" class.
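
[Editor's note] A minimal sketch in plain Python (no Airflow install needed) of why "allowed_states=[all]" fails: `all` is Python's built-in function, so the sensor ends up binding a function object as a SQL parameter, which psycopg2 rejects with "can't adapt type 'builtin_function_or_method'". The fix, as the comment says, is to pass real state values; in Airflow, `State.SUCCESS` (from `airflow.utils.state`) is simply the string 'success'.

```python
# What the reporter's DAG effectively passed: a list containing the
# built-in function `all`, not a list of task states.
broken_states = [all]

# What allowed_states=[State.SUCCESS] evaluates to: a list of plain
# state strings that the DB driver can bind as SQL parameters.
fixed_states = ['success']

print(callable(broken_states[0]))  # True -> a function, not a state value
print(fixed_states[0])             # success
```

In the reporter's sensor definition this would mean replacing `allowed_states=[all]` with `allowed_states=[State.SUCCESS]` (or whichever states should satisfy the sensor).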


> ExternalTaskSensor causes runtime exception
> -------------------------------------------
>
>                 Key: AIRFLOW-503
>                 URL: https://issues.apache.org/jira/browse/AIRFLOW-503
>             Project: Apache Airflow
>          Issue Type: Bug
>          Components: db, operators
>    Affects Versions: Airflow 1.7.1
>         Environment: airflow 1.7.1.3.
> postgres 9.2.13 (backend DB)
> OS   Red Hat Enterprise Linux Server 7.2 (Maipo)
> python 2.7.5
>            Reporter: Hila Visan
>            Priority: Critical
>
> I just created a new task using ExternalTaskSensor between a weekly dag and a daily dag (named 'services_daily_sensor').
> When I tried to test it, I ran the command:
> 'airflow test weekly_agg services_daily_sensor 2016-09-11T06:00:00'.
> The command failed with the following error:
>  
> ervices_daily_sensor> on 2016-09-11 06:00:00
> [2016-09-11 08:59:09,602] {sensors.py:195} INFO - Poking for daily_agg.services_daily_task on 2016-09-11 02:00:00 ...
> [2016-09-11 08:59:09,614] {models.py:1286} ERROR - (psycopg2.ProgrammingError) can't adapt type 'builtin_function_or_method' [SQL: 'SELECT count(*) AS count_1 \nFROM (SELECT task_instance.task_id AS task_instance_task_id, task_instance.dag_id AS task_instance_dag_id, task_instance.execution_date AS task_instance_execution_date, task_instance.start_date AS task_instance_start_date, task_instance.end_date AS task_instance_end_date, task_instance.duration AS task_instance_duration, task_instance.state AS task_instance_state, task_instance.try_number AS task_instance_try_number, task_instance.hostname AS task_instance_hostname, task_instance.unixname AS task_instance_unixname, task_instance.job_id AS task_instance_job_id, task_instance.pool AS task_instance_pool, task_instance.queue AS task_instance_queue, task_instance.priority_weight AS task_instance_priority_weight, task_instance.operator AS task_instance_operator, task_instance.queued_dttm AS task_instance_queued_dttm \nFROM task_instance \nWHERE task_instance.dag_id = %(dag_id_1)s AND task_instance.task_id = %(task_id_1)s AND task_instance.state IN (%(state_1)s) AND task_instance.execution_date = %(execution_date_1)s) AS anon_1'] [parameters: {'state_1': <built-in function all>, 'execution_date_1': datetime.datetime(2016, 9, 11, 2, 0), 'dag_id_1': 'daily_agg', 'task_id_1': 'services_daily_task'}]
> Traceback (most recent call last):
>   File "/usr/lib/python2.7/site-packages/airflow/models.py", line 1242, in run
>     result = task_copy.execute(context=context)
>   File "/usr/lib/python2.7/site-packages/airflow/operators/sensors.py", line 56, in execute
>     while not self.poke(context):
>   File "/usr/lib/python2.7/site-packages/airflow/operators/sensors.py", line 203, in poke
>     TI.execution_date == dttm,
>   File "/usr/lib64/python2.7/site-packages/sqlalchemy/orm/query.py", line 2980, in count
>     return self.from_self(col).scalar()
>   File "/usr/lib64/python2.7/site-packages/sqlalchemy/orm/query.py", line 2749, in scalar
>     ret = self.one()
>   File "/usr/lib64/python2.7/site-packages/sqlalchemy/orm/query.py", line 2718, in one
>     ret = list(self)
>   File "/usr/lib64/python2.7/site-packages/sqlalchemy/orm/query.py", line 2761, in __iter__
>     return self._execute_and_instances(context)
>   File "/usr/lib64/python2.7/site-packages/sqlalchemy/orm/query.py", line 2776, in _execute_and_instances
>     result = conn.execute(querycontext.statement, self._params)
>   File "/usr/lib64/python2.7/site-packages/sqlalchemy/engine/base.py", line 914, in execute
>     return meth(self, multiparams, params)
>   File "/usr/lib64/python2.7/site-packages/sqlalchemy/sql/elements.py", line 323, in _execute_on_connection
>     return connection._execute_clauseelement(self, multiparams, params)
>   File "/usr/lib64/python2.7/site-packages/sqlalchemy/engine/base.py", line 1010, in _execute_clauseelement
>     compiled_sql, distilled_params
>   File "/usr/lib64/python2.7/site-packages/sqlalchemy/engine/base.py", line 1146, in _execute_context
>     context)
>   File "/usr/lib64/python2.7/site-packages/sqlalchemy/engine/base.py", line 1341, in _handle_dbapi_exception
>     exc_info
>   File "/usr/lib64/python2.7/site-packages/sqlalchemy/util/compat.py", line 202, in raise_from_cause
>     reraise(type(exception), exception, tb=exc_tb, cause=cause)
>   File "/usr/lib64/python2.7/site-packages/sqlalchemy/engine/base.py", line 1139, in _execute_context
>     context)
>   File "/usr/lib64/python2.7/site-packages/sqlalchemy/engine/default.py", line 450, in do_execute
>     cursor.execute(statement, parameters)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
