airflow-commits mailing list archives

From "Tomas G. (Jira)" <j...@apache.org>
Subject [jira] [Commented] (AIRFLOW-2327) Cannot pickle PythonOperator dags using Mesos Executor
Date Fri, 25 Oct 2019 15:56:00 GMT

    [ https://issues.apache.org/jira/browse/AIRFLOW-2327?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16959880#comment-16959880
] 

Tomas G. commented on AIRFLOW-2327:
-----------------------------------

I am getting the same error with CeleryExecutor + Airflow 1.10.5 when using a PythonOperator
in a DAG with the pickling option enabled on the Airflow scheduler.
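The `ImportError: No module named unusual_prefix_...` in the traceback below suggests pickling-by-reference: Airflow executes each DAG file under a synthetic module name, the pickled `python_callable` records that name, and a worker that never loaded the file under the same name cannot resolve it. A minimal sketch of that failure mode with plain `pickle` (the module name here is made up for illustration):

```python
import pickle
import sys
import types

# Simulate a DAG file being executed under a synthetic module name,
# as Airflow's DagBag does (the name below is illustrative).
mod = types.ModuleType("unusual_prefix_deadbeef_mydag")
exec("def my_callable():\n    return 'hello world'", mod.__dict__)
sys.modules[mod.__name__] = mod

# pickle stores only a (module, name) reference, not the function body.
payload = pickle.dumps(mod.my_callable)

# A worker process never loaded the file under that synthetic name,
# so unpickling must import the module -- and fails.
del sys.modules[mod.__name__]
try:
    pickle.loads(payload)
except ImportError as exc:
    # ImportError subclass, mirroring the traceback below
    print(type(exc).__name__)
```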

> Cannot pickle PythonOperator dags using Mesos Executor
> ------------------------------------------------------
>
>                 Key: AIRFLOW-2327
>                 URL: https://issues.apache.org/jira/browse/AIRFLOW-2327
>             Project: Apache Airflow
>          Issue Type: Bug
>          Components: contrib
>    Affects Versions: 1.9.0
>         Environment: prod
>            Reporter: niraja b
>            Priority: Major
>
> We are using the MesosExecutor of Airflow.
>
> BashOperator and SimpleHTTPOperator work for us.
> The scheduler is started using -p to pickle the DAGs.
>
> The issue we have is with the following sample code. We tried it with and without use_dill, with both PythonOperator and PythonVirtualenvOperator, but we couldn't get it working successfully on the agent.
>  
> from __future__ import print_function
> from datetime import timedelta, datetime
>
> from airflow.models import DAG
> from airflow.operators.python_operator import PythonOperator, PythonVirtualenvOperator
>
> DAG_ID = "testdag"
> DEFAULT_ARGS = {
>     "start_date": datetime(2018, 4, 16, 1, 50, 16),
>     "schedule_interval": None,
>     "dagrun_timeout": timedelta(minutes=60),
>     "email": ['test@test.com'],
>     "email_on_failure": True,
>     "email_on_retry": False,
>     "retries": 3,
>     "retry_delay": timedelta(seconds=5),
> }
>
> def _testlambda(**kwargs):
>     print("hello world")
>
> with DAG(dag_id=DAG_ID, default_args=DEFAULT_ARGS) as dag:
>     PythonVirtualenvOperator(
>         task_id='python_1',
>         python_callable=_testlambda,
>         use_dill=True,
>         requirements=['dill'],
>     )
>  
> Error 
>  
> Traceback (most recent call last):
>   File "/usr/bin/airflow", line 27, in <module>
>     args.func(args)
>   File "/usr/lib/python2.7/site-packages/airflow/bin/cli.py", line 358, in run
>     DagPickle).filter(DagPickle.id == args.pickle).first()
>   File "/usr/lib64/python2.7/site-packages/sqlalchemy/orm/query.py", line 2789, in first
>     ret = list(self[0:1])
>   File "/usr/lib64/python2.7/site-packages/sqlalchemy/orm/query.py", line 2581, in __getitem__
>     return list(res)
>   File "/usr/lib64/python2.7/site-packages/sqlalchemy/orm/loading.py", line 137, in instances
>     util.raise_from_cause(err)
>   File "/usr/lib64/python2.7/site-packages/sqlalchemy/util/compat.py", line 203, in raise_from_cause
>     reraise(type(exception), exception, tb=exc_tb, cause=cause)
>   File "/usr/lib64/python2.7/site-packages/sqlalchemy/orm/loading.py", line 102, in instances
>     logging.debug(str(fetch[0]))
>   File "/usr/lib64/python2.7/site-packages/sqlalchemy/engine/result.py", line 156, in __repr__
>     return repr(sql_util._repr_row(self))
>   File "/usr/lib64/python2.7/site-packages/sqlalchemy/sql/util.py", line 329, in __repr__
>     ", ".join(trunc(value) for value in self.row),
>   File "/usr/lib64/python2.7/site-packages/sqlalchemy/sql/sqltypes.py", line 1588, in process
>     return loads(value)
>   File "/usr/lib/python2.7/site-packages/dill/dill.py", line 299, in loads
>     return load(file)
>   File "/usr/lib/python2.7/site-packages/dill/dill.py", line 288, in load
>     obj = pik.load()
>   File "/usr/lib64/python2.7/pickle.py", line 858, in load
>     dispatch[key](self)
>   File "/usr/lib64/python2.7/pickle.py", line 1090, in load_global
>     klass = self.find_class(module, name)
>   File "/usr/lib/python2.7/site-packages/dill/dill.py", line 445, in find_class
>     return StockUnpickler.find_class(self, module, name)
>   File "/usr/lib64/python2.7/pickle.py", line 1124, in find_class
>     __import__(module)
> ImportError: No module named unusual_prefix_ac646764c974ff68b827793414d8eabcdca720cf_dmitrydag
> I0416 11:22:34.367975 47476 executor.cpp:938] Command exited with status 1 (pid: 47482)
> I0416 11:22:35.371712 47481 process.cpp:887] Failed to accept socket: future discarded
>  
>  
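For context (an editorial sketch, not part of the report): the shape of the failing module name matches how Airflow 1.x's DagBag builds a unique module name from the DAG file's path when it executes the file; the 40 hex characters between the prefix and the file stem are consistent with a SHA-1 of the path. Roughly (the helper name and file path below are illustrative):

```python
import hashlib
import os

def dagbag_mod_name(filepath):
    """Rough sketch of the synthetic module name Airflow 1.x uses
    when executing a DAG file (cf. airflow/models/dagbag.py)."""
    # File stem of the DAG file, e.g. "dmitrydag" from "/dags/dmitrydag.py"
    org_mod_name, _ = os.path.splitext(os.path.split(filepath)[-1])
    # Hash of the full path makes the module name unique per file location
    path_hash = hashlib.sha1(filepath.encode("utf-8")).hexdigest()
    return "unusual_prefix_%s_%s" % (path_hash, org_mod_name)

print(dagbag_mod_name("/dags/dmitrydag.py"))
```

Because the hash depends on the local file path, a worker can only resolve the pickled reference if it loaded the very same file the same way first, which is why the unpickling step in the traceback fails with an ImportError.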



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
