Date: Thu, 22 Jun 2017 23:18:00 +0000 (UTC)
From: Guillermo Rodríguez Cano (JIRA)
To: commits@beam.apache.org
Subject: [jira] [Comment Edited] (BEAM-2490) ReadFromText function is not taking all data with glob operator (*)

    [ https://issues.apache.org/jira/browse/BEAM-2490?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16060177#comment-16060177 ]

Guillermo Rodríguez Cano edited comment on BEAM-2490 at 6/22/17 11:17 PM:
--------------------------------------------------------------------------

I am having a similar issue, if not the same, although I am using the DirectRunner instead (I believe that in a previous trial I was also using the DataflowRunner, and even gzip files).
In the following pipeline (I am trying to work with sessions):

{code:python}
with beam.Pipeline(options=pipeline_options) as p:
    raw_events = p | 'Read input' >> ReadFromText(known_args.input)

    sessions = (raw_events
                | 'Extract event and timestamp' >> beam.ParDo(ExtractEventAndTimestampDoFn())
                | 'Compute sessions window' >> WindowInto(Sessions(gap_size=known_args.session_idle_gap))
                | 'Group by key' >> beam.GroupByKey())

    output = sessions | 'Format output' >> beam.ParDo(FormatSessionOutputDoFn())
    output | 'Write results' >> WriteToText(known_args.output)
{code}

where the input is a list of uncompressed JSON files in the same directory, I get the same output whether I use the glob operator (*) for all the files or I pass only the first file in that list.

When running the pipeline like this (with files of about 200 MB each):

{{python sessions_manager.py --input ./input/test/* --output ./output/ --runner DirectRunner}}

the following shows up as the running process:

{{python sessions_manager.py --input ./input/test/xaa.json ./input/test/xab.json ./input/test/xac.json ./input/test/xad.json ./input/test/xae.json ./input/test/xaf.json ./input/test/xag --output ./output/ --runner DirectRunner}}

And if I run just this:

{{python sessions_manager.py --input ./input/test/xaa.json --output ./output/ --runner DirectRunner}}

the output is precisely the same as the one produced with the glob operator (and quite different if I merge all the files into one and run the pipeline again on that merged file).
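The expanded file list in that process line suggests the shell, not Beam, is expanding {{./input/test/*}}. A minimal sketch of what then happens, assuming the script parses {{--input}} as a single-valued argparse option the way the standard Beam example pipelines do (the parser below is hypothetical, not the one from sessions_manager.py):

{code:python}
# Minimal sketch: the shell expands the unquoted glob before Python starts,
# and a single-valued --input option keeps only the first expanded path.
import argparse

parser = argparse.ArgumentParser()
parser.add_argument('--input')   # single value, as in the wordcount-style examples
parser.add_argument('--output')
parser.add_argument('--runner')

# Simulates: python sessions_manager.py --input ./input/test/* --output ./output/ --runner DirectRunner
argv = ['--input', './input/test/xaa.json', './input/test/xab.json',
        './input/test/xac.json', '--output', './output/', '--runner', 'DirectRunner']

known_args, extra = parser.parse_known_args(argv)
print(known_args.input)  # ./input/test/xaa.json  -> only this file reaches ReadFromText
print(extra)             # ['./input/test/xab.json', './input/test/xac.json'] -> silently dropped
{code}

Quoting the pattern on the command line ({{--input './input/test/*'}}) leaves the literal glob for {{ReadFromText}} to expand instead of the shell.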
> ReadFromText function is not taking all data with glob operator (*)
> --------------------------------------------------------------------
>
>                 Key: BEAM-2490
>                 URL: https://issues.apache.org/jira/browse/BEAM-2490
>             Project: Beam
>          Issue Type: Bug
>          Components: sdk-py
>    Affects Versions: 2.0.0
>         Environment: Usage with Google Cloud Platform: Dataflow runner
>            Reporter: Olivier NGUYEN QUOC
>            Assignee: Chamikara Jayalath
>             Fix For: 2.1.0
>
>
> I run a very simple pipeline:
> * Read my files from Google Cloud Storage
> * Split on the '\n' char
> * Write the result to Google Cloud Storage
> I have 8 files that match the pattern:
> * my_files_2016090116_20160902_060051_xxxxxxxxxx.csv.gz (229.25 MB)
> * my_files_2016090117_20160902_060051_xxxxxxxxxx.csv.gz (184.1 MB)
> * my_files_2016090118_20160902_060051_xxxxxxxxxx.csv.gz (171.73 MB)
> * my_files_2016090119_20160902_060051_xxxxxxxxxx.csv.gz (151.34 MB)
> * my_files_2016090120_20160902_060051_xxxxxxxxxx.csv.gz (129.69 MB)
> * my_files_2016090121_20160902_060051_xxxxxxxxxx.csv.gz (151.7 MB)
> * my_files_2016090122_20160902_060051_xxxxxxxxxx.csv.gz (346.46 MB)
> * my_files_2016090122_20160902_060051_xxxxxxxxxx.csv.gz (222.57 MB)
> This code should pick them all up:
> {code:python}
> beam.io.ReadFromText(
>     "gs://XXXX_folder1/my_files_20160901*.csv.gz",
>     skip_header_lines=1,
>     compression_type=beam.io.filesystem.CompressionTypes.GZIP
> )
> {code}
> It runs fine, but the output of this pipeline is only a 288.62 MB file (instead of a 1.5 GB file).
> The whole pipeline code:
> {code:python}
> data = (p | 'ReadMyFiles' >> beam.io.ReadFromText(
>                "gs://XXXX_folder1/my_files_20160901*.csv.gz",
>                skip_header_lines=1,
>                compression_type=beam.io.filesystem.CompressionTypes.GZIP)
>           | 'SplitLines' >> beam.FlatMap(lambda x: x.split('\n')))
>
> output = (
>     data | "Write" >> beam.io.WriteToText('gs://XXX_folder2/test.csv', num_shards=1)
> )
> {code}
> Dataflow indicates that the estimated size of the output after the ReadFromText step is only 602.29 MB, which corresponds neither to any single input file size nor to the total size of the files matching the pattern.
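For the original report, it helps to first confirm how many objects the pattern actually matches and what their combined size is. A minimal diagnostic sketch, assuming the {{FileSystems}} API shipped with the Python SDK is available and reusing the placeholder bucket path from the issue:

{code:python}
# Diagnostic sketch (not from the issue): list every object the glob matches,
# via Beam's FileSystems abstraction. Needs the GCP extras installed
# (pip install apache-beam[gcp]) so that gs:// paths resolve.
from apache_beam.io.filesystems import FileSystems

pattern = "gs://XXXX_folder1/my_files_20160901*.csv.gz"  # placeholder path from the issue

match_result = FileSystems.match([pattern])[0]
total_bytes = 0
for metadata in match_result.metadata_list:
    print(metadata.path, metadata.size_in_bytes)
    total_bytes += metadata.size_in_bytes
print("matched files:", len(match_result.metadata_list), "total bytes:", total_bytes)
{code}

If all eight files show up here (roughly 1.5 GB of compressed data in total), the pattern itself resolves correctly and the missing data is being lost in the read/decompression path rather than in the glob expansion.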