Hi everyone,

I'm forwarding a private conversation to the list with Mats' approval.

The problem is how to compute correlation between time series in Flink. We have two time series, U and V, and need to compute 1000 correlation measures between the series, where each measure shifts one series by one more item: corr(U[0:N], V[n:N+n]) for n=0 to n=1000.
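To make the indexing concrete, here is a minimal R sketch of that definition on synthetic data. The names lagged_cor, u, v, window and max_lag are made up for illustration, and since R indexes from 1, U[0:N] becomes u[1:window]. It only illustrates the computation on a single machine and says nothing about how to distribute it.

```r
# Hypothetical helper: correlate a fixed window of u against v shifted by each lag.
lagged_cor <- function(u, v, window, max_lag) {
  stopifnot(length(u) >= window, length(v) >= window + max_lag)
  sapply(0:max_lag, function(lag) {
    cor(u[1:window], v[(1 + lag):(window + lag)])
  })
}

# Synthetic data (an assumption for the example): v is u delayed by
# 7 observations, plus a little noise.
set.seed(42)
true_delay <- 7
u <- rnorm(500)
v <- c(rnorm(true_delay), u)[1:500] + rnorm(500, sd = 0.1)

cors <- lagged_cor(u, v, window = 400, max_lag = 50)
best_lag <- which.max(cors) - 1  # subtract 1 because lags start at 0
```

On this synthetic input the correlation peaks at the lag that was built into v, so best_lag recovers the delay.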

Any ideas on how one can do that without a Cartesian product?

Best,
Kostas

---------- Forwarded message ----------
From: Mats Zachrison <mats.zachrison@ericsson.com>
Date: Tue, Mar 31, 2015 at 9:21 AM
Subject:
To: Kostas Tzoumas <kostas@data-artisans.com>, Stefan Avesand <stefan.avesand@ericsson.com>
Cc: "stephan@data-artisans.com" <stephan@data-artisans.com>

As Stefan said, what I’m trying to achieve is basically a nice way to do a correlation between two large time series. Since I’m looking for an optimal delay between the two series, I’d like to delay one of the series by x observations when doing the correlation, and step x from 1 to 1000.

Some pseudo code:

  For (x = 1 to 1000)
      Shift Series A 'x-1' steps
      Correlation[x] = Correlate(Series A and Series B)
  End For

In R, using cor() and apply(), this could look like:

  shift <- as.array(c(1:1000))
  corrAB <- apply(shift, 1, function(x) cor(data[x:nrow(data), ]$ColumnA, data[1:(nrow(data) - (x - 1)), ]$ColumnB))

Since these are basically 1000 independent correlation calculations, they are fairly easy to parallelize. Here is an R example using foreach() and package doParallel:

  library(doParallel)  # also attaches foreach and parallel

  cl <- makeCluster(3)
  registerDoParallel(cl)
  # Each iteration is independent; foreach collects the results in a list.
  corrAB <- foreach(step = c(1:1000)) %dopar% {
      cor(data[step:nrow(data), ]$ColumnA, data[1:(nrow(data) - (step - 1)), ]$ColumnB)
  }
  stopCluster(cl)

So I guess the question is: how do we do this in a Flink environment? Do we have to define how to parallelize the algorithm, or can the cluster take care of that for us?

And of course this is most interesting on a generic level: given a multi-core or multi-processor setup running Flink, how hard is it to take advantage of all the clock cycles? Do we have to split the algorithm and the data and distribute the processing ourselves, or can the system do much of that for us?
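
One observation that may be relevant to the splitting question: the Pearson correlation for a given shift depends only on the pair count and five sums (of x, y, x^2, y^2 and x*y), and sums can be accumulated in chunks and in any order. The R sketch below only checks that identity against cor() for one shift, using the same slicing as the R examples above. It is not Flink code, cor_from_sums is a hypothetical helper, and the data is synthetic. The plain sum formula can also lose precision on series with a large mean, so treat it as an illustration rather than a recommended implementation.

```r
# Hypothetical helper: Pearson correlation from the pair count and five sums.
cor_from_sums <- function(n, sx, sy, sxx, syy, sxy) {
  (n * sxy - sx * sy) / (sqrt(n * sxx - sx^2) * sqrt(n * syy - sy^2))
}

# Synthetic stand-ins for ColumnA and ColumnB (an assumption for the example).
set.seed(1)
a <- rnorm(200)
b <- 0.5 * a + rnorm(200)

shift <- 3
x <- a[shift:length(a)]
y <- b[1:(length(b) - (shift - 1))]

# Each observation pair contributes one row of partial terms.
# Column sums are associative, so rows could be summed in chunks.
sums <- unname(colSums(cbind(1, x, y, x^2, y^2, x * y)))
from_sums <- cor_from_sums(sums[1], sums[2], sums[3], sums[4], sums[5], sums[6])
direct <- cor(x, y)
```

Here from_sums and direct agree up to floating-point error, which is what would allow the per-shift work to be expressed as a sum-style aggregation.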
