From: Till Rohrmann
To: user@flink.apache.org
Subject: Re: Best way to process data in many files? (FLINK-BATCH)
Date: Wed, 24 Feb 2016 12:24:06 +0100
In-Reply-To: <56CC7053.9010303@math.fu-berlin.de>

Hi Tim,

Unfortunately, this is not documented explicitly as far as I know. For the InputFormats there is a marker interface called NonParallelInput. The input formats which implement this interface will be executed with a parallelism of 1. At the moment this holds true for the CollectionInputFormat, IteratorInputFormat and the JDBCInputFormat.
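
To illustrate the marker-interface idea, here is a minimal sketch in plain Java. All types in it (InputFormatSketch, NonParallelInputSketch, CollectionFormatSketch, FileFormatSketch) are hypothetical stand-ins and not Flink's real classes, and the instanceof check is only an assumption about how such a marker can be evaluated, not a description of Flink's internals.

```java
// Hypothetical stand-ins, NOT Flink's real classes: they only mimic the idea
// of a marker interface that caps a source's parallelism at 1.
interface InputFormatSketch {}

// Marker interface: it declares no methods, its presence is the signal.
interface NonParallelInputSketch {}

class CollectionFormatSketch implements InputFormatSketch, NonParallelInputSketch {}

class FileFormatSketch implements InputFormatSketch {}

class MarkerInterfaceSketch {
    // Returns the parallelism a source would effectively run with:
    // 1 if the format carries the marker, otherwise the requested value.
    static int effectiveParallelism(InputFormatSketch format, int requested) {
        return (format instanceof NonParallelInputSketch) ? 1 : requested;
    }

    public static void main(String[] args) {
        System.out.println(effectiveParallelism(new CollectionFormatSketch(), 8)); // prints 1
        System.out.println(effectiveParallelism(new FileFormatSketch(), 8));       // prints 8
    }
}
```

By the same logic, checking whether a given input format implements NonParallelInput should tell you whether its source can run in parallel.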

I hope this helps.

Cheers,
Till


On Tue, Feb 23, 2016 at 3:44 PM, Tim Conrad <conrad@math.fu-berlin.de> wrote:
Hi Till (and others).

Thank you very much for your helpful answer.

On 23.02.2016 14:20, Till Rohrmann wrote:
[...] In contrast, if you had a parallel data source which would consist of multiple source tasks, then these tasks would be independent and spread out across your cluster [...]
Can you please send me a link to an example or to the respective Flink API doc, where I can see which data sources are parallel and how to create one with multiple source tasks?

A simple Google search did not turn up an answer (maybe I used the wrong keywords, though...).


Cheers
Tim





On 23.02.2016 14:20, Till Rohrmann wrote:

Hi Tim,

Depending on how you create the DataSource<String> fileList, Flink will schedule the downstream operators differently. If you used the ExecutionEnvironment.fromCollection method, then it will create a DataSource with a CollectionInputFormat. This kind of DataSource will only be executed with a degree of parallelism of 1. The source will send its collection elements in a round-robin fashion to the downstream operators, which are executed with a higher parallelism. So when Flink schedules the downstream operators, it will try to place them close to their inputs. Since all flat map operators have the single data source task as an input, they will be deployed on the same machine if possible.
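
The round-robin distribution from a single source task can be sketched in plain Java as follows. This is only a toy illustration: the class RoundRobinSketch, its method, and the file names are made up for the example, and no Flink API is involved.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class RoundRobinSketch {
    // Distributes the elements of one non-parallel source over
    // `downstreamTasks` receivers in round-robin order.
    static List<List<String>> distribute(List<String> elements, int downstreamTasks) {
        List<List<String>> perTask = new ArrayList<>();
        for (int i = 0; i < downstreamTasks; i++) {
            perTask.add(new ArrayList<>());
        }
        for (int i = 0; i < elements.size(); i++) {
            // Element i goes to downstream task (i mod downstreamTasks).
            perTask.get(i % downstreamTasks).add(elements.get(i));
        }
        return perTask;
    }

    public static void main(String[] args) {
        // Hypothetical file list, standing in for the collection handed to the source.
        List<String> files = Arrays.asList("a.txt", "b.txt", "c.txt", "d.txt", "e.txt");
        // prints [[a.txt, d.txt], [b.txt, e.txt], [c.txt]]
        System.out.println(distribute(files, 3));
    }
}
```

Every downstream task receives elements, but all of them come from the same single source task, which is why locality-aware placement pulls the downstream tasks toward that one machine.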

In contrast, if you had a parallel data source which would consist of multiple source tasks, then these tasks would be independent and spread out across your cluster. In this case, every flat map task would have a single distinct source task as input. When the flat map tasks are deployed, they would be deployed on the machine where their corresponding source is running. Since the source tasks are spread out across the cluster, the flat map tasks would be spread out as well.
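
The difference between the two cases can be made concrete with a toy placement model. This is a strongly simplified sketch and not Flink's actual scheduler: it assumes each flat map task simply goes to the machine of its input source if that machine still has a free slot, and otherwise to the first machine with a free slot. The class name and machine numbers are invented for the example.

```java
import java.util.Arrays;

class LocalityPlacementSketch {
    // inputMachine[t] is the machine hosting the source that feeds task t.
    // Returns the machine chosen for each task under the toy rule above.
    static int[] place(int[] inputMachine, int machines, int slotsPerMachine) {
        int[] free = new int[machines];
        Arrays.fill(free, slotsPerMachine);
        int[] placement = new int[inputMachine.length];
        for (int task = 0; task < inputMachine.length; task++) {
            int target = inputMachine[task];
            if (free[target] == 0) {
                // Preferred machine is full: fall back to the first free one.
                target = -1;
                for (int m = 0; m < machines; m++) {
                    if (free[m] > 0) {
                        target = m;
                        break;
                    }
                }
                if (target < 0) {
                    throw new IllegalStateException("no free slot left");
                }
            }
            free[target]--;
            placement[task] = target;
        }
        return placement;
    }

    public static void main(String[] args) {
        // One non-parallel source on machine 0 feeds all four flat map tasks.
        System.out.println(Arrays.toString(place(new int[] {0, 0, 0, 0}, 3, 4))); // prints [0, 0, 0, 0]
        // Four parallel source tasks sit on machines 0, 1, 2 and 0.
        System.out.println(Arrays.toString(place(new int[] {0, 1, 2, 0}, 3, 4))); // prints [0, 1, 2, 0]
    }
}
```

In the first call everything piles up on machine 0, in the second the tasks follow their sources across the cluster.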

What you could do to mitigate your problem is to start the cluster with as many slots as your maximum degree of parallelism. That way, you'll utilize all cluster resources.
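
As a small worked example of that sizing rule: assuming all TaskManagers get the same number of slots, the per-TaskManager value is the maximum parallelism divided by the number of TaskManagers, rounded up. The helper below is hypothetical and only does this arithmetic; the resulting number is what I would put into taskmanager.numberOfTaskSlots in flink-conf.yaml.

```java
class SlotSizingSketch {
    // Smallest number of slots per TaskManager such that the total slot
    // count covers maxParallelism (ceiling division).
    static int slotsPerTaskManager(int maxParallelism, int taskManagers) {
        if (maxParallelism < 1 || taskManagers < 1) {
            throw new IllegalArgumentException("both arguments must be at least 1");
        }
        return (maxParallelism + taskManagers - 1) / taskManagers;
    }

    public static void main(String[] args) {
        System.out.println(slotsPerTaskManager(8, 4));  // prints 2 (4 x 2 = 8 slots)
        System.out.println(slotsPerTaskManager(10, 4)); // prints 3 (4 x 3 = 12 slots, 2 stay idle)
    }
}
```

When the parallelism is not a multiple of the number of TaskManagers, a few slots remain unused, so choosing a parallelism that divides evenly keeps every slot busy.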

I hope this clarifies a bit why you observe that tasks tend to cluster on a single machine.

Cheers,
Till



