flink-user mailing list archives

From Stephan Ewen <se...@apache.org>
Subject Re: How to make a generic key for groupBy
Date Mon, 11 May 2015 16:31:50 GMT
My pleasure, happy to hear that it solved your problem.

I was wondering: Do more people have a similar structure in their types and
utilities?
If yes, it may make sense to add a generic version of these utilities to
the Flink codebase.

On Fri, May 1, 2015 at 11:51 AM, LINZ, Arnaud <ALINZ@bouyguestelecom.fr>
wrote:

>  Hi Stephan,
>
> Thanks a lot, that solves my problem. I am truly amazed by and grateful for
> the time you spent helping me with this.
>
> Greetings,
> Arnaud
>  ------------------------------
> From: Stephan Ewen <sewen@apache.org>
> Sent: 29/04/2015 17:19
> To: user@flink.apache.org
> Subject: Re: How to make a generic key for groupBy
>
>  Hey Arnaud!
>
>  I have made a quick sample implementation of how you can very
> efficiently support generic keys, like yours. I put the code in this
> repository: https://github.com/StephanEwen/flink-generic-keys
>
>
>  It implements a special key selector. You can use that to do what you
> used to do, and it internally does a bit of magic to expose type
> information to the Flink pre-flight phase:
>
>
> https://github.com/StephanEwen/flink-generic-keys/blob/master/src/main/java/com/dataartisans/flink/TupleKeySelector.java
>
>
>
>  Here is a quick intro to what this does:
>
>    - It exposes keys as Flink tuples, by wrapping your Keys in a Tuple in
> a generic way. Tuples are the fastest keys in Flink, because they support
> the best "on-binary-data" operations.
>
>    - It figures out which types are going to be in the tuples (and in
> your generic keys), by analyzing the Method signature of your concrete
> "getKey()" method implementation.
>
>
>  The requirement for the second part is that your concrete types declare
> the concrete key in their signature. Below is an example. Note that the
> "SomeType" class does not declare "Key" as the return type of "getKey()",
> but very specifically "Key2<String, Integer>". That way, the key selector
> can pick up the types (String and Integer) and expose them to Flink, so that
> Flink can generate the key comparators that perform efficient binary
> operations on the keys.
>
>
>  public abstract class TypeBase {
>      public abstract Key getKey();
>  }
>
>  public class SomeType extends TypeBase {
>      public String someString;
>      public int anotherInteger;
>      public Date aDate;
>
>      @Override
>      public Key2<String, Integer> getKey() {
>          return new Key2<String, Integer>(someString, anotherInteger);
>      }
>  }
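
To illustrate why the concrete return type matters, here is a minimal sketch in plain Java (no Flink dependency) of how reflection can read the type arguments from a covariant "getKey()" signature. It is not the code from the flink-generic-keys repository. The class KeyTypeProbe and the helper keyFieldTypes() are hypothetical names, and Key, Key2, TypeBase and SomeType are simplified stand-ins for the classes quoted above.

```java
import java.lang.reflect.Method;
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.Arrays;

class KeyTypeProbe {

    // Simplified stand-ins for the key classes discussed in this thread.
    public abstract static class Key {}

    public static class Key2<A, B> extends Key {
        public final A first;
        public final B second;

        public Key2(A first, B second) {
            this.first = first;
            this.second = second;
        }
    }

    public abstract static class TypeBase {
        public abstract Key getKey();
    }

    public static class SomeType extends TypeBase {
        public String someString = "a";
        public int anotherInteger = 1;

        @Override
        public Key2<String, Integer> getKey() {
            return new Key2<String, Integer>(someString, anotherInteger);
        }
    }

    /** Returns the type arguments declared on the class's own getKey() method. */
    public static Type[] keyFieldTypes(Class<?> clazz) {
        for (Method m : clazz.getDeclaredMethods()) {
            // Skip the compiler-generated bridge method, which returns the raw Key.
            if (m.getName().equals("getKey") && m.getParameterCount() == 0 && !m.isBridge()) {
                Type ret = m.getGenericReturnType();
                if (ret instanceof ParameterizedType) {
                    return ((ParameterizedType) ret).getActualTypeArguments();
                }
            }
        }
        // Nothing to discover if the method only declares the abstract Key type.
        return new Type[0];
    }

    public static void main(String[] args) {
        // Prints the discovered key field types for SomeType.
        System.out.println(Arrays.toString(keyFieldTypes(SomeType.class)));
    }
}
```

If SomeType declared plain "Key" as its return type, the helper would return an empty array, which is the situation where the types cannot be inferred.
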
>
>
>  The good thing about exposing this information to Flink in the
> pre-flight phase (before the job runs in parallel) is that it pre-checks
> many things to prevent most memory and serialization surprises at runtime.
>
>  Let me know if you have questions!
>
>  Greetings,
> Stephan
>
>
> On Mon, Apr 27, 2015 at 6:38 PM, Stephan Ewen <sewen@apache.org> wrote:
>
>> Hi!
>>
>>  I have some ideas, let me see if I can make them concrete by
>> tomorrow...
>>
>>  Greetings,
>> Stephan
>>
>>
>> On Mon, Apr 27, 2015 at 5:29 PM, LINZ, Arnaud <ALINZ@bouyguestelecom.fr>
>> wrote:
>>
>>>  Hi,
>>>
>>> I see. My Key class is an abstract class whose subclasses are Key1<?>,
>>> Key2<?,?> etc., so it is very much like a tuple. It is heavily used in
>>> “non-distributed” hash maps once the dataset is reduced to fit on a single
>>> JVM.
>>>
>>> It exposes the common contract that I need (such as getHeadKey(),
>>> getLastl(), or makeKey(Key,Object)) to “navigate” in the key space, and a
>>> cached hash code to make hash maps faster. My generic algorithms do not
>>> need to know how many fields are exposed in the Key, but they need to be
>>> able to construct another key from two keys.
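
For readers following along, a key hierarchy of the kind described here might look roughly like the sketch below. This is a guess at the general shape, not Arnaud's actual code: the field layout, arity() and field() are assumptions, and makeKey() only handles the one-field case to keep the example short.

```java
import java.util.Arrays;

class KeySketch {

    /** Hypothetical base class: an immutable field list with a cached hash code. */
    abstract static class Key {
        private final Object[] fields;
        private final int hash; // computed once, then reused by hash maps

        Key(Object... fields) {
            this.fields = fields.clone();
            this.hash = Arrays.hashCode(this.fields);
        }

        int arity() { return fields.length; }

        Object field(int i) { return fields[i]; }

        @Override
        public int hashCode() { return hash; }

        @Override
        public boolean equals(Object o) {
            return o instanceof Key && Arrays.equals(fields, ((Key) o).fields);
        }
    }

    static final class Key1<A> extends Key {
        Key1(A a) { super(a); }
    }

    static final class Key2<A, B> extends Key {
        Key2(A a, B b) { super(a, b); }
    }

    /** Extends a one-field key by one more field without knowing its concrete type. */
    static Key makeKey(Key head, Object last) {
        if (head.arity() != 1) {
            throw new IllegalArgumentException("this sketch only extends one-field keys");
        }
        return new Key2<Object, Object>(head.field(0), last);
    }

    public static void main(String[] args) {
        Key combined = makeKey(new Key1<String>("a"), 1);
        // Generic code can compare keys without knowing how many fields they hold.
        System.out.println(combined.equals(new Key2<String, Integer>("a", 1)));
    }
}
```
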
>>>
>>>
>>>
>>> Arnaud
>>>
>>>
>>>
>>> From: ewenstephan@gmail.com [mailto:ewenstephan@gmail.com] On behalf
>>> of Stephan Ewen
>>> Sent: Friday, 24 April 2015 11:14
>>> To: user@flink.apache.org
>>> Subject: Re: How to make a generic key for groupBy
>>>
>>>
>>>
>>> Hi Arnaud!
>>>
>>>
>>>
>>> Thank you for the warm words! Let's find a good way to get this to
>>> work...
>>>
>>>
>>>
>>> As a bit of background:
>>>
>>> In Flink, the API needs to know a bit about the types that go through the
>>> functions, because Flink pre-generates and configures serializers, and
>>> validates that things fit together.
>>>
>>>
>>>
>>> It is also important that keys are exposed rather specifically, because
>>> Flink internally tries to work on serialized data (that makes its in-memory
>>> operations predictable and robust).
>>>
>>>
>>>
>>> If you expose a key as a "String", or "long" or "double", then Flink
>>> knows how to work on it in a binary fashion.
>>>
>>> Also, if you expose a key as a POJO, then Flink interprets the key as a
>>> combination of the fields, and can again work on the serialized data.
>>>
>>>
>>>
>>> If you only expose "Comparable" (which is the bare minimum for a key),
>>> you experience performance degradation (most notably for sorts), because
>>> every key operation involves serialization and deserialization.
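
The difference between the two paths can be sketched in plain Java. This is only an illustration of the idea, not Flink's serializer or comparator code: BinaryKeyCompare and its methods are hypothetical names, and the sign-bit encoding is just one common way to make byte order match numeric order for a long key.

```java
import java.nio.ByteBuffer;

class BinaryKeyCompare {

    /** Serializes a long so that unsigned byte order equals numeric order. */
    static byte[] serialize(long key) {
        // Flipping the sign bit makes negative values sort before positive ones.
        // ByteBuffer writes big-endian by default, so the first byte is the most significant.
        return ByteBuffer.allocate(Long.BYTES).putLong(key ^ Long.MIN_VALUE).array();
    }

    /** Fast path: compares two serialized keys byte by byte, without deserializing. */
    static int compareBinary(byte[] a, byte[] b) {
        for (int i = 0; i < Math.min(a.length, b.length); i++) {
            int cmp = Integer.compare(a[i] & 0xFF, b[i] & 0xFF);
            if (cmp != 0) {
                return cmp;
            }
        }
        return Integer.compare(a.length, b.length);
    }

    /** Slow path: rebuilds both objects first, then calls compareTo(). */
    static int compareViaObjects(byte[] a, byte[] b) {
        Long left = ByteBuffer.wrap(a).getLong() ^ Long.MIN_VALUE;
        Long right = ByteBuffer.wrap(b).getLong() ^ Long.MIN_VALUE;
        return left.compareTo(right);
    }

    public static void main(String[] args) {
        byte[] x = serialize(-5L);
        byte[] y = serialize(42L);
        // Both paths agree on the ordering; only the second one allocates objects.
        System.out.println(Integer.signum(compareBinary(x, y)));
        System.out.println(Integer.signum(compareViaObjects(x, y)));
    }
}
```

A key exposed only as "Comparable" forces something like the second path on every comparison, which is where the cost in sorts comes from.
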
>>>
>>>
>>>
>>> So the goal would be to expose the key properly. We can always hint to
>>> the API what the key type is, precisely for the cases where the inference
>>> cannot do it.
>>>
>>>   - To understand things a bit better: What is your "Key" type? Is it an
>>> abstract class, an interface, a generic parameter?
>>>
>>>
>>>
>>>
>>>
>>> Greetings,
>>>
>>> Stephan
>>>
>>>
>>>
>>>
>>>
>>> FYI: In Scala, this actually works quite a bit more easily, since Scala does
>>> preserve generic types. In Java, we built a lot of reflection tooling, but
>>> there are cases where it is impossible to infer the types via reflection,
>>> like yours.
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>> On Thu, Apr 23, 2015 at 6:35 PM, Soumitra Kumar <
>>> kumar.soumitra@gmail.com> wrote:
>>>
>>>  Will you elaborate on your use case? It would help to find out where
>>> Flink shines. IMO, it's a great project, but needs more differentiation from
>>> Spark.
>>>
>>>
>>>
>>> On Thu, Apr 23, 2015 at 7:25 AM, LINZ, Arnaud <ALINZ@bouyguestelecom.fr>
>>> wrote:
>>>
>>>  Hello,
>>>
>>>
>>>
>>> After a quite successful benchmark yesterday (Flink being about twice
>>> as fast as Spark on my use cases), I’ve turned instantly from spark-fan to
>>> flink-fan. Great job, committers!
>>>
>>> So I’ve decided to port my existing Spark tools to Flink. Happily, most
>>> of the difficulty was renaming classes, packages and variables with “spark”
>>> in them to something more neutral :-)
>>>
>>>
>>>
>>> However there is one easy thing in Spark I’m still wondering how to do
>>> in Flink : generic keys.
>>>
>>>
>>>
>>> I’m trying to make a framework on which my applications are built. That
>>> framework thus manipulates “generic types” representing the data, inheriting
>>> from an abstract class with a common contract, let’s call it “Bean”.
>>>
>>>
>>>
>>> Among other things Bean exposes an abstract method
>>>
>>>     public Key getKey();
>>>
>>>
>>>
>>> Key being one of my core types used in several java algorithms.
>>>
>>>
>>>
>>> Let’s say I have the class :
>>>
>>> public class Framework<T extends Bean> implements Serializable {
>>>
>>>     public DataSet<T> doCoolStuff(final DataSet<T> inputDataset) {
>>>         // Group lines according to a key
>>>         final UnsortedGrouping<T> groupe = inputDataset.groupBy(
>>>                 new KeySelector<T, Key>() {
>>>                     @Override
>>>                     public Key getKey(T record) {
>>>                         return record.getKey();
>>>                     }
>>>                 });
>>>         (…)
>>>     }
>>> }
>>>
>>>
>>>
>>> With Spark, a mapToPair works fine because all I have to do is
>>> implement hashCode() and equals() correctly on my Key type.
>>>
>>> With Flink, Key is not recognized as a POJO (well, it is not one), so
>>> that does not work.
>>>
>>>
>>>
>>> I have tried to expose something like public Tuple getKeyAsTuple(); in
>>> Key, but Flink does not accept generic Tuples. I’ve tried to
>>> parameterize my Tuple, but Flink does not know how to infer the generic
>>> type value.
>>>
>>>
>>>
>>> So I’m wondering what is the best way to implement it.
>>>
>>> For now I have exposed something like public String getKeyAsString(); and
>>> turned my generic treatment into:
>>>
>>> final UnsortedGrouping<T> groupe = inputDataset.groupBy(
>>>         new KeySelector<T, String>() {
>>>             @Override
>>>             public String getKey(T record) {
>>>                 return record.getKey().getKeyAsString();
>>>             }
>>>         });
>>>
>>> But that “ASCII” representation is suboptimal.
>>>
>>>
>>>
>>> I thought of passing a key-to-tuple conversion lambda when creating
>>> the Framework class, but that would be boilerplate code on the user’s end,
>>> which I’m not fond of.
>>>
>>>
>>>
>>> So my questions are:
>>>
>>> - Is there a smarter way to do this?
>>>
>>> - What kind of objects can be passed as a Key? Is there an interface to
>>>   respect?
>>>
>>> - In the worst case, is byte[] OK as a Key? (I can code the
>>>   serialization on the framework side…)
>>>
>>>
>>>
>>>
>>>
>>> Best regards,
>>>
>>> Arnaud
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>
>>
>
