airflow-dev mailing list archives

From Maxime Beauchemin <maximebeauche...@gmail.com>
Subject Re: Custom Operator Issues
Date Tue, 26 Jul 2016 07:00:45 GMT
`template_fields` must be a proper iterable of field names (a list or tuple). A bare string is itself iterable, character by character, so Airflow ends up calling `getattr(self, 's')` for the first character of `'spark_jar'` — which is exactly the AttributeError you're seeing. Change
`template_fields = 'spark_jar'` to `template_fields = ('spark_jar',)`
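A minimal, self-contained sketch of the failure mode (the `SparkOperator` stub and `resolve_template_files` helper below are simplified stand-ins for illustration, not Airflow's actual classes):

```python
class SparkOperator(object):
    template_fields = 'spark_jar'  # bug: a bare string, not a tuple

    def __init__(self):
        self.spark_jar = '/path/to/app.jar'


def resolve_template_files(op):
    # Simplified stand-in for what Airflow's resolve_template_files does:
    # loop over template_fields and read each named attribute.
    return [getattr(op, attr) for attr in op.template_fields]


op = SparkOperator()
try:
    resolve_template_files(op)
except AttributeError as e:
    # Iterating the string yields 's', 'p', 'a', ..., so the lookup
    # fails on the single-character attribute name 's'.
    print(e)

SparkOperator.template_fields = ('spark_jar',)  # the fix: one-element tuple
print(resolve_template_files(op))  # ['/path/to/app.jar']
```

Note the trailing comma in `('spark_jar',)` — without it, the parentheses are just grouping and you're back to a bare string.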

Max

On Mon, Jul 25, 2016 at 3:06 PM, Ben Storrie <brstorrie@gmail.com> wrote:

> Hello,
>
> Not sure if this is the correct place to ask, but I couldn't find anywhere
> better. I'm trying to create a custom Spark operator that, for the moment,
> does essentially the same thing as a BashOperator, but with some additional
> features. Eventually it will diverge, but I cannot get it working as is.
> Should this be done as a plugin, rather than as a custom operator that
> inherits from BaseOperator?
>
> I've attached the custom Spark operator and the DAG file for review, as
> they are too large to include inline. The exception I receive when
> attempting to run the DAG is the following:
>
> [2016-07-25 21:53:24,302] {__init__.py:36} INFO - Using executor
> LocalExecutor
> Namespace(dag_id='spark_operator_2',
> execution_date=datetime.datetime(2016, 7, 25, 0, 0), force=False,
> func=<function run at 0x7fbb9fbb6e60>, ignore_dependencies=False,
> ignore_depends_on_past=False, job_id=None, local=False, mark_success=False,
> pickle=None, pool=None, raw=False, ship_dag=False, subcommand='run',
> subdir='/opt/spotx-hadoop-airflow/dags', task_id='run')
> Sending to executor.
> [2016-07-25 21:53:24,983] {__init__.py:36} INFO - Using executor
> LocalExecutor
> Namespace(dag_id='spark_operator_2',
> execution_date=datetime.datetime(2016, 7, 25, 0, 0), force=False,
> func=<function run at 0x7fbd7b2f7e60>, ignore_dependencies=False,
> ignore_depends_on_past=False, job_id=None, local=True, mark_success=False,
> pickle=None, pool=None, raw=False, ship_dag=False, subcommand='run',
> subdir='DAGS_FOLDER/spark_test.py', task_id='run')
> Traceback (most recent call last):
>   File
> "/opt/spotx-miniconda/miniconda/envs/spotx-hadoop-airflow/bin/airflow",
> line 16, in <module>
>     args.func(args)
>   File
> "/opt/spotx-miniconda/miniconda/envs/spotx-hadoop-airflow/lib/python2.7/site-packages/airflow/bin/cli.py",
> line 206, in run
>     dag = get_dag(args)
>   File
> "/opt/spotx-miniconda/miniconda/envs/spotx-hadoop-airflow/lib/python2.7/site-packages/airflow/bin/cli.py",
> line 73, in get_dag
>     dagbag = DagBag(process_subdir(args.subdir))
>   File
> "/opt/spotx-miniconda/miniconda/envs/spotx-hadoop-airflow/lib/python2.7/site-packages/airflow/models.py",
> line 166, in __init__
>     self.collect_dags(dag_folder)
>   File
> "/opt/spotx-miniconda/miniconda/envs/spotx-hadoop-airflow/lib/python2.7/site-packages/airflow/models.py",
> line 385, in collect_dags
>     self.process_file(dag_folder, only_if_updated=only_if_updated)
>   File
> "/opt/spotx-miniconda/miniconda/envs/spotx-hadoop-airflow/lib/python2.7/site-packages/airflow/models.py",
> line 292, in process_file
>     self.bag_dag(dag, parent_dag=dag, root_dag=dag)
>   File
> "/opt/spotx-miniconda/miniconda/envs/spotx-hadoop-airflow/lib/python2.7/site-packages/airflow/models.py",
> line 341, in bag_dag
>     dag.resolve_template_files()
>   File
> "/opt/spotx-miniconda/miniconda/envs/spotx-hadoop-airflow/lib/python2.7/site-packages/airflow/models.py",
> line 2715, in resolve_template_files
>     t.resolve_template_files()
>   File
> "/opt/spotx-miniconda/miniconda/envs/spotx-hadoop-airflow/lib/python2.7/site-packages/airflow/models.py",
> line 2033, in resolve_template_files
>     content = getattr(self, attr)
> AttributeError: 'SparkOperator' object has no attribute 's'
>
> Many thanks,
> Ben
>
