Subject: Re: Implicit inference of TypeInformation for join keys
From: Chiwan Park
Date: Thu, 31 Mar 2016 01:15:13 +0900
To: user@flink.apache.org

Hi Timur,

You have to use the `createTypeInformation` method in the `org.apache.flink.api.scala` package to create a TypeInformation object for Scala-specific types such as case classes, tuples, eithers, and options.

For example:

```
import org.apache.flink.api.scala._ // to import package object

val a: DataSet[Thing] = …
val b: DataSet[Thing] = …

a.coGroup(b)
  .where(e => (e.f1, e.f2))
  .equalTo(e => (e.f1, e.f2))(createTypeInformation[(String, String)]) {
    (left, right) => 1
  }.print()
```

Note that Flink internally creates 2-tuples of (key extracted by the KeySelector, original value), so using a KeySelector costs some performance.

Regards,
Chiwan Park

> On Mar 31, 2016, at 12:58 AM, Timur Fayruzov wrote:
>
> Thank you Chiwan! Yes, I understand that there are workarounds that don't use a function argument (and thus do not require implicit arguments). I try to avoid positional and string-based keys because there are no compiler guarantees when you refactor or accidentally change the underlying case classes. Providing a function is the cleanest solution (and arguably is the most readable) so it'd be great to make it work.
>
> BTW, TypeInformation.of has an implementation that takes TypeHint (https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/TypeHint.java) which, according to documentation, is supposed to be used for generic classes, but using it still leads to the same exception.
>
> Thanks,
> Timur
>
>
> On Wed, Mar 30, 2016 at 12:05 AM, Chiwan Park wrote:
> Hi Timur,
>
> You can use a composite key [1] to compare keys consisting of multiple fields. For example:
>
> ```
> val a = env.fromCollection(Seq(Thing("a", "b"), Thing("c", "d")))
> val b = env.fromCollection(Seq(Thing("a", "x"), Thing("z", "m")))
> a.coGroup(b)
>   .where("f1", "f2") // Flink compares the values of f1 first, and compares the values of f2 if values of f1 are same.
>   .equalTo("f1", "f2") { // Note that you must specify the same number of keys
>     (left, right) => 1
>   }
> ```
>
> Composite keys can be applied to Scala tuples as well:
>
> ```
> val a = env.fromCollection(Seq(("a", "b"), ("c", "d")))
> val b = env.fromCollection(Seq(("a", "x"), ("z", "m")))
> a.coGroup(b)
>   .where(0, 1) // Note that field numbers start from 0.
>   .equalTo(0, 1) {
>     (left, right) => 1
>   }
> ```
>
> I hope this helps.
>
> [1]: https://ci.apache.org/projects/flink/flink-docs-master/apis/common/index.html#define-keys-for-tuples
>
> Regards,
> Chiwan Park
>
> > On Mar 30, 2016, at 3:54 AM, Timur Fayruzov wrote:
> >
> > Hello,
> >
> > Another issue I have encountered is incorrect implicit resolution (I'm using Scala 2.11.7). Here's the example (with a workaround):
> > val a = env.fromCollection(Seq(Thing("a", "b"), Thing("c", "d")))
> > val b = env.fromCollection(Seq(Thing("a", "x"), Thing("z", "m")))
> > a.coGroup(b)
> >   .where(e => e.f1)
> >   //.equalTo(e => e) { //this fails to compile because equalTo expects an implicit
> >   .equalTo("f1") {
> >     (left, right) => 1
> >   }
> > However, the workaround does not quite work when key is a tuple (I suspect this applies to other generic classes as well):
> > val a = env.fromCollection(Seq(Thing("a", "b"), Thing("c", "d")))
> > val b = env.fromCollection(Seq(Thing("a", "x"), Thing("z", "m")))
> > a.coGroup(b)
> >   .where(e => (e.f1, e.f2))
> >   .equalTo(e => (e.f1, e.f2))(TypeInformation.of(classOf[(String, String)])) { (left, right) => 1} // throws InvalidProgramException
> > Here, I try to provide the implicit TypeInformation explicitly, but apparently it's not compatible with the way implicit inference is done. (TypeInformation I generate is GenericType, while scala.Tuple2 is expected).
> >
> > Now, I can split this in 2 operations like below:
> > val tmp = a.coGroup(b)
> >   .where(e => (e.f1, e.f2))
> >   .equalTo(e => (e.f1, e.f2))
> >
> > tmp { (left, right) => 1}
> > but, I would like to avoid adding clutter to my processing logic, and it's not entirely clear to me how this would be scheduled.
> >
> > As an option, I can hash the hell out of my keys like that:
> > a.coGroup(b)
> >   .where(e => (e.f1, e.f2).hashCode)
> >   .equalTo(e => (e.f1, e.f2).hashCode)(TypeInformation.of(classOf[Int])){ (left, right) => 1}
> > but that, again, adds some indirection and clutter, not mentioning the hassle of dealing with collisions (which can be alleviated by using fancy hashes, but I'd like to avoid that).
> >
> > Any insights on what is the way to go here are highly appreciated.
> >
> > Thanks,
> > Timur