Subject: Re: Parallel file read in LocalEnvironment
From: Stephan Ewen
To: user@flink.apache.org
Date: Wed, 18 Nov 2015 17:52:28 +0100

Late answer, sorry:

The splits are created in the JobManager, so the job submission should not be affected by that.

The assignment of splits to workers is very fast, so many splits with small amounts of data are not very different from few splits with large amounts of data.

Lines are never materialized, and the operators do not work differently based on different numbers of splits.
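
If the goal is simply to speed up a map over one large local file, raising the parallelism and letting the input format create byte-range splits should be enough. A minimal sketch against the Java DataSet API (the path and class name here are made up for illustration):

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.ExecutionEnvironment;

public class ParallelLocalRead {

    public static void main(String[] args) throws Exception {
        // Parallelism 8: the source asks the input format for at least
        // 8 splits, so even a single large file is cut into 8 byte ranges
        // that the map below processes in parallel.
        ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment(8);

        env.readTextFile("file:///path/to/huge-file.txt")  // hypothetical path
           .map(new MapFunction<String, String>() {
               @Override
               public String map(String line) {
                   return line.trim();
               }
           })
           .print();
    }
}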

On Wed, Oct 7, 2015 at 4:26 PM, Flavio Pompermaier <pompermaier@okkam.it> wrote:
I've tried to split my huge file by line count (using the bash command split -l) in 2 different ways:
  1. small line count (huge number of small files)
  2. big line count (small number of big files)
I can't understand why the time required to effectively start the job is more or less the same in both cases:
  - in 1. it takes a long time to fetch the file list (~50,000 files), and the split assigner is fast to assign the splits (but even though it is fast, there are a lot of them)
  - in 2. Flink is fast to fetch the file list, but it is extremely slow to generate the splits to assign
Initially I thought that Flink was eagerly materializing the lines somewhere, but neither memory nor disk usage increases.
What is going on underneath? Is this normal?

Thanks in advance,
Flavio



On Wed, Oct 7, 2015 at 3:27 PM, Stephan Ewen <sewen@apache.org> wrote:
The split functionality is in the FileInputFormat, and the functionality that takes care of lines across splits is in the DelimitedInputFormat.
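
For intuition, here is a self-contained sketch (plain Java, not Flink's actual code) of the rule such delimited formats follow so that byte-range splits neither lose nor duplicate lines:

import java.io.IOException;
import java.io.RandomAccessFile;
import java.util.ArrayList;
import java.util.List;

public class SplitLineReader {

    // Returns the lines "owned" by the byte range [start, start + length).
    static List<String> readLinesOfSplit(String path, long start, long length)
            throws IOException {
        List<String> lines = new ArrayList<>();
        try (RandomAccessFile file = new RandomAccessFile(path, "r")) {
            long end = start + length;
            if (start == 0) {
                file.seek(0);
            } else {
                // Back up one byte and discard everything up to the next
                // newline: the first partial line belongs to the previous
                // split, which reads across its boundary to finish it.
                file.seek(start - 1);
                file.readLine();
            }
            // A line is ours if it starts before 'end', even when it
            // extends past the boundary into the next split's range.
            String line;
            while (file.getFilePointer() < end && (line = file.readLine()) != null) {
                lines.add(line);
            }
        }
        return lines;
    }
}

The real DelimitedInputFormat differs in the details (buffered reads, configurable delimiters), but the ownership rule is the same: a record belongs to the split in which it starts.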

On Wed, Oct 7, 2015 at 3:24 PM, Fabian Hueske <fhueske@gmail.com> wrote:
I'm sorry, there is no such documentation.
You need to look at the code :-(

2015-10-07 15:19 GMT+02:00 Flavio Pompermaier <pompermaier@okkam.it>:
And what is the split policy of the FileInputFormat? Does it depend on the fs block size?
Is there a pointer to the several Flink input formats and a description of their internals?

On Wed, Oct 7, 2015 at 3:09 PM, Fabian Hueske <fhueske@gmail.com> wrote:
Hi Flavio,

it is not possible to split by line count because that would mean reading and parsing the file just for splitting.

Parallel processing of data sources depends on the input splits created by the InputFormat. Local files can be split just like files in HDFS. Usually, each file corresponds to at least one split, but multiple files could also be put into a single split if necessary. The logic for that would go into the InputFormat.createInputSplits() method.
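
A rough skeleton of where that logic would live; the class and method names follow the Java API, but treat the signatures as approximate:

import java.io.IOException;

import org.apache.flink.api.java.io.TextInputFormat;
import org.apache.flink.core.fs.FileInputSplit;
import org.apache.flink.core.fs.Path;

// Hypothetical subclass: the hook where a custom split policy would go.
public class CustomSplitTextFormat extends TextInputFormat {

    public CustomSplitTextFormat(Path filePath) {
        super(filePath);
    }

    @Override
    public FileInputSplit[] createInputSplits(int minNumSplits) throws IOException {
        // Default behavior: byte-range splits per file, at least
        // minNumSplits in total.
        FileInputSplit[] splits = super.createInputSplits(minNumSplits);
        // A custom policy (e.g. grouping many small files into one split)
        // would replace or post-process 'splits' here.
        return splits;
    }
}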

Cheers, Fabian

2015-10-07 14:47 GMT+02:00 Flavio Pompermaier <pompermaier@okkam.it>:
Hi to all,


is there a way to split a single local file by line count (e.g. a split every 100 lines) in a LocalEnvironment to speed up a simple map function? It is not very clear to me how local files (the files in a directory if recursive=true) are managed by Flink. Is there any reference to these internals?

Best,
Flavio
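
For the recursive=true part of the question: nested files can be enumerated by setting a parameter on the input source. A minimal sketch (DataSet API; the directory path is hypothetical):

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.configuration.Configuration;

public class RecursiveLocalRead {

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment(4);

        // Tell the file input format to descend into sub-directories.
        Configuration params = new Configuration();
        params.setBoolean("recursive.file.enumeration", true);

        DataSet<String> lines = env
                .readTextFile("file:///path/to/dir")  // hypothetical directory
                .withParameters(params);

        lines.print();
    }
}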


