beam-commits mailing list archives

From Guillermo Rodríguez Cano (JIRA) <j...@apache.org>
Subject [jira] [Comment Edited] (BEAM-2490) ReadFromText function is not taking all data with glob operator (*)
Date Thu, 22 Jun 2017 23:19:00 GMT

    [ https://issues.apache.org/jira/browse/BEAM-2490?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16060177#comment-16060177 ]

Guillermo Rodríguez Cano edited comment on BEAM-2490 at 6/22/17 11:18 PM:
--------------------------------------------------------------------------

I am having a similar issue, if not the same one, although I am using the DirectRunner instead
(though I believe that in a previous trial I was also using the DataflowRunner, and even gzip files).

In the following pipeline (I am trying to work with sessions):
{code:none}
    with beam.Pipeline(options=pipeline_options) as p:
        raw_events = p | 'Read input' >> ReadFromText(known_args.input)

        sessions = (raw_events
            | 'Extract event and timestamp' >> beam.ParDo(ExtractEventAndTimestampDoFn())
            | 'Compute sessions window' >> WindowInto(Sessions(gap_size=known_args.session_idle_gap))
            | 'Group by key' >> beam.GroupByKey()
            )

        output = sessions | 'Format output' >> beam.ParDo(FormatSessionOutputDoFn())
        output | 'Write results' >> WriteToText(known_args.output)

{code}

where the input is a list of uncompressed JSON files in the same directory, I get the same
output whether I use the glob operator (*) for all the files or I specify only the first file
in that list.

When running the pipeline like this (for files of about 200 MB each):
{code:none}
python sessions_manager.py --input ./input/test/* --output ./output/ --runner DirectRunner
{code}

The following shows up as the running process:
{code:none}
python sessions_manager.py --input ./input/test/xaa.json ./input/test/xab.json ./input/test/xac.json
./input/test/xad.json ./input/test/xae.json ./input/test/xaf.json ./input/test/xag --output
./output/ --runner DirectRunner
{code}

And if I run just this: 
{code:none}
python sessions_manager.py --input ./input/test/xaa.json --output ./output/ --runner DirectRunner
{code}
The output is exactly the same as with the glob operator (and quite different if I merge all
the files into one and run the pipeline again on that single merged file).
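For what it's worth, the process listing above hints that the shell, not Beam, is expanding the unquoted glob: {{./input/test/*}} becomes a list of paths on the command line, and a single-value {{argparse}} option like {{--input}} only keeps the first one, which would explain why the glob run and the single-file run produce identical output. A minimal sketch of that behavior (the option names mirror the pipeline's, but the parser here is hypothetical):

```python
import argparse

# Hypothetical parser mirroring the pipeline's --input/--output options.
parser = argparse.ArgumentParser()
parser.add_argument('--input')
parser.add_argument('--output')

# What the shell actually passes after expanding an unquoted ./input/test/*
argv = ['--input', './input/test/xaa.json', './input/test/xab.json',
        './input/test/xac.json', '--output', './output/']
known_args, extra = parser.parse_known_args(argv)

print(known_args.input)  # only the first expanded path is kept by --input
print(extra)             # the remaining paths are silently left over
```

If that is indeed the cause, quoting the pattern (passing {{--input './input/test/*'}} in single quotes) should let it reach {{ReadFromText}} intact, so that Beam performs the glob matching itself.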





> ReadFromText function is not taking all data with glob operator (*) 
> --------------------------------------------------------------------
>
>                 Key: BEAM-2490
>                 URL: https://issues.apache.org/jira/browse/BEAM-2490
>             Project: Beam
>          Issue Type: Bug
>          Components: sdk-py
>    Affects Versions: 2.0.0
>         Environment: Usage with Google Cloud Platform: Dataflow runner
>            Reporter: Olivier NGUYEN QUOC
>            Assignee: Chamikara Jayalath
>             Fix For: 2.1.0
>
>
> I run a very simple pipeline:
> * Read my files from Google Cloud Storage
> * Split on the '\n' character
> * Write the result to Google Cloud Storage
> I have 8 files that match with the pattern:
> * my_files_2016090116_20160902_060051_xxxxxxxxxx.csv.gz (229.25 MB)
> * my_files_2016090117_20160902_060051_xxxxxxxxxx.csv.gz (184.1 MB)
> * my_files_2016090118_20160902_060051_xxxxxxxxxx.csv.gz (171.73 MB)
> * my_files_2016090119_20160902_060051_xxxxxxxxxx.csv.gz (151.34 MB)
> * my_files_2016090120_20160902_060051_xxxxxxxxxx.csv.gz (129.69 MB)
> * my_files_2016090121_20160902_060051_xxxxxxxxxx.csv.gz (151.7 MB)
> * my_files_2016090122_20160902_060051_xxxxxxxxxx.csv.gz (346.46 MB)
> * my_files_2016090122_20160902_060051_xxxxxxxxxx.csv.gz (222.57 MB)
> This code should take them all:
> {code:python}
> beam.io.ReadFromText(
>       "gs://XXXX_folder1/my_files_20160901*.csv.gz",
>       skip_header_lines=1,
>       compression_type=beam.io.filesystem.CompressionTypes.GZIP
>       )
> {code}
> It runs well but there is only a 288.62 MB file in the output of this pipeline (instead
> of a 1.5 GB file).
> The whole pipeline code:
> {code:python}
> data = (p
>         | 'ReadMyFiles' >> beam.io.ReadFromText(
>             "gs://XXXX_folder1/my_files_20160901*.csv.gz",
>             skip_header_lines=1,
>             compression_type=beam.io.filesystem.CompressionTypes.GZIP)
>         | 'SplitLines' >> beam.FlatMap(lambda x: x.split('\n')))
> output = (data
>           | "Write" >> beam.io.WriteToText('gs://XXX_folder2/test.csv', num_shards=1))
> {code}
> Dataflow indicates that the estimated size of the output after the ReadFromText step is
> only 602.29 MB, which corresponds neither to any single input file size nor to the total
> size of the files matching the pattern.
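One way to narrow down a report like the one above is to first confirm how many objects the pattern actually matches, independently of the pipeline. The single-star semantics can be exercised locally with Python's {{glob}} module (the stand-in file names below are hypothetical; {{gsutil ls 'gs://XXXX_folder1/my_files_20160901*.csv.gz'}} would be the equivalent check against GCS):

```python
import glob
import os
import tempfile

# Local stand-ins for the my_files_20160901*.csv.gz objects (hypothetical names).
tmp = tempfile.mkdtemp()
for hour in ('16', '17', '18', '19'):
    open(os.path.join(tmp, 'my_files_20160901%s_test.csv.gz' % hour), 'wb').close()

# The same single-star pattern as in the ReadFromText call, applied locally.
matched = sorted(glob.glob(os.path.join(tmp, 'my_files_20160901*.csv.gz')))
print(len(matched))  # every stand-in matches the pattern
```

If the pattern matches all eight objects here but the pipeline still emits fewer bytes than expected, the problem lies in the source's handling of the matched (compressed) files rather than in the pattern itself.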



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
