beam-commits mailing list archives

From "Dmitry Demeshchuk (JIRA)" <>
Subject [jira] [Created] (BEAM-2573) Better filesystem discovery mechanism in Python SDK
Date Sat, 08 Jul 2017 01:20:02 GMT
Dmitry Demeshchuk created BEAM-2573:

             Summary: Better filesystem discovery mechanism in Python SDK
                 Key: BEAM-2573
             Project: Beam
          Issue Type: Task
          Components: runner-dataflow, sdk-py
            Reporter: Dmitry Demeshchuk
            Assignee: Thomas Groh
            Priority: Minor

It looks like right now custom filesystem classes have to be imported explicitly:

It seems the current implementation cannot discover filesystems that come from side packages
rather than from apache_beam itself. Even if I put a custom FileSystem subclass into a package
and explicitly import it in the root of that package, the class still isn't discoverable.
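
To illustrate the kind of failure I suspect, here is a minimal sketch using toy classes, not the
actual apache_beam code. It assumes that lookup scans the subclasses of a base class at call time
and matches on the path scheme; that is my guess at the mechanism, not a confirmed description of
the SDK. Under that assumption, a subclass is only visible in a process where its defining module
has already been imported, which would explain a difference between the local process and a
remote worker.

```python
class FileSystem(object):
    """Toy stand-in for a filesystem base class (NOT the apache_beam class)."""

    @classmethod
    def scheme(cls):
        raise NotImplementedError

    @classmethod
    def get_all_subclasses(cls):
        # Walk the subclass tree. Only classes whose defining module has been
        # imported in *this* process can show up here.
        found = []
        for sub in cls.__subclasses__():
            found.append(sub)
            found.extend(sub.get_all_subclasses())
        return found


def get_filesystem(path):
    """Hypothetical lookup: pick the subclass whose scheme matches the path."""
    scheme, sep, _ = path.partition('://')
    matches = [fs for fs in FileSystem.get_all_subclasses()
               if sep and fs.scheme() == scheme]
    if not matches:
        raise ValueError('Unable to get the Filesystem for path %s' % path)
    return matches[0]()


PATH = 's3://my-test-bucket/test_output'

# Before the subclass exists in this process, the lookup fails.
try:
    get_filesystem(PATH)
    found_before = True
except ValueError:
    found_before = False


# Defining the class plays the role of "importing the side package".
class S3FileSystem(FileSystem):
    @classmethod
    def scheme(cls):
        return 's3'


found_after = isinstance(get_filesystem(PATH), S3FileSystem)
```

In this toy model the first lookup fails and the second succeeds, so the question becomes whether
the module that defines the custom class is ever imported on the worker before the lookup runs.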

The problems I'm experiencing happen on Dataflow runner, while Direct runner works just fine.
Here's an example of Dataflow output:

  (320418708fe777d7): Traceback (most recent call last):
  File "/usr/local/lib/python2.7/dist-packages/dataflow_worker/", line 581,
in do_work
  File "/usr/local/lib/python2.7/dist-packages/dataflow_worker/", line 166, in
  File "/usr/local/lib/python2.7/dist-packages/dataflow_worker/", line
54, in start
  File "dataflow_worker/", line 138, in dataflow_worker.operations.Operation.output
    def output(self, windowed_value, output_index=0):
  File "dataflow_worker/", line 139, in dataflow_worker.operations.Operation.output
    cython.cast(Receiver, self.receivers[output_index]).receive(windowed_value)
  File "dataflow_worker/", line 72, in dataflow_worker.operations.ConsumerSet.receive
    cython.cast(Operation, consumer).process(windowed_value)
  File "dataflow_worker/", line 328, in dataflow_worker.operations.DoOperation.process
    with self.scoped_process_state:
  File "dataflow_worker/", line 329, in dataflow_worker.operations.DoOperation.process
  File "apache_beam/runners/", line 382, in apache_beam.runners.common.DoFnRunner.receive
  File "apache_beam/runners/", line 390, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/", line 431, in apache_beam.runners.common.DoFnRunner._reraise_augmented
    raise new_exn, None, original_traceback
  File "apache_beam/runners/", line 388, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/", line 281, in apache_beam.runners.common.PerWindowInvoker.invoke_process
  File "apache_beam/runners/", line 307, in apache_beam.runners.common.PerWindowInvoker._invoke_per_window
    windowed_value, self.process_method(*args_for_process))
  File "/Users/dmitrydemeshchuk/miniconda2/lib/python2.7/site-packages/apache_beam/transforms/",
line 749, in <lambda>
  File "/Users/dmitrydemeshchuk/miniconda2/lib/python2.7/site-packages/apache_beam/io/",
line 891, in <lambda>
  File "/usr/local/lib/python2.7/dist-packages/apache_beam/options/", line
109, in _f
    return fnc(self, *args, **kwargs)
  File "/usr/local/lib/python2.7/dist-packages/apache_beam/io/", line 146,
in initialize_write
    tmp_dir = self._create_temp_dir(file_path_prefix)
  File "/usr/local/lib/python2.7/dist-packages/apache_beam/io/", line 151,
in _create_temp_dir
    base_path, last_component = FileSystems.split(file_path_prefix)
  File "/usr/local/lib/python2.7/dist-packages/apache_beam/io/", line 99, in
    filesystem = FileSystems.get_filesystem(path)
  File "/usr/local/lib/python2.7/dist-packages/apache_beam/io/", line 61, in
    raise ValueError('Unable to get the Filesystem for path %s' % path)
ValueError: Unable to get the Filesystem for path s3://my-test-bucket/test_output [while running

I apologize for not providing the full context or codebase: a lot of the code we are running
is internal, and some of it is tightly coupled to our infrastructure. If I run out of things to
experiment with, I'll try to narrow my use case down to the simplest possible case (for example,
overriding the gcs filesystem with a different path prefix).

I think there are several possibilities here:

1. I'm doing something wrong, and it should be trivial to achieve something like that. This
probably implies figuring out the right approach and writing some guideline for the sources/sinks
page in the docs.

2. The current order of imports is not optimal, and we could possibly import the side packages
before initializing the filesystem classes. I currently know too little about the way things
get executed in Dataflow, so it's hard for me to tell whether it's worth diving down that
rabbit hole.

3. There just needs to be a better way of referring to additional filesystem classes. One
way of doing that is to specify a class name explicitly in the ReadFromText and WriteToText
functions (or something along these lines). PipelineOptions seems like overkill for this,
but may still be an option. Finally, maybe there could be a function, called from the main
script, that tells Beam to discover a specific class?
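
The last idea in option 3 could look roughly like the sketch below. Everything in it is
hypothetical: register_filesystem, the _REGISTRY dict and this S3FileSystem class are names made
up for illustration and are not part of the Beam API. Note that such a call would presumably have
to run on the workers as well, not only in the main script, which is the part I don't know how to
guarantee on Dataflow.

```python
# Hypothetical explicit registry: path scheme -> filesystem class.
_REGISTRY = {}


def register_filesystem(scheme, cls):
    """Explicitly tell the lookup which class handles a given scheme."""
    if scheme in _REGISTRY and _REGISTRY[scheme] is not cls:
        raise ValueError('Scheme %r is already registered' % scheme)
    _REGISTRY[scheme] = cls
    return cls


def get_filesystem(path):
    """Look the scheme up in the registry instead of scanning subclasses."""
    scheme, sep, _ = path.partition('://')
    if not sep or scheme not in _REGISTRY:
        raise ValueError('Unable to get the Filesystem for path %s' % path)
    return _REGISTRY[scheme]()


class S3FileSystem(object):
    """Toy filesystem that only knows how to split a path."""

    def split(self, path):
        # Split on the last '/' into (base path, last component).
        head, _, tail = path.rpartition('/')
        return head, tail


# What the main script (and, presumably, each worker) would call once.
register_filesystem('s3', S3FileSystem)

fs = get_filesystem('s3://my-test-bucket/test_output')
base_path, last_component = fs.split('s3://my-test-bucket/test_output')
```

Compared with implicit discovery, the registration here is visible in the code and fails loudly
when two classes claim the same scheme.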

This message was sent by Atlassian JIRA
