airflow-commits mailing list archives

From "Jakub Powierza (JIRA)" <j...@apache.org>
Subject [jira] [Created] (AIRFLOW-1284) DetachedInstanceError while using dependencies
Date Tue, 06 Jun 2017 16:39:18 GMT
Jakub Powierza created AIRFLOW-1284:
---------------------------------------

             Summary: DetachedInstanceError while using dependencies
                 Key: AIRFLOW-1284
                 URL: https://issues.apache.org/jira/browse/AIRFLOW-1284
             Project: Apache Airflow
          Issue Type: Bug
         Environment: Airflow 1.8.1 on Ubuntu 14.04 and PostgreSQL database.
            Reporter: Jakub Powierza


*What happened?*
I've encountered an error that seems to be related to Airflow's task instance dependencies.
I enabled them recently and they fail in my environment.

*What am I using?*
I'm using Airflow 1.8.1 on Ubuntu 14.04 with a PostgreSQL database.

*My usage:*
{noformat}
class CustomDependency(BaseTIDep):
    ...
    def _get_dep_statuses(self, ti: TaskInstance, session: scoped_session,
                          dep_context: Mapping = None) -> Iterable[TIDepStatus]:
        my_remote_system = get_remote_system_by_name(remote_system_name)
        if my_remote_system.is_up:
            yield self._passing_status(reason='Remote system {} is up.'.format(my_remote_system))
        else:
            yield self._failing_status(reason='Remote system {} appears to be down.'.format(my_remote_system))

class MyOperator(BaseOperator):
    ...
    def deps(self) -> MutableSet[BaseTIDep]:
        deps = super().deps
        deps.add(CustomDependency(self.remote_system_name))
        return deps
{noformat}

*Exception:*
{noformat}
Traceback (most recent call last):
  File "/opt/GTA/workflow/venv/src/airflow/airflow/jobs.py", line 324, in helper
    pickle_dags)
  File "/opt/GTA/workflow/venv/src/airflow/airflow/utils/db.py", line 54, in wrapper
    result = func(*args, **kwargs)
  File "/opt/GTA/workflow/venv/src/airflow/airflow/jobs.py", line 1581, in process_file
    self._process_dags(dagbag, dags, ti_keys_to_schedule)
  File "/opt/GTA/workflow/venv/src/airflow/airflow/jobs.py", line 1152, in _process_dags
    self._process_task_instances(dag, tis_out)
  File "/opt/GTA/workflow/venv/src/airflow/airflow/jobs.py", line 838, in _process_task_instances
    self.logger.info("Examining DAG run {}".format(run))
  File "/opt/GTA/workflow/venv/src/airflow/airflow/models.py", line 3803, in __repr__
    dag_id=self.dag_id,
  File "/opt/GTA/workflow/venv/lib/python3.5/site-packages/sqlalchemy/orm/attributes.py", line 237, in __get__
    return self.impl.get(instance_state(instance), dict_)
  File "/opt/GTA/workflow/venv/lib/python3.5/site-packages/sqlalchemy/orm/attributes.py", line 579, in get
    value = state._load_expired(state, passive)
  File "/opt/GTA/workflow/venv/lib/python3.5/site-packages/sqlalchemy/orm/state.py", line 592, in _load_expired
    self.manager.deferred_scalar_loader(self, toload)
  File "/opt/GTA/workflow/venv/lib/python3.5/site-packages/sqlalchemy/orm/loading.py", line 644, in load_scalar_attributes
    (state_str(state)))
sqlalchemy.orm.exc.DetachedInstanceError: Instance <DagRun at 0x7fa12bcb9f98> is not bound to a Session; attribute refresh operation cannot proceed
{noformat}
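
*The same exception outside Airflow (sketch):*
The traceback ends in SQLAlchemy, where `__repr__` reads an expired attribute on an object that no longer belongs to a session. The snippet below is only a minimal sketch of that mechanism in plain SQLAlchemy, assuming an in-memory SQLite database. The `Run` model and the `access_after_close` helper are hypothetical stand-ins, not Airflow code, and this is not a diagnosis of where the session is lost in AIRFLOW-1284.

```python
from sqlalchemy import Column, Integer, String, create_engine
from sqlalchemy.orm import sessionmaker
from sqlalchemy.orm.exc import DetachedInstanceError

try:
    from sqlalchemy.orm import declarative_base  # SQLAlchemy 1.4 and later
except ImportError:
    from sqlalchemy.ext.declarative import declarative_base  # older releases

Base = declarative_base()


class Run(Base):
    """Hypothetical stand-in for a mapped class such as DagRun."""
    __tablename__ = "run"
    id = Column(Integer, primary_key=True)
    dag_id = Column(String)

    def __repr__(self):
        # Reading a mapped attribute here triggers a refresh if it is expired.
        return "<Run {}>".format(self.dag_id)


def access_after_close(expire_on_commit=True):
    """Commit an object, close its session, then call repr() on it.

    Returns the repr string, or the exception class name if the
    attribute refresh fails because the object is detached.
    """
    engine = create_engine("sqlite://")  # in-memory database (assumption)
    Base.metadata.create_all(engine)
    session = sessionmaker(bind=engine, expire_on_commit=expire_on_commit)()

    run = Run(dag_id="example")
    session.add(run)
    session.commit()  # expires loaded attributes when expire_on_commit=True
    session.close()   # detaches every object from the session

    try:
        return repr(run)
    except DetachedInstanceError as exc:
        return type(exc).__name__


if __name__ == "__main__":
    print(access_after_close(expire_on_commit=True))
    print(access_after_close(expire_on_commit=False))
```

With the default `expire_on_commit=True`, the helper returns the name of the exception, because the expired `dag_id` cannot be reloaded without a session. With `expire_on_commit=False`, the value stays loaded on the object and `repr()` succeeds.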

P.S. This feature sounds great, but it seems to cause too many problems right now. Maybe I'm
using it wrong? It would be nice to document it better :)



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
