airflow-dev mailing list archives

From Ben Storrie <brstor...@gmail.com>
Subject Custom Operator Issues
Date Mon, 25 Jul 2016 22:06:38 GMT
Hello,

Not sure if this is the correct place to ask, but I couldn't find anywhere
better. I'm trying to create a custom Spark operator that, at the moment,
basically accomplishes the same thing as a BashOperator, but with some
additional features. Eventually it will not be a duplicate, but I cannot
get it working as is. Should this be done as a plugin, rather than as a
custom operator that inherits from BaseOperator?

I've attached the custom Spark operator and the DAG file for review, as
they are too large to inline here. The exception I receive when attempting
to run the DAG is the following:

[2016-07-25 21:53:24,302] {__init__.py:36} INFO - Using executor LocalExecutor
Namespace(dag_id='spark_operator_2', execution_date=datetime.datetime(2016, 7, 25, 0, 0), force=False, func=<function run at 0x7fbb9fbb6e60>, ignore_dependencies=False, ignore_depends_on_past=False, job_id=None, local=False, mark_success=False, pickle=None, pool=None, raw=False, ship_dag=False, subcommand='run', subdir='/opt/spotx-hadoop-airflow/dags', task_id='run')
Sending to executor.
[2016-07-25 21:53:24,983] {__init__.py:36} INFO - Using executor LocalExecutor
Namespace(dag_id='spark_operator_2', execution_date=datetime.datetime(2016, 7, 25, 0, 0), force=False, func=<function run at 0x7fbd7b2f7e60>, ignore_dependencies=False, ignore_depends_on_past=False, job_id=None, local=True, mark_success=False, pickle=None, pool=None, raw=False, ship_dag=False, subcommand='run', subdir='DAGS_FOLDER/spark_test.py', task_id='run')
Traceback (most recent call last):
  File "/opt/spotx-miniconda/miniconda/envs/spotx-hadoop-airflow/bin/airflow", line 16, in <module>
    args.func(args)
  File "/opt/spotx-miniconda/miniconda/envs/spotx-hadoop-airflow/lib/python2.7/site-packages/airflow/bin/cli.py", line 206, in run
    dag = get_dag(args)
  File "/opt/spotx-miniconda/miniconda/envs/spotx-hadoop-airflow/lib/python2.7/site-packages/airflow/bin/cli.py", line 73, in get_dag
    dagbag = DagBag(process_subdir(args.subdir))
  File "/opt/spotx-miniconda/miniconda/envs/spotx-hadoop-airflow/lib/python2.7/site-packages/airflow/models.py", line 166, in __init__
    self.collect_dags(dag_folder)
  File "/opt/spotx-miniconda/miniconda/envs/spotx-hadoop-airflow/lib/python2.7/site-packages/airflow/models.py", line 385, in collect_dags
    self.process_file(dag_folder, only_if_updated=only_if_updated)
  File "/opt/spotx-miniconda/miniconda/envs/spotx-hadoop-airflow/lib/python2.7/site-packages/airflow/models.py", line 292, in process_file
    self.bag_dag(dag, parent_dag=dag, root_dag=dag)
  File "/opt/spotx-miniconda/miniconda/envs/spotx-hadoop-airflow/lib/python2.7/site-packages/airflow/models.py", line 341, in bag_dag
    dag.resolve_template_files()
  File "/opt/spotx-miniconda/miniconda/envs/spotx-hadoop-airflow/lib/python2.7/site-packages/airflow/models.py", line 2715, in resolve_template_files
    t.resolve_template_files()
  File "/opt/spotx-miniconda/miniconda/envs/spotx-hadoop-airflow/lib/python2.7/site-packages/airflow/models.py", line 2033, in resolve_template_files
    content = getattr(self, attr)
AttributeError: 'SparkOperator' object has no attribute 's'
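
Digging into models.py, resolve_template_files iterates self.template_fields
and calls getattr(self, attr) on each entry (per the last frame above), so I
suspect my template_fields declaration: if it is a bare string rather than a
list/tuple, Python iterates it character by character and the first lookup
is getattr(self, 's'). A quick illustration (field name is a placeholder):

# A bare string iterates one character at a time:
fields_as_string = 'spark_command'
print(list(fields_as_string))   # ['s', 'p', 'a', 'r', 'k', ...]

# A one-element tuple iterates as a single attribute name:
fields_as_tuple = ('spark_command',)
print(list(fields_as_tuple))    # ['spark_command']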

Many thanks,
Ben
