airflow-commits mailing list archives

From "Jannik Franz (Jira)" <>
Subject [jira] [Created] (AIRFLOW-5689) Side-Input in Python3 fails to pickle class
Date Fri, 18 Oct 2019 13:36:00 GMT
Jannik Franz created AIRFLOW-5689:

             Summary: Side-Input in Python3 fails to pickle class
                 Key: AIRFLOW-5689
             Project: Apache Airflow
          Issue Type: Bug
          Components: gcp
    Affects Versions: 1.10.5
         Environment: Python 3, Apache Beam 2.16.0
            Reporter: Jannik Franz

When running Apache Beam with Python 3 on Google Cloud Dataflow, side inputs don't work.

When testing with the local DirectRunner, there seems to be no issue.


import apache_beam as beam
from apache_beam.io import ReadFromText
from apache_beam.options.pipeline_options import PipelineOptions, SetupOptions
# JsonCoder, ReadFacebookJson, TransformConversionMetadata, WriteCoerced,
# flatten_filter_actions and add_conversion_name are defined elsewhere (not shown in the report).


class FlattenCustomActions(beam.PTransform):
    """Transforms Facebook Day Actions.

    Only retains actions with custom_conversions, flattens the actions,
    and adds custom conversion names using a side input.
    """

    def __init__(self, conversions):
        super(FlattenCustomActions, self).__init__()
        self.conversions = conversions

    def expand(self, input_or_inputs):
        return (
            input_or_inputs
            | "FlattenActions" >> beam.ParDo(flatten_filter_actions)
            # The side input (an AsList view) is passed as an extra argument to add_conversion_name.
            | "AddConversionName" >> beam.Map(add_conversion_name, self.conversions)
        )


# ...
# in run():
pipeline_options = PipelineOptions(pipeline_args)
pipeline_options.view_as(SetupOptions).save_main_session = True
p = beam.Pipeline(options=pipeline_options)

conversions_output = (
    p
    | "ReadConversions" >> ReadFromText(known_args.input_conversions, coder=JsonCoder())
    | TransformConversionMetadata()
)

(
    conversions_output
    | "WriteConversions"
    >> WriteCoerced(...)  # arguments not included in the report
)

(
    p
    | ReadFacebookJson(known_args.input, retain_root_fields=True)
    | FlattenCustomActions(beam.pvalue.AsList(conversions_output))
    | "WriteActions"
    >> WriteCoerced(
        known_args.output, known_args.output_type, schema_path=BIGQUERY_SCHEMA_ACTIONS_PATH
    )
)

I receive the following traceback on Dataflow:
Traceback (most recent call last):
  File "/usr/local/lib/python3.6/site-packages/dataflow_worker/", line 773, in run
    self._load_main_session(self.local_staging_directory)
  File "/usr/local/lib/python3.6/site-packages/dataflow_worker/", line 489, in _load_main_session
    pickler.load_session(session_file)
  File "/usr/local/lib/python3.6/site-packages/apache_beam/internal/", line 287, in load_session
    return dill.load_session(file_path)
  File "/usr/local/lib/python3.6/site-packages/dill/", line 410, in load_session
    module = unpickler.load()
  File "/usr/local/lib/python3.6/site-packages/dill/", line 474, in find_class
    return StockUnpickler.find_class(self, module, name)
AttributeError: Can't get attribute 'FlattenCustomActions' on <module 'dataflow_worker.start' from '/usr/local/lib/python3.6/site-packages/dataflow_worker/'>
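The last frame shows dill delegating to the stock `find_class`, which looks the class up by name on the module recorded in the pickle; on the worker that module is `dataflow_worker.start`, which has no `FlattenCustomActions`. The sketch below is a minimal, stdlib-only illustration of that final lookup, not a reproduction of dill's `save_main_session` machinery: it uses plain `pickle` instead of dill, and the module names `launcher_main` and `pipeline_transforms` are hypothetical stand-ins. The second function shows, under the assumption that the same module is importable on both sides, why defining the transform in a separate module avoids this particular lookup failure.

```python
import pickle
import sys
import types


def _register_class(module_name: str) -> type:
    """Create a class that claims to live in `module_name` and register it there."""
    module = types.ModuleType(module_name)
    sys.modules[module_name] = module

    class FlattenCustomActions:  # stand-in for the PTransform in the report
        pass

    FlattenCustomActions.__module__ = module_name
    FlattenCustomActions.__qualname__ = "FlattenCustomActions"
    module.FlattenCustomActions = FlattenCustomActions
    return FlattenCustomActions


def unpickle_on_fresh_worker() -> str:
    """Pickle against one module object, unpickle against a fresh, empty one."""
    cls = _register_class("launcher_main")  # hypothetical launcher-side module
    # The pickle stores only a reference (module name + class name), not the class body.
    payload = pickle.dumps(cls())
    # Simulate the worker: a module with that name exists but never defined the class.
    sys.modules["launcher_main"] = types.ModuleType("launcher_main")
    try:
        pickle.loads(payload)
        return "loaded"
    except AttributeError as err:
        return f"AttributeError: {err}"
    finally:
        del sys.modules["launcher_main"]


def unpickle_with_shared_module() -> str:
    """Both sides resolve the class from the same importable module."""
    cls = _register_class("pipeline_transforms")  # hypothetical shared module
    try:
        restored = pickle.loads(pickle.dumps(cls()))
        return type(restored).__name__
    finally:
        del sys.modules["pipeline_transforms"]


print(unpickle_on_fresh_worker())
print(unpickle_with_shared_module())
```

Running it prints an `AttributeError` that names `FlattenCustomActions` for the first case (the exact wording depends on the Python version) and `FlattenCustomActions` for the second.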



This message was sent by Atlassian Jira
