Subject: Re: Best way to join with inequalities (historical data)
From: Stephan Ewen <ewenstephan@gmail.com>
To: user@flink.apache.org
Date: Mon, 4 May 2015 14:54:47 +0200

When you use cross() it is always good to use crossWithHuge or crossWithTiny to tell the system which side is small. It cannot always infer that automagically at this point.
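
As a rough sketch, assuming the historical set really is the small side, that hint applied to the cross+filter from the original mail below would look like:

    // Same cross + filter as in the original question, but with a size hint:
    // 'historical' is declared as the tiny side, so it gets replicated to all nodes.
    events.crossWithTiny(historical).filter((crossedRow) ->
            crossedRow.f0.f1 >= crossedRow.f1.f0 && crossedRow.f0.f1 <= crossedRow.f1.f1);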

If you have an optimized structure for the lookups, go with a broadcast variable and a map() function.
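
A minimal sketch of that broadcast-variable approach, reusing the events/historical datasets from Arnaud's original mail (the broadcast name "historical", the output shape, and the linear scan are illustrative assumptions, not from this thread):

    import java.util.List;
    import org.apache.flink.api.common.functions.RichMapFunction;
    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.api.java.tuple.Tuple3;
    import org.apache.flink.configuration.Configuration;

    // Each parallel mapper receives the full historical set as a broadcast
    // variable and looks up the label that is valid at the event's timestamp.
    DataSet<Tuple3<String, Long, String>> labelled = events
        .map(new RichMapFunction<Tuple2<String, Long>, Tuple3<String, Long, String>>() {
            private List<Tuple3<Long, Long, String>> intervals;

            @Override
            public void open(Configuration parameters) {
                intervals = getRuntimeContext().getBroadcastVariable("historical");
            }

            @Override
            public Tuple3<String, Long, String> map(Tuple2<String, Long> event) {
                // Linear scan for brevity; sorting the intervals by start time
                // gives the log(n) lookup Arnaud mentions below.
                for (Tuple3<Long, Long, String> iv : intervals) {
                    if (event.f1 >= iv.f0 && event.f1 <= iv.f1) {
                        return new Tuple3<>(event.f0, event.f1, iv.f2);
                    }
                }
                return new Tuple3<>(event.f0, event.f1, null);
            }
        })
        .withBroadcastSet(historical, "historical");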

On Mon, May 4, 2015 at 1:40 PM, LINZ, Arnaud <ALINZ@bouyguestelecom.fr> wrote:
Hi,
Thanks. The use case I have right now does not require too much magic; my historical data set is small enough to fit in RAM, I'll spread it over each node and use a simple mapping with a log(n) lookup. It was more a theoretical question.
If my dataset becomes too large, I may use some hashing techniques (for instance at day level) and cut the intervals at hash frontiers by duplicating the row to prevent overlapping.
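
A rough sketch of that day-level splitting, with the datasets from the original question (the day granularity, the names, and the output shape are assumptions, not something agreed in this thread):

    import org.apache.flink.api.common.functions.FlatMapFunction;
    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.functions.KeySelector;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.api.java.tuple.Tuple3;
    import org.apache.flink.api.java.tuple.Tuple4;
    import org.apache.flink.util.Collector;

    final long DAY_MS = 24L * 60 * 60 * 1000;

    // Explode each interval into one slice per day it covers, keyed by that day.
    DataSet<Tuple4<Long, Long, Long, String>> slices = historical
        .flatMap(new FlatMapFunction<Tuple3<Long, Long, String>, Tuple4<Long, Long, Long, String>>() {
            @Override
            public void flatMap(Tuple3<Long, Long, String> iv,
                                Collector<Tuple4<Long, Long, Long, String>> out) {
                for (long day = iv.f0 / DAY_MS; day <= iv.f1 / DAY_MS; day++) {
                    out.collect(new Tuple4<>(day, iv.f0, iv.f1, iv.f2));
                }
            }
        });

    // Equi-join events and slices on the day, then keep only real matches.
    DataSet<Tuple2<Tuple2<String, Long>, Tuple4<Long, Long, Long, String>>> joined = events
        .join(slices)
        .where(new KeySelector<Tuple2<String, Long>, Long>() {
            @Override
            public Long getKey(Tuple2<String, Long> e) { return e.f1 / DAY_MS; }
        })
        .equalTo(new KeySelector<Tuple4<Long, Long, Long, String>, Long>() {
            @Override
            public Long getKey(Tuple4<Long, Long, Long, String> s) { return s.f0; }
        })
        .filter(pair -> pair.f0.f1 >= pair.f1.f1 && pair.f0.f1 <= pair.f1.f2);

Since an event falls into exactly one day bucket and each interval contributes at most one slice per day, every (event, interval) pair is matched at most once, so no de-duplication is needed afterwards.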

Arnaud




-----Original Message-----
From: Matthias J. Sax [mailto:mjsax@informatik.hu-berlin.de]
Sent: Monday, May 4, 2015 11:52
To: user@flink.apache.org
Subject: Re: Best way to join with inequalities (historical data)

Hi,

there is no dedicated system support to express this kind of join.

However, you could perform some "hand-wired" optimization by partitioning your input data into distinct intervals. It might be tricky, though, especially if the time ranges in your "range-key" dataset are overlapping everywhere (-> data replication necessary for overlapping parts).

But it might be worth the effort if you can't get the job done using cross-product. How large are your data sets? What hardware are you using?


-Matthias


On 05/04/2015 10:47 AM, LINZ, Arnaud wrote:
> Hello,
>
>
>
> I was wondering how to join large data sets on inequalities.
>
>
>
> Let's say I have a data set whose “keys” are two timestamps (start time
> & end time of validity) and whose value is a label:
>
>         final DataSet<Tuple3<Long, Long, String>> historical = …;
>
>
>
> I also have events, with an event name and a timestamp:
>
>         final DataSet<Tuple2<String, Long>> events = …;
>
>
>
> I want to join my events with my historical data to get the “active”
> label for the time of the event.
>
> The simple way is to use a cross product + a filter:
>
>
>
> events.cross(historical).filter((crossedRow) -> {
>
>         return (crossedRow.f0.f1 >= crossedRow.f1.f0) &&
>                (crossedRow.f0.f1 <= crossedRow.f1.f1);
>
>         })
>
>
>
> But that's not efficient with 2 big data sets…
>
>
>
> How would you code that?
>
>
>
> Greetings,
>
> Arnaud
>
>
>
>
>
>
>
>
>
>
> ------------------------------------------------------------------------
>
> The integrity of this message cannot be guaranteed on the Internet.
> The company that sent this message cannot therefore be held liable for
> its content nor attachments. Any unauthorized use or dissemination is
> prohibited. If you are not the intended recipient of this message,
> then please delete it and notify the sender.

