From: Guillermo Rodríguez Cano (JIRA)
To: commits@beam.apache.org
Date: Thu, 22 Jun 2017 23:22:00 +0000 (UTC)
Subject: [jira] [Comment Edited] (BEAM-2490) ReadFromText function is not taking all data with glob operator (*)

    [ https://issues.apache.org/jira/browse/BEAM-2490?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16060177#comment-16060177 ]

Guillermo Rodríguez Cano edited comment on BEAM-2490 at 6/22/17 11:21 PM:
--------------------------------------------------------------------------

I am having a similar issue, if not the same one, although I am using the DirectRunner instead (but I believe that in a previous trial I was also using the DataflowRunner, and even gzip files).
In the following pipeline (I am trying to work with sessions):

{code:none}
with beam.Pipeline(options=pipeline_options) as p:
    raw_events = p | 'Read input' >> ReadFromText(known_args.input)

    sessions = (raw_events
                | 'Extract event and timestamp' >> beam.ParDo(ExtractEventAndTimestampDoFn())
                | 'Compute sessions window' >> WindowInto(Sessions(gap_size=known_args.session_idle_gap))
                | 'Group by key' >> beam.GroupByKey()
               )

    output = sessions | 'Format output' >> beam.ParDo(FormatSessionOutputDoFn())

    output | 'Write results' >> WriteToText(known_args.output)
{code}

where the input is a list of uncompressed JSON files in the same directory, I get the same output whether I use the glob operator (*) for all the files or I specify only the first file in that list.

When running the pipeline like this (with files of about 200 MB each):

{code:none}
python sessions_manager.py --input ./input/test/* --output ./output/ --runner DirectRunner
{code}

the following shows up as the running process:

{code:none}
python sessions_manager.py --input ./input/test/xaa.json ./input/test/xab.json ./input/test/xac.json ./input/test/xad.json ./input/test/xae.json ./input/test/xaf.json ./input/test/xag.json --output ./output/ --runner DirectRunner
{code}

And if I run just this:

{code:none}
python sessions_manager.py --input ./input/test/xaa.json --output ./output/ --runner DirectRunner
{code}

the output is exactly the same as the one produced with the glob operator (and quite different from the output I get if I merge all the files into a single file and run the pipeline again on that merged file).
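One thing that might help narrow this down is to check how many files the pattern resolves to once it reaches Beam rather than the shell, for example by passing the glob quoted (--input './input/test/*') and printing what Beam's filesystem layer matches. This is only a diagnostic sketch, assuming the 2.x Python SDK; the pattern value and the quoting are assumptions based on the commands above:

{code:python}
# Diagnostic sketch (assumed Beam 2.x Python SDK): print what a file pattern
# resolves to before it is handed to ReadFromText.
from apache_beam.io.filesystems import FileSystems

pattern = './input/test/*'  # assumption: the quoted --input value from above

match_result = FileSystems.match([pattern])[0]
for metadata in match_result.metadata_list:
    print('%s (%d bytes)' % (metadata.path, metadata.size_in_bytes))
print('files matched: %d' % len(match_result.metadata_list))
{code}

If all seven files are listed here but the pipeline output still matches the single-file run, the expansion inside ReadFromText would be the more likely place to look.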
> ReadFromText function is not taking all data with glob operator (*)
> --------------------------------------------------------------------
>
>                 Key: BEAM-2490
>                 URL: https://issues.apache.org/jira/browse/BEAM-2490
>             Project: Beam
>          Issue Type: Bug
>          Components: sdk-py
>    Affects Versions: 2.0.0
>        Environment: Usage with Google Cloud Platform: Dataflow runner
>            Reporter: Olivier NGUYEN QUOC
>            Assignee: Chamikara Jayalath
>             Fix For: 2.1.0
>
>
> I run a very simple pipeline:
> * Read my files from Google Cloud Storage
> * Split on the '\n' character
> * Write the result to Google Cloud Storage
> I have 8 files that match the pattern:
> * my_files_2016090116_20160902_060051_xxxxxxxxxx.csv.gz (229.25 MB)
> * my_files_2016090117_20160902_060051_xxxxxxxxxx.csv.gz (184.1 MB)
> * my_files_2016090118_20160902_060051_xxxxxxxxxx.csv.gz (171.73 MB)
> * my_files_2016090119_20160902_060051_xxxxxxxxxx.csv.gz (151.34 MB)
> * my_files_2016090120_20160902_060051_xxxxxxxxxx.csv.gz (129.69 MB)
> * my_files_2016090121_20160902_060051_xxxxxxxxxx.csv.gz (151.7 MB)
> * my_files_2016090122_20160902_060051_xxxxxxxxxx.csv.gz (346.46 MB)
> * my_files_2016090122_20160902_060051_xxxxxxxxxx.csv.gz (222.57 MB)
> This code should take them all:
> {code:python}
> beam.io.ReadFromText(
>     "gs://XXXX_folder1/my_files_20160901*.csv.gz",
>     skip_header_lines=1,
>     compression_type=beam.io.filesystem.CompressionTypes.GZIP
> )
> {code}
> It runs well, but the output of this pipeline is only a 288.62 MB file (instead of a 1.5 GB file).
> The whole pipeline code:
> {code:python}
> data = (p | 'ReadMyFiles' >> beam.io.ReadFromText(
>                 "gs://XXXX_folder1/my_files_20160901*.csv.gz",
>                 skip_header_lines=1,
>                 compression_type=beam.io.filesystem.CompressionTypes.GZIP
>             )
>           | 'SplitLines' >> beam.FlatMap(lambda x: x.split('\n'))
>        )
> output = (
>     data | "Write" >> beam.io.WriteToText('gs://XXX_folder2/test.csv', num_shards=1)
> )
> {code}
> Dataflow indicates that the estimated size of the output after the ReadFromText step is only 602.29 MB, which corresponds neither to the size of any single input file nor to the total size of the files matching the pattern.
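A workaround that is sometimes possible while a pattern issue like the one above is being investigated is to resolve the pattern up front and read each matched file explicitly, then flatten the results. The snippet below is only a sketch under stated assumptions (Beam 2.0 Python SDK, the masked gs:// pattern from the report, and an existing pipeline object p as in the reporter's code); it is not presented as the library's recommended approach:

{code:python}
# Workaround sketch (assumptions noted above): expand the pattern outside
# ReadFromText, create one read per matched file, then flatten the PCollections.
import apache_beam as beam
from apache_beam.io.filesystems import FileSystems

pattern = "gs://XXXX_folder1/my_files_20160901*.csv.gz"  # masked path from the report
paths = [m.path for m in FileSystems.match([pattern])[0].metadata_list]

per_file = tuple(
    p | 'ReadMyFiles %d' % i >> beam.io.ReadFromText(
            path,
            skip_header_lines=1,
            compression_type=beam.io.filesystem.CompressionTypes.GZIP)
    for i, path in enumerate(paths)
)
data = per_file | 'FlattenMyFiles' >> beam.Flatten()
{code}

Comparing the size written by this variant against the single glob-based ReadFromText call would show whether the missing data disappears at pattern-expansion time or later in the pipeline.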