From: Robert Schmidtke <ro.schmidtke@gmail.com>
Date: Fri, 13 Jan 2017 15:14:04 +0100
Subject: Re: Terminology: Split, Group and Partition
To: user@flink.apache.org

Hi Fabian,

thanks for the quick and comprehensive reply. I'll have a look at the ExecutionPlan using your suggestion to check what actually gets computed, and I'll use the properties as well. If I stumble across something else I'll let you know.
Many thanks again!
Robert

On Fri, Jan 13, 2017 at 2:40 PM, Fabian Hueske <fhueske@gmail.com> wrote:
Hi Robert,

let me first describe what splits, groups, and partitions are.

* Partition: This is basically all data that goes through the same task instance. If you have an operator with a parallelism of 80, you have 80 partitions. When you call sortPartition(), you get 80 sorted streams; when you call mapPartition, you iterate over all records in one partition.
* Split: Splits are a concept of InputFormats. An InputFormat can process several splits. All splits that are processed by the same data source task make up the partition of that task, so a split is a subset of a partition. In your case, where each task reads exactly one split, the split is equivalent to the partition.
* Group: A group is based on the groupBy attribute and hence is data-driven; it does not depend on the parallelism. A groupReduce requires a partitioning such that all records with the same grouping attribute are sent to the same operator, i.e., all are part of the same partition. Depending on the number of distinct grouping keys (and the hash function), a partition can have zero, one, or more groups. (The sketch right below illustrates the partition/group distinction.)
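
To make the distinction concrete, here is a minimal, self-contained sketch against the DataSet API. The class name and the toy data are invented for illustration:

import org.apache.flink.api.common.functions.GroupReduceFunction;
import org.apache.flink.api.common.functions.MapPartitionFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

public class PartitionVsGroup {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(4); // 4 parallel task instances -> 4 partitions

        DataSet<Tuple2<String, Long>> records = env.fromElements(
                new Tuple2<>("host1", 1L), new Tuple2<>("host1", 2L),
                new Tuple2<>("host2", 3L), new Tuple2<>("host2", 4L),
                new Tuple2<>("host3", 5L));

        // Per *partition*: one call per parallel instance, i.e. 4 counts here,
        // no matter what the data looks like (an empty partition yields 0).
        records.mapPartition(new MapPartitionFunction<Tuple2<String, Long>, Long>() {
            @Override
            public void mapPartition(Iterable<Tuple2<String, Long>> values, Collector<Long> out) {
                long count = 0L;
                for (Tuple2<String, Long> ignored : values) {
                    count++;
                }
                out.collect(count);
            }
        }).print();

        // Per *group*: one result per distinct key, i.e. exactly 3 sums here
        // (host1, host2, host3), independent of the parallelism.
        records.groupBy(0).reduceGroup(
                new GroupReduceFunction<Tuple2<String, Long>, Tuple2<String, Long>>() {
            @Override
            public void reduce(Iterable<Tuple2<String, Long>> values,
                               Collector<Tuple2<String, Long>> out) {
                String host = null;
                long sum = 0L;
                for (Tuple2<String, Long> v : values) {
                    host = v.f0;
                    sum += v.f1;
                }
                out.collect(new Tuple2<>(host, sum));
            }
        }).print();
    }
}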

Now coming to your use case: you have 80 sources running on 5 machines, and all sources on the same machine produce records with the same grouping key (hostname). You can actually give Flink a hint that the data returned by a split is partitioned, grouped, or sorted in a specific way. This works as follows:

// String is the hostname, Integer is the parallel id of the source task
DataSet<Tuple3<String, Integer, Long>> input = env.createInput(yourFormat);
SplitDataProperties<Tuple3<String, Integer, Long>> splitProps =
    ((DataSource<Tuple3<String, Integer, Long>>) input).getSplitDataProperties();
splitProps.splitsGroupedBy(0, 1);
splitProps.splitsPartitionedBy(0, 1);

With this info, Flink knows that the data returned by our source is partitioned and grouped. Now you can do groupBy(0,1).groupReduce(XXX) to run a local groupReduce operation on each of the 80 tasks (hostname and parallel index result in 80 keys) and locally reduce the data.
The next step would be another .groupBy(0).groupReduce(), which gives 5 groups (one per hostname) of 16 locally reduced records each, distributed across your tasks. Both steps are sketched below.
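
Put together, the two steps could look roughly like this. The merge logic here is invented (it just sums the third field); substitute your actual aggregation. Imports are as in the snippet above, plus Tuple3, GroupReduceFunction, and Collector:

// Hypothetical merge function: keeps hostname and parallel id, sums the value.
public static class SumMerge implements
        GroupReduceFunction<Tuple3<String, Integer, Long>, Tuple3<String, Integer, Long>> {
    @Override
    public void reduce(Iterable<Tuple3<String, Integer, Long>> values,
                       Collector<Tuple3<String, Integer, Long>> out) {
        String host = null;
        Integer pid = null;
        long sum = 0L;
        for (Tuple3<String, Integer, Long> v : values) {
            host = v.f0;
            pid = v.f1;
            sum += v.f2;
        }
        out.collect(new Tuple3<String, Integer, Long>(host, pid, sum));
    }
}

// Step 1: 80 keys (5 hosts x 16 parallel ids); with the split properties
// from above, the optimizer can run this without a shuffle.
DataSet<Tuple3<String, Integer, Long>> local =
        input.groupBy(0, 1).reduceGroup(new SumMerge());

// Step 2: 5 keys (one per hostname); this step ships data over the network.
DataSet<Tuple3<String, Integer, Long>> perHost =
        local.groupBy(0).reduceGroup(new SumMerge());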

However, you have to be careful with the SplitDataProperties. If you get them wrong, the optimizer makes false assumptions and the resulting plan might not compute what you are looking for.
I'd recommend reading the JavaDocs and playing a bit with this feature to see how it behaves. ExecutionEnvironment.getExecutionPlan() can help to figure out what is happening.
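
For example, roughly:

// Prints the optimizer's execution plan as JSON for inspection,
// so you can check where the plan actually inserts shuffles.
System.out.println(env.getExecutionPlan());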

Best,
Fabian


2017-01-13 12:14 GMT+01:00 Robert Schmidtke <ro.schmidtke@gmail.com>:
Hi all,

I'm having some trouble grasping the meaning of, and the difference between, the following concepts:

- Split
- Group
- Partition

Let me elaborate a bit on the problem I'm trying to solve here. In my tests I'm using a 5-node cluster, on which I'm running Flink 1.1.3 in standalone mode. Each node has 64G of memory and 32 cores. I'm starting the JobManager on one node, and a TaskManager on each node. I'm assigning 16 slots to each TaskManager, so the overall parallelism is 80 (= 5 TMs x 16 slots).

The data I want to process resides in a local folder on each worker with the same path (say /tmp/input). There can be arbitrarily many input files in each worker's folder. I have written a custom input format that round-robin assigns the files to each of the 16 local input splits (https://github.com/robert-schmidtke/hdfs-statistics-adapter/blob/master/sfs-analysis/src/main/java/de/zib/sfs/analysis/io/SfsInputFormat.java) to obtain a total of 80 input splits that need processing. Each split reads zero or more files, parsing the contents into records that are emitted correctly. This works as expected.
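
For illustration, the round-robin assignment boils down to something like this (a simplified sketch, not the actual SfsInputFormat code; error handling omitted):

import java.io.File;
import java.util.ArrayList;
import java.util.List;

// Distribute all files in /tmp/input over 16 per-host splits, round-robin.
File[] files = new File("/tmp/input").listFiles();
int splitsPerHost = 16;
List<List<File>> filesPerSplit = new ArrayList<List<File>>(splitsPerHost);
for (int i = 0; i < splitsPerHost; i++) {
    filesPerSplit.add(new ArrayList<File>());
}
for (int i = 0; i < files.length; i++) {
    filesPerSplit.get(i % splitsPerHost).add(files[i]); // file i -> split i % 16
}
// Split j then reads the files in filesPerSplit.get(j), possibly none.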

Now we're getting to the questions. How do these 80 input splits relate to groups and partitions? My understanding is that a partition is a subset of my DataSet<X> that is local to each node, i.e., if I were to repartition the data according to some scheme, a shuffle across workers would occur. After reading all the data, I have 80 partitions, correct?
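
For example, I'd expect an explicit repartitioning like the following to cause such a shuffle (Record is a placeholder for my actual record type):

// Hypothetical: redistribute records across workers by hashing the hostname field.
DataSet<Record> repartitioned = records.partitionByHash("hostname");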

What is less clear to me is the concept of a group, i.e. the result of a groupBy operation. The input files I have are produced on each worker by some other process. I first want to do pre-aggregation (I hope that's the term) on each node before sending data over the network. The records I'm processing contain a 'hostname' attribute, which is set to the hostname of the worker that processes the data, because the DataSources are local. That means the records produced by the worker on host1 always contain the attribute hostname=host1. Similarly for the other 4 workers.

Now what happens if I do a groupBy("hostname")? How do the workers realize that no network transfer is necessary? Is a group a logical abstraction, or a physical one? (In my understanding, a partition is physical because it's local to exactly one worker.)

What I'd like to do next is a reduceGroup to merge multiple records into one (some custom, yet straightforward, aggregation) and emit another record for every couple of input records. Am I correct in assuming that the Iterable<X> values passed to the reduce function all have the same hostname value? That is, will the operation have a parallelism of 80, where 5x16 operations will have the same hostname value? Because I have 16 splits per host, the 16 reduces on host1 should all receive values with hostname=host1, correct? And after the operation has finished, will the reduced groups (now actual DataSets again) still be local to the workers?
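
In code, what I have in mind is roughly this (Record, getHostname, and mergeWith are placeholders for my actual types and logic):

records.groupBy("hostname").reduceGroup(new GroupReduceFunction<Record, Record>() {
    @Override
    public void reduce(Iterable<Record> values, Collector<Record> out) {
        Record merged = null;
        for (Record r : values) {
            // My assumption: r.getHostname() is identical for every record here.
            merged = (merged == null) ? r : merged.mergeWith(r);
        }
        if (merged != null) {
            out.collect(merged);
        }
    }
});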

This is quite a lot to work on, I have to admit. I'm happy for any hints, advice and feedback on this. If there's need for clarification I'd be happy to provide more information.

Thanks a lot in advance!

Robert

--
My GPG Key ID: 336E2680




--
My GPG Key ID: 336E2680