beam-commits mailing list archives

From Guillermo Rodríguez Cano (JIRA) <j...@apache.org>
Subject [jira] [Commented] (BEAM-2490) ReadFromText function is not taking all data with glob operator (*)
Date Mon, 26 Jun 2017 15:06:00 GMT

    [ https://issues.apache.org/jira/browse/BEAM-2490?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16063224#comment-16063224 ]

Guillermo Rodríguez Cano commented on BEAM-2490:
------------------------------------------------

 Hello [~chamikara] and [~altay], and thanks for the comments.

 Here are the details of the setup I have used with the Direct runner so far (the Apache
Beam version also applies to the Dataflow runner):
* OS: Mac OS X Sierra 10.12.5 
* Apache Beam: 2.0.0
* Python: 2.7.13

 I tried the HEAD of the official repository (git hash: [16f87f49f20796e29d01ed363a9097ea5420583c|https://github.com/apache/beam/tree/16f87f49f20796e29d01ed363a9097ea5420583c])
as suggested by [~altay], and I cannot yet conclude whether it works. More of each gz file
seems to be read than before, because memory usage is higher than with the current release
of Apache Beam (and comparable to the memory used when the pipeline processes the same
files uncompressed). However, it is extremely slow (again, with the Direct runner), slower
than with the uncompressed files.
Therefore, as a test of HEAD, I am now running the pipeline on only one of those gzip files,
but the task has not completed yet (maybe I have just discovered a performance bug in that
fix, https://github.com/apache/beam/pull/3428, because it feels very slow).
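
One way to tell whether the gz files are now read in full, independently of memory usage, is to count their lines locally and compare the totals with what the pipeline emits. The following is a minimal sketch using only the Python standard library. The function name `count_lines` and the local file pattern are assumptions for illustration, and it presumes the files have first been copied from Cloud Storage to local disk.

```python
import glob
import gzip


def count_lines(pattern, skip_header_lines=0):
    """Return {path: line_count} for every gzip file matching `pattern`.

    `skip_header_lines` is subtracted from each file's total so the
    result is comparable with a read that skips header lines.
    """
    counts = {}
    for path in sorted(glob.glob(pattern)):
        # gzip.open decompresses on the fly; iterating yields one line at a time,
        # so memory usage stays low even for large files.
        with gzip.open(path, "rb") as handle:
            total = sum(1 for _ in handle)
        counts[path] = max(total - skip_header_lines, 0)
    return counts


if __name__ == "__main__":
    # Hypothetical local copy of the input files.
    for path, lines in count_lines("my_files_20160901*.csv.gz", 1).items():
        print(path, lines)
```

If the per-file totals add up to more than the number of records the pipeline writes, part of the input is still being dropped.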

I am not sure whether this would be faster on GCP. I could try it on Dataflow anyway, but
I am not sure whether Dataflow can run the HEAD of the repository. I tried following the
advice in the official [documentation|https://cloud.google.com/dataflow/pipelines/dependencies-python],
but I have not managed to package the repository properly for the workers to pick it up.
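
One option that may help here is the `--sdk_location` pipeline option of the Python SDK, which is meant to point the Dataflow runner at a locally built SDK tarball instead of the released package. The sketch below only assembles the command-line arguments. The helper name `build_dataflow_args`, the project, the bucket, and the tarball path are all hypothetical, and whether this works with a build from HEAD has not been verified.

```python
def build_dataflow_args(project, bucket, sdk_tarball):
    """Assemble pipeline arguments that stage a custom SDK tarball.

    All three parameters are placeholders supplied by the caller.
    """
    return [
        "--runner=DataflowRunner",
        "--project={}".format(project),
        "--staging_location=gs://{}/staging".format(bucket),
        "--temp_location=gs://{}/temp".format(bucket),
        # Path to an SDK tarball built from the repository checkout
        # (hypothetical file name).
        "--sdk_location={}".format(sdk_tarball),
    ]


if __name__ == "__main__":
    args = build_dataflow_args(
        "my-project", "my-bucket", "dist/apache-beam-head.tar.gz")
    # The list could then be handed to the pipeline, e.g. beam.Pipeline(argv=args).
    print(" ".join(args))
```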

> ReadFromText function is not taking all data with glob operator (*) 
> --------------------------------------------------------------------
>
>                 Key: BEAM-2490
>                 URL: https://issues.apache.org/jira/browse/BEAM-2490
>             Project: Beam
>          Issue Type: Bug
>          Components: sdk-py
>    Affects Versions: 2.0.0
>         Environment: Usage with Google Cloud Platform: Dataflow runner
>            Reporter: Olivier NGUYEN QUOC
>            Assignee: Chamikara Jayalath
>             Fix For: Not applicable
>
>
> I run a very simple pipeline:
> * Read my files from Google Cloud Storage
> * Split on the '\n' character
> * Write the result to Google Cloud Storage
> I have 8 files that match the pattern:
> * my_files_2016090116_20160902_060051_xxxxxxxxxx.csv.gz (229.25 MB)
> * my_files_2016090117_20160902_060051_xxxxxxxxxx.csv.gz (184.1 MB)
> * my_files_2016090118_20160902_060051_xxxxxxxxxx.csv.gz (171.73 MB)
> * my_files_2016090119_20160902_060051_xxxxxxxxxx.csv.gz (151.34 MB)
> * my_files_2016090120_20160902_060051_xxxxxxxxxx.csv.gz (129.69 MB)
> * my_files_2016090121_20160902_060051_xxxxxxxxxx.csv.gz (151.7 MB)
> * my_files_2016090122_20160902_060051_xxxxxxxxxx.csv.gz (346.46 MB)
> * my_files_2016090122_20160902_060051_xxxxxxxxxx.csv.gz (222.57 MB)
> This code should take them all:
> {code:python}
> beam.io.ReadFromText(
>       "gs://XXXX_folder1/my_files_20160901*.csv.gz",
>       skip_header_lines=1,
>       compression_type=beam.io.filesystem.CompressionTypes.GZIP
>       )
> {code}
> It runs well, but the output of this pipeline is only a 288.62 MB file (instead of a 1.5 GB file).
> The whole pipeline code:
> {code:python}
> data = (
>     p
>     | 'ReadMyFiles' >> beam.io.ReadFromText(
>         "gs://XXXX_folder1/my_files_20160901*.csv.gz",
>         skip_header_lines=1,
>         compression_type=beam.io.filesystem.CompressionTypes.GZIP)
>     | 'SplitLines' >> beam.FlatMap(lambda x: x.split('\n'))
> )
> output = (
>     data
>     | "Write" >> beam.io.WriteToText('gs://XXX_folder2/test.csv', num_shards=1)
> )
> {code}
> Dataflow indicates that the estimated size of the output after the ReadFromText step is only 602.29 MB, which does not correspond to the size of any single input file nor to the overall size of the files matching the pattern.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
