Hi everyone,

The problem is how to compute correlation between time series in Flink. We have two time series, U and V, and need to compute 1000 correlation measures between the series, where each measure shifts one series by one more item than the previous one: corr(U[0:N], V[n:N+n]) for n = 0 to n = 999.
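A minimal Python sketch of the computation above, assuming both series fit in memory as plain lists. The names `pearson`, `lagged_correlations`, `window` (playing the role of N) and `num_lags` are illustrative and not part of Flink or any library. The U window stays fixed while the V window slides, as in corr(U[0:N], V[n:N+n]) for n = 0, ..., num_lags - 1:

```python
import math


def pearson(xs, ys):
    """Pearson correlation of two equal-length sequences (two-pass formula)."""
    n = len(xs)
    if n != len(ys) or n < 2:
        raise ValueError("need two sequences of equal length >= 2")
    mx = sum(xs) / n
    my = sum(ys) / n
    sxy = sum((x - mx) * (y - my) for x, y in zip(xs, ys))
    sxx = sum((x - mx) ** 2 for x in xs)
    syy = sum((y - my) ** 2 for y in ys)
    return sxy / math.sqrt(sxx * syy)


def lagged_correlations(u, v, window, num_lags):
    """Return [corr(u[0:window], v[n:window + n]) for n in range(num_lags)]."""
    if len(u) < window or len(v) < window + num_lags - 1:
        raise ValueError("series too short for the requested window and lags")
    head = u[0:window]  # the U window never moves
    return [pearson(head, v[n:window + n]) for n in range(num_lags)]
```

With num_lags = 1000 this yields the 1000 measures; V then has to contain at least N + 999 items.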

I'm forwarding a private conversation to the list with Mats' approval.


Best,

Kostas



---------- Forwarded message ----------
From: Mats Zachrison <mats.zachrison@ericsson.com>
Date: Tue, Mar 31, 2015 at 9:21 AM
Subject:
To: Kostas Tzoumas <kostas@data-artisans.com>, Stefan Avesand <stefan.avesand@ericsson.com>
Cc: stephan@data-artisans.com


As Stefan said, what I'm trying to achieve is basically a nice way to compute the correlation between two large time series. Since I'm looking for the optimal delay between the two series, I'd like to delay one of the series by x observations when doing the correlation, and step x from 1 to 1000.

Some pseudocode:

  For x = 1 to 1000
      Shift Series A by x-1 steps
      Correlation[x] = Correlate(Series A, Series B)
  End For

In R, using cor() and apply(), this could look like:

  shift <- as.array(c(1:1000))
  # For each x, correlate ColumnA from row x onwards with the first nrow(data) - (x - 1) rows of ColumnB
  corrAB <- apply(shift, 1, function(x) cor(data[x:nrow(data), ]$ColumnA, data[1:(nrow(data) - (x - 1)), ]$ColumnB))

Since this is basically 1000 independent correlation calculations, it is fairly easy to parallelize. Here is an R example using foreach() and the doParallel package:

  library(doParallel)  # also attaches foreach and parallel
  cl <- makeCluster(3)
  registerDoParallel(cl)
  # Each step is independent, so %dopar% can run the iterations on different workers;
  # .combine = c collects the results into a numeric vector instead of a list
  corrAB <- foreach(step = 1:1000, .combine = c) %dopar% {
    cor(data[step:nrow(data), ]$ColumnA, data[1:(nrow(data) - (step - 1)), ]$ColumnB)
  }
  stopCluster(cl)

So I guess the question is: how do we do this in a Flink environment? Do we have to define how to parallelize the algorithm, or can the cluster take care of that for us?

And of course this is most interesting on a generic level: given a multi-core or multi-processor setup running Flink, how hard is it to take advantage of all the clock cycles? Do we have to split the algorithm and the data and distribute the processing ourselves, or can the system do much of that for us?
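One way to phrase the computation so that a dataflow system could split the data rather than the lags is sketched below in plain Python; it illustrates the idea only and is not Flink API. Each partition of the index range accumulates, for every lag n, the sums Pearson's correlation needs (count, Σx, Σy, Σxy, Σx², Σy²) over the pairs (U[i], V[i + n]). Since these sums merge by plain addition, partitions can be processed independently and combined in any order, much like a grouped reduce. A partition covering [start, end) only reads U[start:end] and V[start:end + num_lags - 1]. The function names are hypothetical, and the one-pass sum formulas are less numerically robust than a two-pass computation when the data has a large mean:

```python
import math
from functools import reduce


def partial_stats(u, v, start, end, num_lags):
    """Per-lag sums over the pairs (u[i], v[i + n]) for i in [start, end)."""
    # stats[n] = [count, sum_x, sum_y, sum_xy, sum_xx, sum_yy]
    stats = [[0, 0.0, 0.0, 0.0, 0.0, 0.0] for _ in range(num_lags)]
    for i in range(start, end):
        x = u[i]
        for n in range(num_lags):
            y = v[i + n]
            s = stats[n]
            s[0] += 1
            s[1] += x
            s[2] += y
            s[3] += x * y
            s[4] += x * x
            s[5] += y * y
    return stats


def merge_stats(a, b):
    """Combine two partial results; element-wise addition is associative."""
    return [[p + q for p, q in zip(sa, sb)] for sa, sb in zip(a, b)]


def corr_from_stats(s):
    """Pearson correlation from the accumulated sums of one lag."""
    n, sx, sy, sxy, sxx, syy = s
    cov = sxy - sx * sy / n
    var_x = sxx - sx * sx / n
    var_y = syy - sy * sy / n
    return cov / math.sqrt(var_x * var_y)


def partitioned_lagged_correlations(u, v, window, num_lags, num_partitions):
    """corr(u[0:window], v[n:window + n]) for each lag n, computed per partition."""
    step = -(-window // num_partitions)  # ceiling division
    # Each list element could be computed on a different worker.
    partials = [
        partial_stats(u, v, start, min(start + step, window), num_lags)
        for start in range(0, window, step)
    ]
    totals = reduce(merge_stats, partials)
    return [corr_from_stats(s) for s in totals]
```

Only the per-lag sums (six numbers per lag) have to travel between workers, regardless of how long the series are.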
