airflow-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "Steen Manniche (JIRA)" <j...@apache.org>
Subject [jira] [Comment Edited] (AIRFLOW-351) Failed to clear downstream tasks
Date Wed, 25 Oct 2017 10:59:02 GMT

    [ https://issues.apache.org/jira/browse/AIRFLOW-351?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16218399#comment-16218399 ]

Steen Manniche edited comment on AIRFLOW-351 at 10/25/17 10:58 AM:
-------------------------------------------------------------------

The question has also popped up on [Stack Overflow|https://stackoverflow.com/questions/29267261/object-new-thread-lock-is-not-safe-use-thread-lock-new]
in relation to the use of {{copy.deepcopy}}. In general, if a user has any import that actively
uses threads, she will see this error message, which, as demonstrated in the present issue,
takes some time and effort to track down. Wrapping [this line|https://github.com/apache/incubator-airflow/blob/cfc2f73c445074e1e09d6ef6a056cd2b33a945da/airflow/models.py#L3477]
in a simple {{try...except}} could help debugging immensely:

{code}
diff --git a/airflow/models.py b/airflow/models.py
index e3c52b5..53dfbb2 100755
--- a/airflow/models.py
+++ b/airflow/models.py
@@ -3474,7 +3474,10 @@ class DAG(BaseDag, LoggingMixin):
         memo[id(self)] = result
         for k, v in list(self.__dict__.items()):
             if k not in ('user_defined_macros', 'user_defined_filters', 'params'):
-                setattr(result, k, copy.deepcopy(v, memo))
+                try:
+                    setattr(result, k, copy.deepcopy(v, memo))
+                except TypeError as te:
+                    raise AirflowException('Failed to deep_copy the value %s: %s'%(v, te))
 
         result.user_defined_macros = self.user_defined_macros
         result.user_defined_filters = self.user_defined_filters
{code}


was (Author: steen.manniche):
The question has also popped up on [stackoverflow|https://stackoverflow.com/questions/29267261/object-new-thread-lock-is-not-safe-use-thread-lock-new]
in relation to the use of {{deep_copy}}. In general, if a user has any import that actively
uses threads, she will see this error message which, as demonstrated in the present issue,
takes some time and effort to track down. A simple {{try...except}} wrapping of [this line|https://github.com/apache/incubator-airflow/blob/cfc2f73c445074e1e09d6ef6a056cd2b33a945da/airflow/models.py#L3477]
could help debugging immensely:

{{{
diff --git a/airflow/models.py b/airflow/models.py
index e3c52b5..53dfbb2 100755
--- a/airflow/models.py
+++ b/airflow/models.py
@@ -3474,7 +3474,10 @@ class DAG(BaseDag, LoggingMixin):
         memo[id(self)] = result
         for k, v in list(self.__dict__.items()):
             if k not in ('user_defined_macros', 'user_defined_filters', 'params'):
-                setattr(result, k, copy.deepcopy(v, memo))
+                try:
+                    setattr(result, k, copy.deepcopy(v, memo))
+                except TypeError as te:
+                    raise AirflowException('Failed to deep_copy the value %s: %s'%(v, te))
 
         result.user_defined_macros = self.user_defined_macros
         result.user_defined_filters = self.user_defined_filters
}}}

> Failed to clear downstream tasks
> --------------------------------
>
>                 Key: AIRFLOW-351
>                 URL: https://issues.apache.org/jira/browse/AIRFLOW-351
>             Project: Apache Airflow
>          Issue Type: Bug
>          Components: models, subdag, webserver
>    Affects Versions: Airflow 1.7.1.3
>            Reporter: Adinata
>         Attachments: dag_error.py, error_on_clear_dag.txt, ubuntu-14-packages.log, ubuntu-16-oops.log, ubuntu-16-packages.log
>
>
> {code}
>                           ____/ (  (    )   )  \___
>                          /( (  (  )   _    ))  )   )\
>                        ((     (   )(    )  )   (   )  )
>                      ((/  ( _(   )   (   _) ) (  () )  )
>                     ( (  ( (_)   ((    (   )  .((_ ) .  )_
>                    ( (  )    (      (  )    )   ) . ) (   )
>                   (  (   (  (   ) (  _  ( _) ).  ) . ) ) ( )
>                   ( (  (   ) (  )   (  ))     ) _)(   )  )  )
>                  ( (  ( \ ) (    (_  ( ) ( )  )   ) )  )) ( )
>                   (  (   (  (   (_ ( ) ( _    )  ) (  )  )   )
>                  ( (  ( (  (  )     (_  )  ) )  _)   ) _( ( )
>                   ((  (   )(    (     _    )   _) _(_ (  (_ )
>                    (_((__(_(__(( ( ( |  ) ) ) )_))__))_)___)
>                    ((__)        \\||lll|l||///          \_))
>                             (   /(/ (  )  ) )\   )
>                           (    ( ( ( | | ) ) )\   )
>                            (   /(| / ( )) ) ) )) )
>                          (     ( ((((_(|)_)))))     )
>                           (      ||\(|(|)|/||     )
>                         (        |(||(||)||||        )
>                           (     //|/l|||)|\\ \     )
>                         (/ / //  /|//||||\\  \ \  \ _)
> -------------------------------------------------------------------------------
> Node: 9889a7c79e9b
> -------------------------------------------------------------------------------
> Traceback (most recent call last):
>   File "/usr/local/lib/python2.7/dist-packages/flask/app.py", line 1817, in wsgi_app
>     response = self.full_dispatch_request()
>   File "/usr/local/lib/python2.7/dist-packages/flask/app.py", line 1477, in full_dispatch_request
>     rv = self.handle_user_exception(e)
>   File "/usr/local/lib/python2.7/dist-packages/flask/app.py", line 1381, in handle_user_exception
>     reraise(exc_type, exc_value, tb)
>   File "/usr/local/lib/python2.7/dist-packages/flask/app.py", line 1475, in full_dispatch_request
>     rv = self.dispatch_request()
>   File "/usr/local/lib/python2.7/dist-packages/flask/app.py", line 1461, in dispatch_request
>     return self.view_functions[rule.endpoint](**req.view_args)
>   File "/usr/local/lib/python2.7/dist-packages/flask_admin/base.py", line 68, in inner
>     return self._run_view(f, *args, **kwargs)
>   File "/usr/local/lib/python2.7/dist-packages/flask_admin/base.py", line 367, in _run_view
>     return fn(self, *args, **kwargs)
>   File "/usr/local/lib/python2.7/dist-packages/flask_login.py", line 755, in decorated_view
>     return func(*args, **kwargs)
>   File "/usr/local/lib/python2.7/dist-packages/airflow/www/utils.py", line 118, in wrapper
>     return f(*args, **kwargs)
>   File "/usr/local/lib/python2.7/dist-packages/airflow/www/utils.py", line 167, in wrapper
>     return f(*args, **kwargs)
>   File "/usr/local/lib/python2.7/dist-packages/airflow/www/views.py", line 1017, in clear
>     include_upstream=upstream)
>   File "/usr/local/lib/python2.7/dist-packages/airflow/models.py", line 2870, in sub_dag
>     dag = copy.deepcopy(self)
>   File "/usr/lib/python2.7/copy.py", line 174, in deepcopy
>     y = copier(memo)
>   File "/usr/local/lib/python2.7/dist-packages/airflow/models.py", line 2856, in __deepcopy__
>     setattr(result, k, copy.deepcopy(v, memo))
>   File "/usr/lib/python2.7/copy.py", line 163, in deepcopy
>     y = copier(x, memo)
>   File "/usr/lib/python2.7/copy.py", line 257, in _deepcopy_dict
>     y[deepcopy(key, memo)] = deepcopy(value, memo)
>   File "/usr/lib/python2.7/copy.py", line 174, in deepcopy
>     y = copier(memo)
>   File "/usr/local/lib/python2.7/dist-packages/airflow/models.py", line 1974, in __deepcopy__
>     setattr(result, k, copy.deepcopy(v, memo))
>   File "/usr/lib/python2.7/copy.py", line 190, in deepcopy
>     y = _reconstruct(x, rv, 1, memo)
>   File "/usr/lib/python2.7/copy.py", line 334, in _reconstruct
>     state = deepcopy(state, memo)
>   File "/usr/lib/python2.7/copy.py", line 163, in deepcopy
>     y = copier(x, memo)
>   File "/usr/lib/python2.7/copy.py", line 257, in _deepcopy_dict
>     y[deepcopy(key, memo)] = deepcopy(value, memo)
>   File "/usr/lib/python2.7/copy.py", line 190, in deepcopy
>     y = _reconstruct(x, rv, 1, memo)
>   File "/usr/lib/python2.7/copy.py", line 334, in _reconstruct
>     state = deepcopy(state, memo)
>   File "/usr/lib/python2.7/copy.py", line 163, in deepcopy
>     y = copier(x, memo)
>   File "/usr/lib/python2.7/copy.py", line 257, in _deepcopy_dict
>     y[deepcopy(key, memo)] = deepcopy(value, memo)
>   File "/usr/lib/python2.7/copy.py", line 190, in deepcopy
>     y = _reconstruct(x, rv, 1, memo)
>   File "/usr/lib/python2.7/copy.py", line 334, in _reconstruct
>     state = deepcopy(state, memo)
>   File "/usr/lib/python2.7/copy.py", line 163, in deepcopy
>     y = copier(x, memo)
>   File "/usr/lib/python2.7/copy.py", line 257, in _deepcopy_dict
>     y[deepcopy(key, memo)] = deepcopy(value, memo)
>   File "/usr/lib/python2.7/copy.py", line 163, in deepcopy
>     y = copier(x, memo)
>   File "/usr/lib/python2.7/copy.py", line 230, in _deepcopy_list
>     y.append(deepcopy(a, memo))
>   File "/usr/lib/python2.7/copy.py", line 190, in deepcopy
>     y = _reconstruct(x, rv, 1, memo)
>   File "/usr/lib/python2.7/copy.py", line 334, in _reconstruct
>     state = deepcopy(state, memo)
>   File "/usr/lib/python2.7/copy.py", line 163, in deepcopy
>     y = copier(x, memo)
>   File "/usr/lib/python2.7/copy.py", line 257, in _deepcopy_dict
>     y[deepcopy(key, memo)] = deepcopy(value, memo)
>   File "/usr/lib/python2.7/copy.py", line 190, in deepcopy
>     y = _reconstruct(x, rv, 1, memo)
>   File "/usr/lib/python2.7/copy.py", line 334, in _reconstruct
>     state = deepcopy(state, memo)
>   File "/usr/lib/python2.7/copy.py", line 163, in deepcopy
>     y = copier(x, memo)
>   File "/usr/lib/python2.7/copy.py", line 257, in _deepcopy_dict
>     y[deepcopy(key, memo)] = deepcopy(value, memo)
>   File "/usr/lib/python2.7/copy.py", line 190, in deepcopy
>     y = _reconstruct(x, rv, 1, memo)
>   File "/usr/lib/python2.7/copy.py", line 329, in _reconstruct
>     y = callable(*args)
>   File "/usr/lib/python2.7/copy_reg.py", line 93, in __newobj__
>     return cls.__new__(cls, *args)
> TypeError: object.__new__(thread.lock) is not safe, use thread.lock.__new__()
> {code}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

Mime
View raw message