From: Alexis Sarda
Date: Fri, 10 Aug 2018 12:04:51 +0200
Subject: Re: JDBCInputFormat and SplitDataProperties
To: fhueske@gmail.com
Cc: user@flink.apache.org

It seems I may have spoken too soon. After executing the job with more data,
I can see the following things in the Flink dashboard:

- The first subtask is a chained DataSource -> GroupCombine. Even with
  parallelism set to 24 and a ParameterValuesProvider returning
  Array(Array("first"), Array("second")), only 1 thread processed all records.
- The second subtask is a Sorted Group Reduce, and I see two weird things:
  + The first subtask sent 5,923,802 records, yet the second subtask only
    received 5,575,154 records?
  + Again, everything was done in a single thread, even though a groupBy
    was used.
- The third and final subtask is a sink that saves back to the database.

Does anyone know why parallelism is not being used?
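For reference, the splits are configured roughly like this (a simplified
sketch; the driver, query, and column types are placeholders, with dbUrl and
env as in the code quoted further below):

  import org.apache.flink.api.common.typeinfo.BasicTypeInfo
  import org.apache.flink.api.java.io.jdbc.JDBCInputFormat
  import org.apache.flink.api.java.io.jdbc.split.GenericParameterValuesProvider
  import org.apache.flink.api.java.typeutils.RowTypeInfo

  val rowTypeInfo = new RowTypeInfo(
    BasicTypeInfo.STRING_TYPE_INFO, // host
    BasicTypeInfo.LONG_TYPE_INFO,   // timestamp
    BasicTypeInfo.INT_TYPE_INFO)    // id

  // one parameter row per split -> this provider yields 2 splits
  val parameters: Array[Array[java.io.Serializable]] =
    Array(Array("first"), Array("second"))

  val inputFormat = JDBCInputFormat.buildJDBCInputFormat()
    .setDrivername("org.postgresql.Driver") // placeholder driver
    .setDBUrl(dbUrl)
    .setQuery("SELECT host, ts, id FROM t WHERE part = ? ORDER BY host, ts")
    .setRowTypeInfo(rowTypeInfo)
    .setParametersProvider(new GenericParameterValuesProvider(parameters))
    .finish()

  // with only 2 splits, at most 2 of the 24 source subtasks can receive data
  val source = env.createInput(inputFormat)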
Regards,
Alexis.

On Thu, Aug 9, 2018 at 11:22 AM Alexis Sarda wrote:

> Hi Fabian,
>
> Thanks a lot for the help. The Scala DataSet, at least in version 1.5.0,
> declares javaSet as private[flink], so I cannot access it directly.
> Nevertheless, I managed to get around it by using the Java environment:
>
> val env = org.apache.flink.api.java.ExecutionEnvironment.getExecutionEnvironment
>
> val inputFormat = getInputFormat(query, dbUrl, properties)
> val outputFormat = getOutputFormat(dbUrl, properties)
>
> val source = env.createInput(inputFormat)
> val sdp = source.getSplitDataProperties
> sdp.splitsPartitionedBy(0)
> sdp.splitsOrderedBy(Array(1), Array(Order.ASCENDING))
>
> // transform the Java DataSet into a Scala DataSet...
> new DataSet(source.asInstanceOf[org.apache.flink.api.java.DataSet[Row]])
>   .groupBy(0, 1)
>   .combineGroup(groupCombiner)
>   .withForwardedFields("f0->_1")
>   .groupBy(0, 1)
>   .reduceGroup(groupReducer)
>   .withForwardedFields("_1")
>   .output(outputFormat)
>
> It seems to work well, and the semantic annotation does remove a hash
> partition from the execution plan.
>
> Regards,
> Alexis.
>
> On Thu, Aug 9, 2018 at 10:27 AM Fabian Hueske wrote:
>
>> Hi Alexis,
>>
>> The Scala API does not expose a DataSource object, but only a Scala
>> DataSet which wraps the Java object.
>> You can get the SplitDataProperties from the Scala DataSet as follows:
>>
>> val dbData: DataSet[...] = ???
>> val sdp = dbData.javaSet.asInstanceOf[DataSource].getSplitDataProperties
>>
>> So you first have to get the wrapped Java DataSet, cast it to DataSource,
>> and then get the properties.
>> It's not very nice, but it should work.
>>
>> In order to use SDPs, you should be a bit familiar with how physical data
>> properties are propagated and discarded in the optimizer.
>> For example, applying a simple MapFunction removes all properties, because
>> the function might have changed the fields on which a DataSet is
>> partitioned or sorted.
>> You can expose the behavior of a function to the optimizer by using
>> Semantic Annotations [1].
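>> For example, a combiner that keeps the grouping field in the first
>> position of its output could be annotated like this (a sketch; the class
>> and its logic are illustrative, not your actual functions):
>>
>> import org.apache.flink.api.common.functions.GroupCombineFunction
>> import org.apache.flink.api.java.functions.FunctionAnnotation
>> import org.apache.flink.types.Row
>> import org.apache.flink.util.Collector
>>
>> // Input field f0 is emitted unmodified as output field _1, so
>> // partitioning/sorting on that field survives the combiner.
>> @FunctionAnnotation.ForwardedFields(Array("f0->_1"))
>> class HostCountCombiner extends GroupCombineFunction[Row, (String, Int)] {
>>   override def combine(rows: java.lang.Iterable[Row],
>>                        out: Collector[(String, Int)]): Unit = {
>>     var host: String = null
>>     var count = 0
>>     val it = rows.iterator()
>>     while (it.hasNext) {
>>       host = it.next().getField(0).asInstanceOf[String]
>>       count += 1
>>     }
>>     out.collect((host, count))
>>   }
>> }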
>> Some comments on the code and plan you shared:
>> - You might want to add hostname to ORDER BY to have the output grouped
>>   by (ts, hostname).
>> - Check the Global and Local data properties in the plan to validate
>>   that the SDPs were correctly interpreted.
>> - If the data is already correctly partitioned and sorted, you might not
>>   need the Combiners. In either case, you probably want to annotate them
>>   with Forwarded Fields annotations.
>>
>> The number of source tasks is unrelated to the number of splits. If you
>> have more tasks than splits, some tasks won't process any data.
>>
>> Best, Fabian
>>
>> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.5/dev/batch/#semantic-annotations
>>
>> 2018-08-08 14:10 GMT+02:00 Alexis Sarda:
>>
>>> Hi Fabian,
>>>
>>> Thanks for the clarification. I have a few remarks, but let me provide
>>> more concrete information. You can find the query I'm using, the
>>> JDBCInputFormat creation, and the execution plan in this GitHub gist:
>>>
>>> https://gist.github.com/asardaes/8331a117210d4e08139c66c86e8c952d
>>>
>>> I cannot call getSplitDataProperties because env.createInput(inputFormat)
>>> returns a DataSet, not a DataSource. In the code, I do this instead:
>>>
>>> val javaEnv = org.apache.flink.api.java.ExecutionEnvironment.getExecutionEnvironment
>>> val dataSource = new DataSource(javaEnv, inputFormat, rowTypeInfo, "example")
>>>
>>> which feels wrong (the constructor doesn't accept a Scala environment).
>>> Is there a better alternative?
>>>
>>> I see absolutely no difference in the execution plan whether I use SDP
>>> or not, so the results are indeed the same. Is this expected?
>>>
>>> My ParameterValuesProvider specifies 2 splits, yet the execution plan
>>> shows Parallelism=24. Even the source code is a bit ambiguous,
>>> considering that the constructor for GenericInputSplit takes two
>>> parameters: partitionNumber and totalNumberOfPartitions. Should I assume
>>> that there are 2 splits divided into 24 partitions?
>>>
>>> Regards,
>>> Alexis.
>>>
>>> On Wed, Aug 8, 2018 at 11:57 AM Fabian Hueske wrote:
>>>
>>>> Hi Alexis,
>>>>
>>>> First of all, I think you can leverage the partitioning and sorting
>>>> properties of the data returned by the database using
>>>> SplitDataProperties.
>>>> However, please be aware that SplitDataProperties are a rather
>>>> experimental feature.
>>>>
>>>> If used without query parameters, the JDBCInputFormat generates a
>>>> single split and queries the database just once. If you want to
>>>> leverage parallelism, you have to specify a query with parameters in
>>>> the WHERE clause to read different parts of the table.
>>>> Note that, depending on the configuration of the database, multiple
>>>> queries result in multiple full scans. Hence, it might make sense to
>>>> have an index on the partitioning columns.
>>>>
>>>> If properly configured, the JDBCInputFormat generates multiple splits
>>>> which are partitioned. Since the partitioning is encoded in the query,
>>>> it is opaque to Flink and must be explicitly declared.
>>>> This can be done with SDPs. The SDP.splitsPartitionedBy() method tells
>>>> Flink that all records with the same value in the partitioning field
>>>> are read from the same split, i.e., the full data is partitioned on
>>>> the attribute across splits.
>>>> The same can be done for ordering if the query of the JDBCInputFormat
>>>> is specified with an ORDER BY clause.
>>>> Partitioning and grouping are two different things. You can define a
>>>> query that partitions on hostname and orders by hostname and timestamp,
>>>> and declare these properties in the SDP.
>>>>
>>>> You can get an SDP object by calling DataSource.getSplitDataProperties().
>>>> In your example this would be source.getSplitDataProperties().
>>>>
>>>> Whatever you do, you should carefully check the execution plan
>>>> (ExecutionEnvironment.getExecutionPlan()) using the plan visualizer [1]
>>>> and validate that the results are identical whether you use SDP or not.
>>>>
>>>> Best, Fabian
>>>>
>>>> [1] https://flink.apache.org/visualizer/
>>>>
>>>> 2018-08-07 22:32 GMT+02:00 Alexis Sarda:
>>>>
>>>>> Hi everyone,
>>>>>
>>>>> I have the following scenario: I have a database table with 3 columns:
>>>>> a host (string), a timestamp, and an integer ID. Conceptually, what
>>>>> I'd like to do is:
>>>>>
>>>>> group by host and timestamp -> based on all the IDs in each group,
>>>>> create a mapping to n new tuples -> for each unique tuple, count how
>>>>> many times it appeared across the resulting data
>>>>>
>>>>> Each new tuple has 3 fields: the host, a new ID, and an Integer=1
>>>>>
>>>>> What I'm currently doing is roughly:
>>>>>
>>>>> val input = JDBCInputFormat.buildJDBCInputFormat()...finish()
>>>>> val source = environment.createInput(input)
>>>>> source.partitionByHash("host", "timestamp").mapPartition(...).groupBy(0, 1).aggregate(SUM, 2)
>>>>>
>>>>> The query given to JDBCInputFormat provides results ordered by host
>>>>> and timestamp, and I was wondering if performance can be improved by
>>>>> specifying this in the code. I've looked at
>>>>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Terminology-Split-Group-and-Partition-td11030.html
>>>>> and
>>>>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Fwd-Processing-Sorted-Input-Datasets-td20038.html,
>>>>> but I still have some questions:
>>>>>
>>>>> - If a split is a subset of a partition, what is the meaning of
>>>>>   SplitDataProperties#splitsPartitionedBy? The wording makes me think
>>>>>   that a split is divided into partitions, meaning that a partition
>>>>>   would be a subset of a split.
>>>>> - At which point can I retrieve and adjust a SplitDataProperties
>>>>>   instance, if possible at all?
>>>>> - If I wanted a coarser parallelization where each slot gets all the
>>>>>   data for the same host, would I have to manually create the
>>>>>   sub-groups based on timestamp?
>>>>>
>>>>> Regards,
>>>>> Alexis.
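>>>>> P.S. For the last point, this sketch is roughly what I have in mind
>>>>> (assuming the usual Flink Scala imports; the IDs -> new-tuples mapping
>>>>> is just a placeholder):
>>>>>
>>>>> // one group per host; within the group, records arrive sorted by
>>>>> // timestamp, so a sub-group is cut whenever the timestamp changes
>>>>> source
>>>>>   .groupBy("host")
>>>>>   .sortGroup("timestamp", Order.ASCENDING)
>>>>>   .reduceGroup { (records: Iterator[Row], out: Collector[(String, Int, Int)]) =>
>>>>>     var host: String = null
>>>>>     var currentTs: Any = null
>>>>>     val ids = scala.collection.mutable.ArrayBuffer[Int]()
>>>>>     def flush(): Unit = if (ids.nonEmpty) {
>>>>>       // placeholder mapping: one (host, newId, 1) tuple per ID
>>>>>       ids.foreach(id => out.collect((host, id, 1)))
>>>>>       ids.clear()
>>>>>     }
>>>>>     for (r <- records) {
>>>>>       host = r.getField(0).asInstanceOf[String]
>>>>>       val ts = r.getField(1)
>>>>>       if (currentTs != null && ts != currentTs) flush()
>>>>>       currentTs = ts
>>>>>       ids += r.getField(2).asInstanceOf[Int]
>>>>>     }
>>>>>     flush()
>>>>>   }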