beam-commits mailing list archives

From Guillermo Rodríguez Cano (JIRA) <j...@apache.org>
Subject [jira] [Comment Edited] (BEAM-2490) ReadFromText function is not taking all data with glob operator (*)
Date Sat, 24 Jun 2017 01:10:00 GMT

    [ https://issues.apache.org/jira/browse/BEAM-2490?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16061713#comment-16061713 ]

Guillermo Rodríguez Cano edited comment on BEAM-2490 at 6/24/17 1:09 AM:
-------------------------------------------------------------------------

Hello again,

As I commented before, and after fixing the shell's expansion, I think I am having a similar
issue with both the Dataflow and Direct runners. I am not sure whether the cause is the glob
operator or its combination with gzip compression.
This time I simplified my pipeline to emulate a simple grep over some JSON files:

{code:none}
with beam.Pipeline(options=pipeline_options) as p:
    raw_events = p | 'Read input' >> ReadFromText(known_args.input)

    events = raw_events | 'Generate events' >> beam.ParDo(ExtractEventsFn())

    filtered_events = (events
                       | 'Filter for a specific user' >> beam.Filter(lambda e: e['user'] == '123')
                       | 'Filter for a specific video' >> beam.Filter(lambda e: e['video'] == '456')
                      )

    output = (filtered_events
              | 'Format output events' >> beam.Map(
                  lambda e: '%s @ %s (%s - %s - %s)' % (
                      datetime.fromtimestamp(e['timestamp']/1000).isoformat(),
                      e['type'], e['user'], e['video'], e['device']))
              | 'Write results' >> WriteToText(known_args.output)
             )
{code}

When I run the pipeline on the decompressed input files, with either the Direct or the Dataflow
runner, I obtain the expected result (verified by parsing the input files on the command line
with grep). When I run it on the gzip-compressed files, I obtain, with both runners, only a
small subset (less than 2%) of the expected result.
With the Dataflow runner I was reading from Google Cloud Storage, while with the Direct runner
I was reading from my local hard disk. For the Direct runner I had to reduce the input from 48
files to just the 8 files that I know produce the result, because the Python process used a
lot of swap memory on my laptop in the uncompressed scenario.
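
For reference, the grep-style comparison can also be reproduced outside Beam. The following is
only a minimal sketch under some assumptions: one JSON object per line, the same 'user' and
'video' fields as in the pipeline above, and local files. The helper name
count_matching_events is hypothetical and not part of Beam.

```python
import glob
import gzip
import json


def count_matching_events(pattern, user, video):
    """Count lines and matching JSON events across gzip files, without Beam.

    Hypothetical helper for cross-checking a pipeline's output; it assumes
    one JSON object per line with 'user' and 'video' fields.
    """
    total_lines = 0
    matches = 0
    for path in sorted(glob.glob(pattern)):
        # Python's gzip module reads every member of a concatenated gzip file.
        with gzip.open(path, 'rt', encoding='utf-8') as handle:
            for line in handle:
                total_lines += 1
                try:
                    event = json.loads(line)
                except ValueError:
                    # Skip lines that are not valid JSON.
                    continue
                if not isinstance(event, dict):
                    continue
                if event.get('user') == user and event.get('video') == video:
                    matches += 1
    return total_lines, matches
```

Comparing the total line count against the number of elements the read step produces should
show whether records are lost while reading or only later in the pipeline.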

The similarities with [~oliviernguyenquoc]'s case are that the compressed files I am using are
around the same size (about 200 MB each, 1.5-2 GB decompressed) and are text files (JSON in my
case).

Interestingly enough, I also tried loading each file as a separate PCollection directly in the
source code and then merging them with a Flatten transform. I got similarly unsuccessful results
with the Direct runner (I did not try the Dataflow runner). I say similar because the output
was slightly different from the output I get when using the glob operator on the directory
containing the files.
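
The per-file variant looks roughly like the sketch below. It is an illustration rather than the
exact code I ran: the helper names expand_inputs and read_each_file are hypothetical, the glob
is expanded with Python's glob module (so it only works for local paths, as with the Direct
runner), and passing GZIP explicitly as the compression type is an assumption.

```python
import glob


def expand_inputs(pattern):
    """Expand the glob in Python instead of leaving it to the shell or to Beam."""
    return sorted(glob.glob(pattern))


def read_each_file(pipeline, paths):
    """Read every path as its own PCollection and merge them with Flatten."""
    # Imported here so that expand_inputs can be used without Beam installed.
    import apache_beam as beam
    from apache_beam.io import ReadFromText
    from apache_beam.io.filesystem import CompressionTypes

    per_file = tuple(
        # Each step needs a unique label, hence the index in the name.
        pipeline | 'Read file %d' % index >> ReadFromText(
            path, compression_type=CompressionTypes.GZIP)
        for index, path in enumerate(paths)
    )
    return per_file | 'Merge files' >> beam.Flatten()
```

Inside the pipeline this would replace the 'Read input' step, for example
raw_events = read_each_file(p, expand_inputs(known_args.input)).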

It feels as if Apache Beam is sampling the files when they are gzip-compressed.


> ReadFromText function is not taking all data with glob operator (*) 
> --------------------------------------------------------------------
>
>                 Key: BEAM-2490
>                 URL: https://issues.apache.org/jira/browse/BEAM-2490
>             Project: Beam
>          Issue Type: Bug
>          Components: sdk-py
>    Affects Versions: 2.0.0
>         Environment: Usage with Google Cloud Platform: Dataflow runner
>            Reporter: Olivier NGUYEN QUOC
>            Assignee: Chamikara Jayalath
>             Fix For: Not applicable
>
>
> I run a very simple pipeline:
> * Read my files from Google Cloud Storage
> * Split with '\n' char
> * Write it to Google Cloud Storage
> I have 8 files that match with the pattern:
> * my_files_2016090116_20160902_060051_xxxxxxxxxx.csv.gz (229.25 MB)
> * my_files_2016090117_20160902_060051_xxxxxxxxxx.csv.gz (184.1 MB)
> * my_files_2016090118_20160902_060051_xxxxxxxxxx.csv.gz (171.73 MB)
> * my_files_2016090119_20160902_060051_xxxxxxxxxx.csv.gz (151.34 MB)
> * my_files_2016090120_20160902_060051_xxxxxxxxxx.csv.gz (129.69 MB)
> * my_files_2016090121_20160902_060051_xxxxxxxxxx.csv.gz (151.7 MB)
> * my_files_2016090122_20160902_060051_xxxxxxxxxx.csv.gz (346.46 MB)
> * my_files_2016090122_20160902_060051_xxxxxxxxxx.csv.gz (222.57 MB)
> This code should take them all:
> {code:python}
> beam.io.ReadFromText(
>       "gs://XXXX_folder1/my_files_20160901*.csv.gz",
>       skip_header_lines=1,
>       compression_type=beam.io.filesystem.CompressionTypes.GZIP
>       )
> {code}
> It runs without errors, but the output of this pipeline is only a 288.62 MB file (instead of
> a 1.5 GB file).
> The whole pipeline code:
> {code:python}
> data = (p
>         | 'ReadMyFiles' >> beam.io.ReadFromText(
>             "gs://XXXX_folder1/my_files_20160901*.csv.gz",
>             skip_header_lines=1,
>             compression_type=beam.io.filesystem.CompressionTypes.GZIP)
>         | 'SplitLines' >> beam.FlatMap(lambda x: x.split('\n')))
> output = data | "Write" >> beam.io.WriteToText('gs://XXX_folder2/test.csv', num_shards=1)
> {code}
> Dataflow indicates that the estimated size of the output after the ReadFromText step is only
> 602.29 MB, which does not correspond to the size of any single input file nor to the total
> size of the files matching the pattern.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
