beam-commits mailing list archives

From Guillermo Rodríguez Cano (JIRA) <j...@apache.org>
Subject [jira] [Comment Edited] (BEAM-2490) ReadFromText function is not taking all data with glob operator (*)
Date Sat, 24 Jun 2017 01:14:00 GMT

    [ https://issues.apache.org/jira/browse/BEAM-2490?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16061713#comment-16061713 ]

Guillermo Rodríguez Cano edited comment on BEAM-2490 at 6/24/17 1:13 AM:
-------------------------------------------------------------------------

  Hello again,

as I commented before, after fixing the shell's expansion I think I am still running into a similar issue, with both the Dataflow and Direct runners. I am not sure whether the cause is the glob operator itself or its combination with gzip compression.
This time I simplified my pipeline to emulate a simple grep of some JSON files:

{code:python}
with beam.Pipeline(options=pipeline_options) as p:
    raw_events = p | 'Read input' >> ReadFromText(known_args.input)

    events = raw_events | 'Generate events' >> beam.ParDo(ExtractEventsFn())

    filtered_events = (events
                       | 'Filter for a specific user' >> beam.Filter(lambda e: e['user'] == '123')
                       | 'Filter for a specific video' >> beam.Filter(lambda e: e['video'] == '456')
                      )

    output = (filtered_events
              | 'Format output events' >> beam.Map(lambda e: '%s @ %s (%s - %s - %s)' % (
                  datetime.fromtimestamp(e['timestamp']/1000).isoformat(),
                  e['type'], e['user'], e['video'], e['device']))
              | 'Write results' >> WriteToText(known_args.output)
             )
{code}
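
(For context, known_args and pipeline_options come from the usual argument parsing; the sketch below shows one way that setup could look, where the --input/--output flag names are assumptions of mine rather than part of the pipeline above.)

{code:python}
import argparse

from apache_beam.options.pipeline_options import PipelineOptions

# Hypothetical argument parsing for the pipeline above; the --input/--output
# flag names are illustrative assumptions, not taken from the original code.
parser = argparse.ArgumentParser()
parser.add_argument('--input', required=True,
                    help='Input file pattern, e.g. a glob over .json.gz files')
parser.add_argument('--output', required=True,
                    help='Output path prefix for WriteToText')
known_args, pipeline_args = parser.parse_known_args()
pipeline_options = PipelineOptions(pipeline_args)
{code}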

When I run the pipeline with the input files decompressed, I obtain the expected result with both the Direct and Dataflow runners (verified against parsing the input files on the command line with grep). When I run it with the files gzip-compressed, both runners produce only a tiny subset (less than 2%) of the expected result.
With the Dataflow runner the files were on Google Cloud Storage, while with the Direct runner they were on my local hard disk. I ran all tests with the 8 files, out of 48, that I know the result comes from, because of the high swap memory used by the Python process on my laptop in the uncompressed scenario.

The similarities with [~oliviernguyenquoc]'s case are that the compressed files I am using are around the same size (about 200 MB each, 1.5-2 GB decompressed) and are also text files (JSON in my case).

Interestingly enough, I also tried loading each compressed file as a separate PCollection directly in the source code and then merging them with a Flatten transform. I got similarly unsuccessful results with the Direct runner (I did not try the Dataflow runner); similar rather than identical, because the output was slightly different from what I get when using the glob operator on the directory containing the files.
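
(For reference, a minimal sketch of that per-file read plus Flatten approach, using the same imports as the pipeline above and placeholder paths rather than the real inputs:)

{code:python}
# Sketch of reading each compressed file as its own PCollection and merging
# them with Flatten; the paths below are placeholders for illustration only.
files = ['gs://my-bucket/events_00.json.gz', 'gs://my-bucket/events_01.json.gz']

with beam.Pipeline(options=pipeline_options) as p:
    per_file = tuple(
        p | 'Read %s' % path >> ReadFromText(
            path, compression_type=beam.io.filesystem.CompressionTypes.GZIP)
        for path in files)
    raw_events = per_file | 'Merge files' >> beam.Flatten()
    # ... rest of the pipeline (ParDo, Filters, Map, WriteToText) unchanged
{code}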

It feels as if Apache Beam is sampling the files when they are gzip-compressed, regardless of the runner used.
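
To quantify the gap, one could count the elements coming out of the read step for the compressed and the decompressed inputs and compare the two totals. A minimal sketch using the same imports as the pipeline above (the pattern and output path are placeholders):

{code:python}
# Count how many lines ReadFromText actually yields for a given pattern; run
# once against the .gz files and once against decompressed copies to compare.
# The input pattern and output path below are placeholders.
with beam.Pipeline(options=pipeline_options) as p:
    _ = (p
         | 'Read input' >> ReadFromText('gs://my-bucket/events_*.json.gz')
         | 'Count lines' >> beam.combiners.Count.Globally()
         | 'Write count' >> WriteToText('gs://my-bucket/line_count'))
{code}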


> ReadFromText function is not taking all data with glob operator (*) 
> --------------------------------------------------------------------
>
>                 Key: BEAM-2490
>                 URL: https://issues.apache.org/jira/browse/BEAM-2490
>             Project: Beam
>          Issue Type: Bug
>          Components: sdk-py
>    Affects Versions: 2.0.0
>         Environment: Usage with Google Cloud Platform: Dataflow runner
>            Reporter: Olivier NGUYEN QUOC
>            Assignee: Chamikara Jayalath
>             Fix For: Not applicable
>
>
> I run a very simple pipeline:
> * Read my files from Google Cloud Storage
> * Split with '\n' char
> * Write them to Google Cloud Storage
> I have 8 files that match with the pattern:
> * my_files_2016090116_20160902_060051_xxxxxxxxxx.csv.gz (229.25 MB)
> * my_files_2016090117_20160902_060051_xxxxxxxxxx.csv.gz (184.1 MB)
> * my_files_2016090118_20160902_060051_xxxxxxxxxx.csv.gz (171.73 MB)
> * my_files_2016090119_20160902_060051_xxxxxxxxxx.csv.gz (151.34 MB)
> * my_files_2016090120_20160902_060051_xxxxxxxxxx.csv.gz (129.69 MB)
> * my_files_2016090121_20160902_060051_xxxxxxxxxx.csv.gz (151.7 MB)
> * my_files_2016090122_20160902_060051_xxxxxxxxxx.csv.gz (346.46 MB)
> * my_files_2016090122_20160902_060051_xxxxxxxxxx.csv.gz (222.57 MB)
> This code should take them all:
> {code:python}
> beam.io.ReadFromText(
>       "gs://XXXX_folder1/my_files_20160901*.csv.gz",
>       skip_header_lines=1,
>       compression_type=beam.io.filesystem.CompressionTypes.GZIP
>       )
> {code}
> It runs well, but the pipeline outputs only a 288.62 MB file (instead of a 1.5 GB file).
> The whole pipeline code:
> {code:python}
> data = (p
>         | 'ReadMyFiles' >> beam.io.ReadFromText(
>               "gs://XXXX_folder1/my_files_20160901*.csv.gz",
>               skip_header_lines=1,
>               compression_type=beam.io.filesystem.CompressionTypes.GZIP)
>         | 'SplitLines' >> beam.FlatMap(lambda x: x.split('\n'))
>        )
> output = (data
>           | "Write" >> beam.io.WriteToText('gs://XXX_folder2/test.csv', num_shards=1)
>          )
> {code}
> Dataflow indicates that the estimated size of the output after the ReadFromText step is only 602.29 MB, which does not correspond to the size of any single input file, nor to the total size of the files matching the pattern.
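
One way to sanity-check the glob itself (a sketch using the Beam filesystems API, not part of the original report) is to list what the pattern matches and sum the sizes; if all eight .gz files appear with the expected total, the file matching is not the culprit:

{code:python}
from __future__ import print_function

from apache_beam.io.filesystems import FileSystems

# List the files the pattern matches and their sizes; if all eight .gz files
# appear here with the expected total, the glob expansion is not the problem.
match = FileSystems.match(['gs://XXXX_folder1/my_files_20160901*.csv.gz'])[0]
for metadata in match.metadata_list:
    print(metadata.path, metadata.size_in_bytes)
print('total bytes:', sum(m.size_in_bytes for m in match.metadata_list))
{code}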



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
