flink-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Timur Fayruzov <timur.fairu...@gmail.com>
Subject Re: Implicit inference of TypeInformation for join keys
Date Thu, 31 Mar 2016 19:16:47 GMT
I'm very content with an extra `apply`, it's much cleaner than any of my
initial solutions.

On Thu, Mar 31, 2016 at 2:18 AM, Aljoscha Krettek <aljoscha@apache.org>
wrote:

> I'm afraid there is no way around having that extra ".apply" because the
> Scala compiler will get confused with the additional implicit parameter.
> It's a bit ugly, though ...
>
> On Wed, 30 Mar 2016 at 18:34 Timur Fayruzov <timur.fairuzov@gmail.com>
> wrote:
>
>> Actually, there is an even easier solution (which I saw in your reply to
>> my other question):
>> ```
>> a.coGroup(b)
>>   .where(e => (e.f1, e.f2))
>>   .equalTo(e => (e.f1, e.f2)).apply {
>>
>>     (left, right) => 1
>>   }.print()
>> ```
>> pretty much does what I want. Explicit `apply` gives a hint that a
>> compiler was missing before. Nevertheless, `createTypeInformation` works
>> too, thanks for sharing!
>>
>> Thanks,
>> Timur
>>
>> On Wed, Mar 30, 2016 at 9:15 AM, Chiwan Park <chiwanpark@apache.org>
>> wrote:
>>
>>> Hi Timur,
>>>
>>> You have to use `createTypeInfomation` method in `org.apache.flink.api`
>>> package to create TypeInformation object for Scala-specific objects such as
>>> case classes, tuples, eithers, options. For example:
>>>
>>> ```
>>> import org.apache.flink.api.scala._ // to import package object
>>>
>>> val a: DataSet[Thing] = …
>>> val b: DataSet[Thing] = …
>>>
>>> a.coGroup(b)
>>>   .where(e => (e.f1, e.f2))
>>>   .equalTo(e => (e.f1, e.f2))(createTypeInformation[(String, String)]) {
>>>     (left, right) => 1
>>>   }.print()
>>> ```
>>>
>>> Note that Flink creates internally copied 2-tuples consisted of
>>> (extracted key by KeySelector, original value). So there is some
>>> performance decrease when you are using KeySelector.
>>>
>>> Regards,
>>> Chiwan Park
>>>
>>> > On Mar 31, 2016, at 12:58 AM, Timur Fayruzov <timur.fairuzov@gmail.com>
>>> wrote:
>>> >
>>> > Thank you Chiwan! Yes, I understand that there are workarounds that
>>> don't use function argument (and thus do not require implicit arguments). I
>>> try to avoid positional and string-based keys because there is no compiler
>>> guarantees when you refactor or accidentally change the underlying case
>>> classes. Providing a function is the cleanest solution (and arguably is the
>>> most readable) so it'd be great to make it work.
>>> >
>>> > BTW, TypeInformation.of has an implementation that takes TypeHint (
>>> https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/TypeHint.java)
>>> which, according to documentation, is supposed to be used for generic
>>> classes, but using it still leads to the same exception.
>>> >
>>> > Thanks,
>>> > Timur
>>> >
>>> >
>>> > On Wed, Mar 30, 2016 at 12:05 AM, Chiwan Park <chiwanpark@apache.org>
>>> wrote:
>>> > Hi Timur,
>>> >
>>> > You can use a composite key [1] to compare keys consisting of multiple
>>> fields. For example:
>>> >
>>> > ```
>>> > val a = env.fromCollection(Seq(Thing("a", "b"), Thing("c", "d")))
>>> > val b = env.fromCollection(Seq(Thing("a", "x"), Thing("z", "m")))
>>> > a.coGroup(b)
>>> >   .where(“f1”, “f2”) // Flink compares the values of f1 first, and
>>> compares the values of f2 if values of f1 are same.
>>> >   .equalTo(“f1”, “f2”) { // Note that you must specify same number
of
>>> keys
>>> >     (left, right) => 1
>>> >   }
>>> > ```
>>> >
>>> > Composite key can be applied to Scala tuple also:
>>> >
>>> > ```
>>> > val a = env.fromCollection(Seq(("a", "b"), ("c", "d")))
>>> > val b = env.fromCollection(Seq(("a", "x"), ("z", "m")))
>>> > a.coGroup(b)
>>> >   .where(0, 1) // Note that field numbers start from 0.
>>> >   .equalTo(0, 1) {
>>> >     (left, right) => 1
>>> >   }
>>> > ```
>>> >
>>> > I hope this helps.
>>> >
>>> > [1]:
>>> https://ci.apache.org/projects/flink/flink-docs-master/apis/common/index.html#define-keys-for-tuples
>>> >
>>> > Regards,
>>> > Chiwan Park
>>> >
>>> > > On Mar 30, 2016, at 3:54 AM, Timur Fayruzov <
>>> timur.fairuzov@gmail.com> wrote:
>>> > >
>>> > > Hello,
>>> > >
>>> > > Another issue I have encountered is incorrect implicit resolution
>>> (I'm using Scala 2.11.7). Here's the example (with a workaround):
>>> > > val a = env.fromCollection(Seq(Thing("a", "b"), Thing("c", "d")))
>>> > > val b = env.fromCollection(Seq(Thing("a", "x"), Thing("z", "m")))
>>> > > a.coGroup(b)
>>> > >   .where(e => e.f1)
>>> > >   //.equalTo(e => e) { //this fails to compile because equalTo
>>> expects an implicit
>>> > >   .equalTo("f1") {
>>> > >     (left, right) => 1
>>> > >   }
>>> > > However, the workaround does not quite work when key is a tuple (I
>>> suspect this applies to other generic classes as well):
>>> > > val a = env.fromCollection(Seq(Thing("a", "b"), Thing("c", "d")))
>>> > > val b = env.fromCollection(Seq(Thing("a", "x"), Thing("z", "m")))
>>> > > a.coGroup(b)
>>> > >   .where(e => (e.f1, e.f2))
>>> > >   .equalTo(e => (e.f1, e.f2))(TypeInformation.of(classOf[(String,
>>> String)])) { (left, right) => 1} // throws InvalidProgramException
>>> > > Here, I try to provide the implicit TypeInformation explicitly, but
>>> apparently it's not compatible with the way implicit inference is done.
>>> (TypeInformation I generate is GenericType<scala.Tuple2>, while
>>> scala.Tuple2<String, String> is expected).
>>> > >
>>> > > Now, I can split this in 2 operations like below:
>>> > > val tmp = a.coGroup(b)
>>> > >   .where(e => (e.f1, e.f2))
>>> > >   .equalTo(e => (e.f1, e.f2))
>>> > >
>>> > > tmp { (left, right) => 1}
>>> > > but, I would like to avoid adding clutter to my processing logic,
>>> and it's not entirely clear to me how this would be scheduled.
>>> > >
>>> > > As an option, I can hash the hell out of my keys like that:
>>> > > a.coGroup(b)
>>> > >   .where(e => (e.f1, e.f2).hashCode)
>>> > >   .equalTo(e => (e.f1,
>>> e.f2).hashCode)(TypeInformation.of(classOf[Int])){ (left, right) => 1}
>>> > > but that, again, adds some indirection and clutter, not mentioning
>>> the hassle of dealing with collisions (which can be alleviated by using
>>> fancy hashes, but I'd like to avoid that).
>>> > >
>>> > > Any insights on what is the way to go here are highly appreciated.
>>> > >
>>> > > Thanks,
>>> > > Timur
>>> >
>>> >
>>>
>>>
>>

Mime
View raw message