beam-user mailing list archives

From OrielResearch Eila Arich-Landkof <e...@orielresearch.org>
Subject Re: Pipeline is passing on local runner and failing on Dataflow runner - help with error
Date Thu, 21 Jun 2018 19:38:55 GMT
Hi Ahmet,

Thank you. I have attached the requirements.txt that was generated with pip
freeze > requirements.txt on the Datalab notebook.
The file does not include the apache-beam package, only apache-airflow==1.9.0.
Could you please let me know which package includes indexes.base?
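
For context, this is roughly how I plan to pass the file to the pipeline (a
sketch only; the project and bucket names below are placeholders):

from apache_beam.options.pipeline_options import PipelineOptions

options = PipelineOptions(
    runner='DataflowRunner',
    project='my-project',                  # placeholder project id
    temp_location='gs://my-bucket/tmp',    # placeholder bucket
    requirements_file='requirements.txt',  # the attached pip-freeze file, staged to the workers
)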

Best,
Eila


On Thu, Jun 21, 2018 at 1:55 PM, Ahmet Altay <altay@google.com> wrote:

> Hi Eila,
>
> It seems like the package that provides indexes.base is not installed on the
> workers. Could you try one of the methods in "Managing Python Pipeline
> Dependencies" [1] to stage that dependency?
>
> Ahmet
>
> [1] https://beam.apache.org/documentation/sdks/python-pipeline-dependencies/
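>
> (For example, a minimal sketch of the requirements-file approach from [1];
> the script name, project, and bucket are placeholders:)
>
> python my_pipeline.py --runner DataflowRunner --project <your-project> \
>     --temp_location gs://<your-bucket>/tmp --requirements_file requirements.txt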
>
> On Thu, Jun 21, 2018 at 9:40 AM, OrielResearch Eila Arich-Landkof <
> eila@orielresearch.org> wrote:
>
>> Hello all,
>>
>> Exploring this issue (the local runner works great and Dataflow fails),
>> there might be a mismatch between the apache_beam version and the Dataflow
>> version.
>>
>> Please let me know what your thoughts are. If it is a version issue, what
>> updates should be executed? How do I cover the installation on both the
>> Datalab VM and Google Cloud Platform?
>>
>> Should I run the following command (or a different one)? In the shell, or
>> in Datalab?
>>
>> I tried running this in Datalab and it didn't solve the issue (*see the
>> full logs report below*):
>>
>> pip install --upgrade apache_beam google-cloud-dataflow
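>>
>> (For reference, one way to compare the SDK version installed on the Datalab
>> VM with what the job staged (see the staging lines in the logs below);
>> adjust for your environment:)
>>
>> python -c "import apache_beam; print(apache_beam.__version__)"
>> pip show google-cloud-dataflow | grep -i version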
>>
>> Please advise.
>>
>> Thanks,
>> Eila
>>
>>
>> *All logs:*
>>
>>
>> INFO:root:Staging the SDK tarball from PyPI to gs://archs4/staging/label-archs4-tsv.1529598693.453095/dataflow_python_sdk.tar
>> INFO:root:Executing command: ['/usr/local/envs/py2env/bin/python', '-m', 'pip', 'install', '--download', '/tmp/tmp5MM5wr', 'google-cloud-dataflow==2.0.0', '--no-binary', ':all:', '--no-deps']
>> INFO:root:file copy from /tmp/tmp5MM5wr/google-cloud-dataflow-2.0.0.tar.gz to gs://archs4/staging/label-archs4-tsv.1529598693.453095/dataflow_python_sdk.tar.
>> INFO:oauth2client.client:Attempting refresh to obtain initial access_token
>> INFO:oauth2client.client:Attempting refresh to obtain initial access_token
>> INFO:root:Create job: <Job
>>  createTime: u'2018-06-21T16:31:51.304121Z'
>>  currentStateTime: u'1970-01-01T00:00:00Z'
>>  id: u'2018-06-21_09_31_50-17545183031487377678'
>>  location: u'us-central1'
>>  name: u'label-archs4-tsv'
>>  projectId: u'orielresearch-188115'
>>  stageStates: []
>>  steps: []
>>  tempFiles: []
>>  type: TypeValueValuesEnum(JOB_TYPE_BATCH, 1)>
>> INFO:root:Created job with id: [2018-06-21_09_31_50-17545183031487377678]
>> INFO:root:To access the Dataflow monitoring console, please navigate to https://console.developers.google.com/project/orielresearch-188115/dataflow/job/2018-06-21_09_31_50-17545183031487377678
>> INFO:root:Job 2018-06-21_09_31_50-17545183031487377678 is in state JOB_STATE_PENDING
>> INFO:root:2018-06-21T16:31:50.476Z: JOB_MESSAGE_DETAILED: Autoscaling is enabled for job 2018-06-21_09_31_50-17545183031487377678. The number of workers will be between 1 and 1000.
>> INFO:root:2018-06-21T16:31:50.506Z: JOB_MESSAGE_DETAILED: Autoscaling was automatically enabled for job 2018-06-21_09_31_50-17545183031487377678.
>> INFO:root:2018-06-21T16:31:53.079Z: JOB_MESSAGE_DETAILED: Checking required Cloud APIs are enabled.
>> INFO:root:2018-06-21T16:31:53.385Z: JOB_MESSAGE_DETAILED: Checking permissions granted to controller Service Account.
>> INFO:root:2018-06-21T16:31:54.161Z: JOB_MESSAGE_BASIC: Worker configuration: n1-standard-1 in us-central1-b.
>> INFO:root:2018-06-21T16:31:54.910Z: JOB_MESSAGE_DETAILED: Expanding CoGroupByKey operations into optimizable parts.
>> INFO:root:2018-06-21T16:31:54.936Z: JOB_MESSAGE_DEBUG: Combiner lifting skipped for step writing to TSV files/Write/WriteImpl/GroupByKey: GroupByKey not followed by a combiner.
>> INFO:root:2018-06-21T16:31:54.968Z: JOB_MESSAGE_DETAILED: Expanding GroupByKey operations into optimizable parts.
>> INFO:root:2018-06-21T16:31:54.992Z: JOB_MESSAGE_DETAILED: Lifting ValueCombiningMappingFns into MergeBucketsMappingFns
>> INFO:root:2018-06-21T16:31:55.056Z: JOB_MESSAGE_DEBUG: Annotating graph with Autotuner information.
>> INFO:root:2018-06-21T16:31:55.168Z: JOB_MESSAGE_DETAILED: Fusing adjacent ParDo, Read, Write, and Flatten operations
>> INFO:root:2018-06-21T16:31:55.195Z: JOB_MESSAGE_DETAILED: Fusing consumer create more columns into Extract the rows from dataframe
>> INFO:root:2018-06-21T16:31:55.221Z: JOB_MESSAGE_DETAILED: Fusing consumer writing to TSV files/Write/WriteImpl/GroupByKey/Reify into writing to TSV files/Write/WriteImpl/WindowInto(WindowIntoFn)
>> INFO:root:2018-06-21T16:31:55.244Z: JOB_MESSAGE_DETAILED: Fusing consumer writing to TSV files/Write/WriteImpl/GroupByKey/Write into writing to TSV files/Write/WriteImpl/GroupByKey/Reify
>> INFO:root:2018-06-21T16:31:55.271Z: JOB_MESSAGE_DETAILED: Fusing consumer writing to TSV files/Write/WriteImpl/WriteBundles/Do into writing to TSV files/Write/WriteImpl/GroupByKey/GroupByWindow
>> INFO:root:2018-06-21T16:31:55.303Z: JOB_MESSAGE_DETAILED: Fusing consumer writing to TSV files/Write/WriteImpl/Map(<lambda at iobase.py:895>) into create more columns
>> INFO:root:2018-06-21T16:31:55.328Z: JOB_MESSAGE_DETAILED: Fusing consumer writing to TSV files/Write/WriteImpl/WindowInto(WindowIntoFn) into writing to TSV files/Write/WriteImpl/Map(<lambda at iobase.py:895>)
>> INFO:root:2018-06-21T16:31:55.341Z: JOB_MESSAGE_DETAILED: Fusing consumer writing to TSV files/Write/WriteImpl/GroupByKey/GroupByWindow into writing to TSV files/Write/WriteImpl/GroupByKey/Read
>> INFO:root:2018-06-21T16:31:55.365Z: JOB_MESSAGE_DETAILED: Fusing consumer writing to TSV files/Write/WriteImpl/InitializeWrite into writing to TSV files/Write/WriteImpl/DoOnce/Read
>> INFO:root:2018-06-21T16:31:55.396Z: JOB_MESSAGE_DEBUG: Workflow config is missing a default resource spec.
>> INFO:root:2018-06-21T16:31:55.432Z: JOB_MESSAGE_DEBUG: Adding StepResource setup and teardown to workflow graph.
>> INFO:root:2018-06-21T16:31:55.461Z: JOB_MESSAGE_DEBUG: Adding workflow start and stop steps.
>> INFO:root:2018-06-21T16:31:55.486Z: JOB_MESSAGE_DEBUG: Assigning stage ids.
>> INFO:root:2018-06-21T16:31:55.641Z: JOB_MESSAGE_DEBUG: Executing wait step start15
>> INFO:root:Job 2018-06-21_09_31_50-17545183031487377678 is in state JOB_STATE_RUNNING
>> INFO:root:2018-06-21T16:31:55.701Z: JOB_MESSAGE_BASIC: Executing operation writing to TSV files/Write/WriteImpl/DoOnce/Read+writing to TSV files/Write/WriteImpl/InitializeWrite
>> INFO:root:2018-06-21T16:31:55.727Z: JOB_MESSAGE_BASIC: Executing operation writing to TSV files/Write/WriteImpl/GroupByKey/Create
>> INFO:root:2018-06-21T16:31:55.739Z: JOB_MESSAGE_DEBUG: Starting worker pool setup.
>> INFO:root:2018-06-21T16:31:55.753Z: JOB_MESSAGE_BASIC: Starting 1 workers in us-central1-b...
>> INFO:root:2018-06-21T16:31:55.839Z: JOB_MESSAGE_DEBUG: Value "writing to TSV files/Write/WriteImpl/GroupByKey/Session" materialized.
>> INFO:root:2018-06-21T16:31:55.901Z: JOB_MESSAGE_BASIC: Executing operation Extract the rows from dataframe+create more columns+writing to TSV files/Write/WriteImpl/Map(<lambda at iobase.py:895>)+writing to TSV files/Write/WriteImpl/WindowInto(WindowIntoFn)+writing to TSV files/Write/WriteImpl/GroupByKey/Reify+writing to TSV files/Write/WriteImpl/GroupByKey/Write
>> INFO:root:2018-06-21T16:31:56.332Z: JOB_MESSAGE_BASIC: BigQuery export job "dataflow_job_576766793008965363" started. You can check its status with the bq tool: "bq show -j --project_id=orielresearch-188115 dataflow_job_576766793008965363".
>> INFO:root:2018-06-21T16:32:03.683Z: JOB_MESSAGE_DETAILED: Autoscaling: Raised the number of workers to 0 based on the rate of progress in the currently running step(s).
>> INFO:root:2018-06-21T16:32:14.181Z: JOB_MESSAGE_DETAILED: Autoscaling: Raised the number of workers to 1 based on the rate of progress in the currently running step(s).
>> INFO:root:2018-06-21T16:32:26.827Z: JOB_MESSAGE_DETAILED: BigQuery export job progress: "dataflow_job_576766793008965363" observed total of 1 exported files thus far.
>> INFO:root:2018-06-21T16:32:26.850Z: JOB_MESSAGE_BASIC: BigQuery export job finished: "dataflow_job_576766793008965363"
>> INFO:root:2018-06-21T16:32:33.078Z: JOB_MESSAGE_DETAILED: Workers have started successfully.
>> INFO:root:2018-06-21T16:35:35.511Z: JOB_MESSAGE_ERROR: Traceback (most recent call last):
>>   File "/usr/local/lib/python2.7/dist-packages/dataflow_worker/batchworker.py", line 581, in do_work
>>     work_executor.execute()
>>   File "/usr/local/lib/python2.7/dist-packages/dataflow_worker/executor.py", line 166, in execute
>>     op.start()
>>   File "dataflow_worker/operations.py", line 283, in dataflow_worker.operations.DoOperation.start (dataflow_worker/operations.c:10680)
>>     def start(self):
>>   File "dataflow_worker/operations.py", line 284, in dataflow_worker.operations.DoOperation.start (dataflow_worker/operations.c:10574)
>>     with self.scoped_start_state:
>>   File "dataflow_worker/operations.py", line 289, in dataflow_worker.operations.DoOperation.start (dataflow_worker/operations.c:9775)
>>     pickler.loads(self.spec.serialized_fn))
>>   File "/usr/local/lib/python2.7/dist-packages/apache_beam/internal/pickler.py", line 225, in loads
>>     return dill.loads(s)
>>   File "/usr/local/lib/python2.7/dist-packages/dill/dill.py", line 277, in loads
>>     return load(file)
>>   File "/usr/local/lib/python2.7/dist-packages/dill/dill.py", line 266, in load
>>     obj = pik.load()
>>   File "/usr/lib/python2.7/pickle.py", line 858, in load
>>     dispatch[key](self)
>>   File "/usr/lib/python2.7/pickle.py", line 1090, in load_global
>>     klass = self.find_class(module, name)
>>   File "/usr/local/lib/python2.7/dist-packages/dill/dill.py", line 423, in find_class
>>     return StockUnpickler.find_class(self, module, name)
>>   File "/usr/lib/python2.7/pickle.py", line 1124, in find_class
>>     __import__(module)
>> ImportError: No module named indexes.base
>>
>> INFO:root:2018-06-21T16:35:38.897Z: JOB_MESSAGE_ERROR: [identical traceback, ending in: ImportError: No module named indexes.base]
>>
>> INFO:root:2018-06-21T16:35:42.245Z: JOB_MESSAGE_ERROR: [identical traceback, ending in: ImportError: No module named indexes.base]
>>
>> INFO:root:2018-06-21T16:35:45.619Z: JOB_MESSAGE_ERROR: [identical traceback, ending in: ImportError: No module named indexes.base]
>>
>> INFO:root:2018-06-21T16:35:45.668Z: JOB_MESSAGE_DEBUG: Executing failure step failure14
>> INFO:root:2018-06-21T16:35:45.695Z: JOB_MESSAGE_ERROR: Workflow failed. Causes: S04:Extract the rows from dataframe+create more columns+writing to TSV files/Write/WriteImpl/Map(<lambda at iobase.py:895>)+writing to TSV files/Write/WriteImpl/WindowInto(WindowIntoFn)+writing to TSV files/Write/WriteImpl/GroupByKey/Reify+writing to TSV files/Write/WriteImpl/GroupByKey/Write failed., A work item was attempted 4 times without success. Each time the worker eventually lost contact with the service. The work item was attempted on:
>>   label-archs4-tsv-06210931-a4r1-harness-rlqz,
>>   label-archs4-tsv-06210931-a4r1-harness-rlqz,
>>   label-archs4-tsv-06210931-a4r1-harness-rlqz,
>>   label-archs4-tsv-06210931-a4r1-harness-rlqz
>> INFO:root:2018-06-21T16:35:45.799Z: JOB_MESSAGE_DETAILED: Cleaning up.
>> INFO:root:2018-06-21T16:35:46Z: JOB_MESSAGE_DEBUG: Starting worker pool teardown.
>> INFO:root:2018-06-21T16:35:46.027Z: JOB_MESSAGE_BASIC: Stopping worker pool...
>>
>>
>>
>> On Wed, Jun 20, 2018 at 5:02 PM, OrielResearch Eila Arich-Landkof <
>> eila@orielresearch.org> wrote:
>>
>>> Hello,
>>>
>>> I am running the following pipeline on the local runner with no issues.
>>>
>>> logging.info('Define the pipeline')
>>> p = beam.Pipeline(options=options)
>>> samplePath = outputPath
>>> ExploreData = (p | "Extract the rows from dataframe" >> beam.io.Read(beam.io.BigQuerySource('archs4.Debug_annotation'))
>>>                  | "create more columns" >> beam.ParDo(CreateColForSampleFn(colListSubset, outputPath)))
>>> (ExploreData | 'writing to TSV files' >> beam.io.WriteToText('gs://archs4/output/dataExploration.txt', file_name_suffix='.tsv', num_shards=1, append_trailing_newlines=True, header=colListStrHeader))
>>>
>>>
>>> Running on Dataflow fires the error below. I don't have any idea where to
>>> look for the issue: the error does not point to my pipeline code but to
>>> apache_beam modules.
>>> I will try debugging by elimination. Please let me know if you have any
>>> direction for me.
>>>
>>> Many thanks,
>>> Eila
>>>
>>>
>>> ======================================================
>>>
>>> DataflowRuntimeExceptionTraceback (most recent call last)
>>> <ipython-input-151-1e5aeb8b7d9b> in <module>()
>>> ----> 1 p.run().wait_until_finish()
>>>
>>> /usr/local/envs/py2env/lib/python2.7/site-packages/apache_beam/runners/dataflow/dataflow_runner.pyc in wait_until_finish(self, duration)
>>>     776         raise DataflowRuntimeException(
>>>     777             'Dataflow pipeline failed. State: %s, Error:\n%s' %
>>> --> 778             (self.state, getattr(self._runner, 'last_error_msg', None)), self)
>>>     779     return self.state
>>>     780
>>> DataflowRuntimeException: Dataflow pipeline failed. State: FAILED, Error:
>>> Traceback (most recent call last):
>>>   File "/usr/local/lib/python2.7/dist-packages/dataflow_worker/batchworker.py",
line 581, in do_work
>>>     work_executor.execute()
>>>   File "/usr/local/lib/python2.7/dist-packages/dataflow_worker/executor.py",
line 166, in execute
>>>     op.start()
>>>   File "dataflow_worker/operations.py", line 283, in dataflow_worker.operations.DoOperation.start
(dataflow_worker/operations.c:10680)
>>>     def start(self):
>>>   File "dataflow_worker/operations.py", line 284, in dataflow_worker.operations.DoOperation.start
(dataflow_worker/operations.c:10574)
>>>     with self.scoped_start_state:
>>>   File "dataflow_worker/operations.py", line 289, in dataflow_worker.operations.DoOperation.start
(dataflow_worker/operations.c:9775)
>>>     pickler.loads(self.spec.serialized_fn))
>>>   File "/usr/local/lib/python2.7/dist-packages/apache_beam/internal/pickler.py",
line 225, in loads
>>>     return dill.loads(s)
>>>   File "/usr/local/lib/python2.7/dist-packages/dill/dill.py", line 277, in loads
>>>     return load(file)
>>>   File "/usr/local/lib/python2.7/dist-packages/dill/dill.py", line 266, in load
>>>     obj = pik.load()
>>>   File "/usr/lib/python2.7/pickle.py", line 858, in load
>>>     dispatch[key](self)
>>>   File "/usr/lib/python2.7/pickle.py", line 1090, in load_global
>>>     klass = self.find_class(module, name)
>>>   File "/usr/local/lib/python2.7/dist-packages/dill/dill.py", line 423, in find_class
>>>     return StockUnpickler.find_class(self, module, name)
>>>   File "/usr/lib/python2.7/pickle.py", line 1124, in find_class
>>>     __import__(module)
>>> ImportError: No module named indexes.base
>>>
>>> ======================================================
>>>
>>> --
>>> Eila
>>> www.orielresearch.org
>>> https://www.meetup.com/Deep-Learning-In-Production/
>>>
>>>
>>>
>>
>>
>> --
>> Eila
>> www.orielresearch.org
>> https://www.meetup.com/Deep-Learning-In-Production/
>>
>>
>>
>


-- 
Eila
www.orielresearch.org
https://www.meetup.com/Deep-Learning-In-Production/
