beam-commits mailing list archives

From "Sourabh Bajaj (JIRA)" <>
Subject [jira] [Commented] (BEAM-2573) Better filesystem discovery mechanism in Python SDK
Date Mon, 10 Jul 2017 19:53:00 GMT


Sourabh Bajaj commented on BEAM-2573:

I think the plugins are working correctly if they pass a list of class names to be
imported at startup. You might need to wait for the next release, as this required a change
to the Dataflow workers: they now need to import the paths specified in the beam-plugins
list. A release is in progress right now, so that might happen in the next few days.
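For context, the mechanism described above amounts to importing each fully qualified class name before the pipeline starts, so that import-time side effects (such as FileSystem subclass registration) take place on the worker. A minimal sketch of that idea; the helper name and the stdlib stand-in class are illustrative, not Beam's actual implementation:

```python
import importlib

def import_plugin_classes(class_paths):
    """Import each fully qualified class name; the import itself triggers
    any side effects, such as FileSystem subclass registration."""
    classes = []
    for path in class_paths:
        module_name, _, class_name = path.rpartition('.')
        module = importlib.import_module(module_name)
        classes.append(getattr(module, class_name))
    return classes

# Stdlib class standing in for a plugin-provided FileSystem subclass:
plugins = import_plugin_classes(['collections.OrderedDict'])
```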

I am not sure about the crash loop in windmillio; [~charleschen] might know more.

> Better filesystem discovery mechanism in Python SDK
> ---------------------------------------------------
>                 Key: BEAM-2573
>                 URL:
>             Project: Beam
>          Issue Type: Task
>          Components: runner-dataflow, sdk-py
>    Affects Versions: 2.0.0
>            Reporter: Dmitry Demeshchuk
>            Priority: Minor
> It looks like custom filesystem classes currently have to be imported explicitly.
> It seems the current implementation doesn't allow discovering filesystems that come
from side packages rather than from apache_beam itself. Even if I put a custom
FileSystem-inheriting class into a package and explicitly import it in the root of that
package, the class still isn't discoverable.
> The problems I'm experiencing happen on the Dataflow runner, while the Direct runner works
just fine. Here's an example of Dataflow output:
> {code}
>   (320418708fe777d7): Traceback (most recent call last):
>   File "/usr/local/lib/python2.7/dist-packages/dataflow_worker/", line 581, in do_work
>     work_executor.execute()
>   File "/usr/local/lib/python2.7/dist-packages/dataflow_worker/", line 166, in execute
>     op.start()
>   File "/usr/local/lib/python2.7/dist-packages/dataflow_worker/", line 54, in start
>     self.output(windowed_value)
>   File "dataflow_worker/", line 138, in dataflow_worker.operations.Operation.output
>     def output(self, windowed_value, output_index=0):
>   File "dataflow_worker/", line 139, in dataflow_worker.operations.Operation.output
>     cython.cast(Receiver, self.receivers[output_index]).receive(windowed_value)
>   File "dataflow_worker/", line 72, in dataflow_worker.operations.ConsumerSet.receive
>     cython.cast(Operation, consumer).process(windowed_value)
>   File "dataflow_worker/", line 328, in dataflow_worker.operations.DoOperation.process
>     with self.scoped_process_state:
>   File "dataflow_worker/", line 329, in dataflow_worker.operations.DoOperation.process
>     self.dofn_receiver.receive(o)
>   File "apache_beam/runners/", line 382, in apache_beam.runners.common.DoFnRunner.receive
>     self.process(windowed_value)
>   File "apache_beam/runners/", line 390, in apache_beam.runners.common.DoFnRunner.process
>     self._reraise_augmented(exn)
>   File "apache_beam/runners/", line 431, in apache_beam.runners.common.DoFnRunner._reraise_augmented
>     raise new_exn, None, original_traceback
>   File "apache_beam/runners/", line 388, in apache_beam.runners.common.DoFnRunner.process
>     self.do_fn_invoker.invoke_process(windowed_value)
>   File "apache_beam/runners/", line 281, in apache_beam.runners.common.PerWindowInvoker.invoke_process
>     self._invoke_per_window(windowed_value)
>   File "apache_beam/runners/", line 307, in apache_beam.runners.common.PerWindowInvoker._invoke_per_window
>     windowed_value, self.process_method(*args_for_process))
>   File "/Users/dmitrydemeshchuk/miniconda2/lib/python2.7/site-packages/apache_beam/transforms/", line 749, in <lambda>
>   File "/Users/dmitrydemeshchuk/miniconda2/lib/python2.7/site-packages/apache_beam/io/", line 891, in <lambda>
>   File "/usr/local/lib/python2.7/dist-packages/apache_beam/options/", line 109, in _f
>     return fnc(self, *args, **kwargs)
>   File "/usr/local/lib/python2.7/dist-packages/apache_beam/io/", line 146, in initialize_write
>     tmp_dir = self._create_temp_dir(file_path_prefix)
>   File "/usr/local/lib/python2.7/dist-packages/apache_beam/io/", line 151, in _create_temp_dir
>     base_path, last_component = FileSystems.split(file_path_prefix)
>   File "/usr/local/lib/python2.7/dist-packages/apache_beam/io/", line 99, in split
>     filesystem = FileSystems.get_filesystem(path)
>   File "/usr/local/lib/python2.7/dist-packages/apache_beam/io/", line 61, in get_filesystem
>     raise ValueError('Unable to get the Filesystem for path %s' % path)
> ValueError: Unable to get the Filesystem for path s3://my-test-bucket/test_output [while running 'write/Write/WriteImpl/InitializeWrite']
> {code}
> I apologize for not providing full context or a codebase: a lot of the code we are
running is internal, and some of it is tightly coupled to our infrastructure. If I run
out of options for experimenting, I'll try to narrow my use case down to the simplest
possible case (say, overriding the GCS filesystem with a different path prefix).
> I think there are several possibilities here:
> 1. I'm doing something wrong, and it should be trivial to achieve something like this.
This probably implies figuring out the right approach and writing some guidelines for the
sources/sinks page in the docs.
> 2. The current order of imports is not optimal, and we could possibly import the side
packages before initializing the filesystem classes. I currently know too little about
the way things get executed in Dataflow, so it's hard for me to tell how far it's worth
diving down that rabbit hole.
> 3. There just needs to be a better way of referring to additional filesystem classes.
One way would be to specify a class name explicitly in the ReadFromText and WriteToText
functions (or something along these lines). PipelineOptions seems like overkill for this,
but may still be an option. Finally, maybe there could just be a function, called in the
main script, that somehow tells Beam to discover a specific class?
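Option 3 above could be sketched roughly as follows. Note that `register_filesystem`, `get_filesystem`, and `S3FileSystem` are hypothetical illustration names, not Beam's actual API; the sketch only shows the scheme-to-class registry idea, with the same error message the traceback ends in:

```python
# Hypothetical sketch of option 3: an explicit registry that the main
# script can populate, instead of relying on import-time discovery.
_FILESYSTEM_REGISTRY = {}

def register_filesystem(scheme, fs_class):
    """Map a URI scheme (e.g. 's3') to a FileSystem implementation."""
    _FILESYSTEM_REGISTRY[scheme] = fs_class

def get_filesystem(path):
    """Look up the FileSystem class registered for the path's scheme."""
    scheme = path.split('://', 1)[0] if '://' in path else 'file'
    try:
        return _FILESYSTEM_REGISTRY[scheme]
    except KeyError:
        raise ValueError('Unable to get the Filesystem for path %s' % path)

class S3FileSystem:  # stand-in for a real S3-backed implementation
    pass

register_filesystem('s3', S3FileSystem)
```

With such a registry, the user's main script would call `register_filesystem` once, and worker-side lookups would no longer depend on which modules happened to be imported.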

This message was sent by Atlassian JIRA
