flink-user mailing list archives

From Chiwan Park <chiwanp...@apache.org>
Subject Re: Implicit inference of TypeInformation for join keys
Date Wed, 30 Mar 2016 16:15:13 GMT
Hi Timur,

You have to use the `createTypeInformation` method in the `org.apache.flink.api.scala` package to
create a TypeInformation object for Scala-specific types such as case classes, tuples, Eithers,
and Options. For example:

```
import org.apache.flink.api.scala._ // to import package object

val a: DataSet[Thing] = …
val b: DataSet[Thing] = …

a.coGroup(b)
  .where(e => (e.f1, e.f2))
  .equalTo(e => (e.f1, e.f2))(createTypeInformation[(String, String)]) {
    (left, right) => 1
  }.print()
```

Note that when you use a KeySelector, Flink internally creates copies as 2-tuples of (key extracted
by the KeySelector, original value), so there is some performance overhead compared to field keys.
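
For context on why the explicit argument goes in its own parameter list before the function block, here is a minimal, Flink-free sketch. `TypeInfo`, `Keyed`, and this `equalTo` are hypothetical stand-ins that only imitate the shape of the real API (a key function followed by an implicit type-information parameter); they are not Flink classes. My understanding is that when a braces block directly follows the call, Scala treats it as the implicit parameter list rather than as a call to `apply`, which would explain the compile error you saw.

```scala
object ImplicitKeyDemo {
  // Hypothetical stand-in for Flink's TypeInformation (not the real class).
  trait TypeInfo[K] { def describe: String }

  object TypeInfo {
    // Picked up automatically through the companion's implicit scope.
    implicit val stringPair: TypeInfo[(String, String)] =
      new TypeInfo[(String, String)] { val describe = "Tuple2<String, String>" }
  }

  // Hypothetical result of equalTo; apply plays the role of the trailing function block.
  final class Keyed[T, K](key: T => K, info: TypeInfo[K]) {
    def keyOf(value: T): K = key(value)
    def apply[O](fun: (T, T) => O): String = s"keyed by ${info.describe}"
  }

  // Same shape as a context-bound signature: key function first, implicit type info second.
  def equalTo[T, K](key: T => K)(implicit info: TypeInfo[K]): Keyed[T, K] =
    new Keyed(key, info)

  case class Thing(f1: String, f2: String)

  // Option 1: fill the implicit parameter list explicitly, then pass the function block.
  def explicitCall(): String =
    equalTo((e: Thing) => (e.f1, e.f2))(TypeInfo.stringPair) { (left, right) => 1 }

  // Option 2: split the call so the implicit is resolved before apply is invoked.
  def splitCall(): String = {
    val tmp = equalTo((e: Thing) => (e.f1, e.f2))
    tmp { (left, right) => 1 }
  }

  // Deliberately left out because it does not compile:
  //   equalTo((e: Thing) => (e.f1, e.f2)) { (left, right) => 1 }
  // The block is checked against TypeInfo[(String, String)] instead of reaching apply.
}
```

Both variants resolve the same type information; the first one just keeps everything in a single expression, which is what the `createTypeInformation[(String, String)]` call above does.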

Regards,
Chiwan Park

> On Mar 31, 2016, at 12:58 AM, Timur Fayruzov <timur.fairuzov@gmail.com> wrote:
> 
> Thank you Chiwan! Yes, I understand that there are workarounds that don't use a function
> argument (and thus do not require implicit arguments). I try to avoid positional and string-based
> keys because there are no compiler guarantees when you refactor or accidentally change the
> underlying case classes. Providing a function is the cleanest solution (and arguably the
> most readable), so it'd be great to make it work.
> 
> BTW, TypeInformation.of has an implementation that takes TypeHint (https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/TypeHint.java)
> which, according to the documentation, is supposed to be used for generic classes, but using it
> still leads to the same exception.
> 
> Thanks,
> Timur
> 
> 
> On Wed, Mar 30, 2016 at 12:05 AM, Chiwan Park <chiwanpark@apache.org> wrote:
> Hi Timur,
> 
> You can use a composite key [1] to compare keys consisting of multiple fields. For example:
> 
> ```
> val a = env.fromCollection(Seq(Thing("a", "b"), Thing("c", "d")))
> val b = env.fromCollection(Seq(Thing("a", "x"), Thing("z", "m")))
> a.coGroup(b)
>   // Flink compares the values of f1 first, then the values of f2 if the f1 values are equal.
>   .where("f1", "f2")
>   .equalTo("f1", "f2") { // Note that you must specify the same number of keys
>     (left, right) => 1
>   }
> ```
> 
> Composite keys also work with Scala tuples:
> 
> ```
> val a = env.fromCollection(Seq(("a", "b"), ("c", "d")))
> val b = env.fromCollection(Seq(("a", "x"), ("z", "m")))
> a.coGroup(b)
>   .where(0, 1) // Note that field numbers start from 0.
>   .equalTo(0, 1) {
>     (left, right) => 1
>   }
> ```
> 
> I hope this helps.
> 
> [1]: https://ci.apache.org/projects/flink/flink-docs-master/apis/common/index.html#define-keys-for-tuples
> 
> Regards,
> Chiwan Park
> 
> > On Mar 30, 2016, at 3:54 AM, Timur Fayruzov <timur.fairuzov@gmail.com> wrote:
> >
> > Hello,
> >
> > Another issue I have encountered is incorrect implicit resolution (I'm using Scala
> > 2.11.7). Here's the example (with a workaround):
> > val a = env.fromCollection(Seq(Thing("a", "b"), Thing("c", "d")))
> > val b = env.fromCollection(Seq(Thing("a", "x"), Thing("z", "m")))
> > a.coGroup(b)
> >   .where(e => e.f1)
> >   //.equalTo(e => e) { //this fails to compile because equalTo expects an implicit
> >   .equalTo("f1") {
> >     (left, right) => 1
> >   }
> > However, the workaround does not quite work when the key is a tuple (I suspect this
> > applies to other generic classes as well):
> > val a = env.fromCollection(Seq(Thing("a", "b"), Thing("c", "d")))
> > val b = env.fromCollection(Seq(Thing("a", "x"), Thing("z", "m")))
> > a.coGroup(b)
> >   .where(e => (e.f1, e.f2))
> >   .equalTo(e => (e.f1, e.f2))(TypeInformation.of(classOf[(String, String)])) { (left, right) => 1 } // throws InvalidProgramException
> > Here, I try to provide the implicit TypeInformation explicitly, but apparently it's
> > not compatible with the way implicit inference is done. (The TypeInformation I generate is
> > GenericType<scala.Tuple2>, while scala.Tuple2<String, String> is expected.)
> >
> > Now, I can split this in 2 operations like below:
> > val tmp = a.coGroup(b)
> >   .where(e => (e.f1, e.f2))
> >   .equalTo(e => (e.f1, e.f2))
> >
> > tmp { (left, right) => 1}
> > but I would like to avoid adding clutter to my processing logic, and it's not entirely
> > clear to me how this would be scheduled.
> >
> > As an option, I can hash the hell out of my keys like that:
> > a.coGroup(b)
> >   .where(e => (e.f1, e.f2).hashCode)
> >   .equalTo(e => (e.f1, e.f2).hashCode)(TypeInformation.of(classOf[Int])) { (left, right) => 1 }
> > but that, again, adds some indirection and clutter, not to mention the hassle of
> > dealing with collisions (which can be alleviated by using fancy hashes, but I'd like to avoid
> > that).
> >
> > Any insights on what is the way to go here are highly appreciated.
> >
> > Thanks,
> > Timur
> 
> 
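
On the hashCode-based keys in the quoted message: collisions are easy to hit, because distinct strings can share a hash code and the tuple hash is derived from the element hashes. A minimal sketch in plain Scala (no Flink involved; `HashKeyCollision` and its members are names made up for illustration):

```scala
object HashKeyCollision {
  // "Aa" and "BB" are distinct strings with the same String.hashCode.
  val k1: (String, String) = ("Aa", "x")
  val k2: (String, String) = ("BB", "x")

  // The keys differ, yet the derived Int keys are equal, so a coGroup keyed
  // on the hash would place both records in the same group.
  val keysDiffer: Boolean = k1 != k2
  val hashesCollide: Boolean = k1.hashCode == k2.hashCode

  def main(args: Array[String]): Unit =
    println(s"keysDiffer=$keysDiffer, hashesCollide=$hashesCollide")
}
```

With a hash key, the coGroup function would therefore still have to compare the original fields to separate such records.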

