flink-user mailing list archives

From Stephan Ewen <se...@apache.org>
Subject Re: Best way to join with inequalities (historical data)
Date Mon, 04 May 2015 12:54:47 GMT
When you use cross(), it is always good to use crossWithHuge() or
crossWithTiny() to tell the system which side is small. It cannot always
infer that automatically at this point.

If you have an optimized structure for the lookups, go with a broadcast
variable and a map() function.
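The log(n) lookup inside such a map() function could be built on a sorted map over interval start times. A minimal sketch in plain Java (no Flink dependencies; class and method names are hypothetical, and it assumes the validity intervals do not overlap):

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical index over the broadcast historical set:
// floorEntry() gives the interval with the greatest start <= ts in O(log n).
public class IntervalLookup {
    // start timestamp -> [end timestamp, label]
    private final TreeMap<Long, Object[]> byStart = new TreeMap<>();

    public void add(long start, long end, String label) {
        byStart.put(start, new Object[]{end, label});
    }

    // Returns the label whose [start, end] interval contains ts, or null.
    public String lookup(long ts) {
        Map.Entry<Long, Object[]> e = byStart.floorEntry(ts);
        if (e == null) return null;
        long end = (Long) e.getValue()[0];
        return ts <= end ? (String) e.getValue()[1] : null;
    }

    public static void main(String[] args) {
        IntervalLookup lookup = new IntervalLookup();
        lookup.add(0L, 9L, "A");
        lookup.add(10L, 19L, "B");
        System.out.println(lookup.lookup(5L));   // A
        System.out.println(lookup.lookup(12L));  // B
        System.out.println(lookup.lookup(25L));  // null
    }
}
```

The index would be built once per task, in open() of a RichMapFunction, from the broadcast variable.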

On Mon, May 4, 2015 at 1:40 PM, LINZ, Arnaud <ALINZ@bouyguestelecom.fr>
wrote:

> Hi,
>
> Thanks. The use case I have right now does not require too much magic; my
> historical data set is small enough to fit in RAM, so I'll spread it over
> each node and use a simple mapping with a log(n) lookup. It was more a
> theoretical question.
> If my dataset becomes too large, I may use some hashing technique (for
> instance at day level) and cut the intervals at bucket boundaries by
> duplicating rows to prevent overlapping.
>
> Arnaud
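The day-level bucketing Arnaud describes above can be sketched in plain Java (no Flink dependencies; names are hypothetical, and non-negative timestamps are assumed): key both sides by a day bucket, replicating each interval into every day it overlaps, so an equi-join on the bucket followed by the range filter never misses a match.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: enumerate the day buckets an interval overlaps,
// so the interval row can be duplicated once per bucket before an equi-join.
public class DayBuckets {
    static final long DAY_MS = 24L * 60 * 60 * 1000;

    // Day buckets covered by [start, end] inclusive (non-negative millis).
    public static List<Long> bucketsFor(long start, long end) {
        List<Long> buckets = new ArrayList<>();
        for (long day = start / DAY_MS; day <= end / DAY_MS; day++) {
            buckets.add(day);
        }
        return buckets;
    }

    public static void main(String[] args) {
        // An interval spanning three days is replicated into three buckets.
        System.out.println(bucketsFor(0L, 2 * DAY_MS + 1));  // [0, 1, 2]
    }
}
```

Each event maps to a single bucket (its timestamp / DAY_MS), so after the equi-join on the bucket only the original inequality filter remains.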
>
>
>
>
> -----Original Message-----
> From: Matthias J. Sax [mailto:mjsax@informatik.hu-berlin.de]
> Sent: Monday, May 4, 2015 11:52 AM
> To: user@flink.apache.org
> Subject: Re: Best way to join with inequalities (historical data)
>
> Hi,
>
> there is no built-in system support to express this kind of join.
>
> However, you could perform some "hand-wired" optimization by partitioning
> your input data into distinct intervals. It might be tricky, though,
> especially if the time ranges in your "range-key" dataset overlap
> everywhere (data replication is necessary for the overlapping parts).
>
> But it might be worth the effort if you can't get the job done using
> cross-product. How large are your data sets? What hardware are you using?
>
>
> -Matthias
>
>
> On 05/04/2015 10:47 AM, LINZ, Arnaud wrote:
> > Hello,
> >
> >
> >
> > I was wondering how to join large data sets on inequalities.
> >
> >
> >
> > Let's say I have a data set whose "keys" are two timestamps (start time
> > & end time of validity) and whose value is a label:
> >
> >         final DataSet<Tuple3<Long, Long, String>> historical = …;
> >
> >
> >
> > I also have events, with an event name and a timestamp:
> >
> >         final DataSet<Tuple2<String, Long>> events = …;
> >
> >
> >
> > I want to join my events with my historical data to get the “active”
> > label for the time of the event.
> >
> > The simple way is to use a cross product + a filter:
> >
> >
> >
> > events.cross(historical).filter((crossedRow) -> {
> >     return (crossedRow.f0.f1 >= crossedRow.f1.f0)
> >         && (crossedRow.f0.f1 <= crossedRow.f1.f1);
> > });
> >
> >
> >
> > But that’s not efficient with 2 big data sets…
> >
> >
> >
> > How would you code that?
> >
> >
> >
> > Greetings,
> >
> > Arnaud
> >
> > ------------------------------------------------------------------------
> > The integrity of this message cannot be guaranteed on the Internet.
> > The company that sent this message cannot therefore be held liable for
> > its content nor attachments. Any unauthorized use or dissemination is
> > prohibited. If you are not the intended recipient of this message,
> > then please delete it and notify the sender.
>
>
