I don't know whether my ideas are much better than the Cartesian product solution. In fact, at some point we have to replicate the data to be able to compute the correlations in parallel. There are basically three ideas I had:


1. Broadcast U and V and simply compute the correlation for different shifts in a mapper. This only works if the time series data is small enough to be kept in the memory of a task manager.

2. For each shift and element, create a join key, join the elements, and reduce them to obtain the final result. This has a communication complexity of (n^2+n)/2 records per series, which is asymptotically the same as the Cartesian product solution. But this solution will probably work for arbitrarily large correlation intervals, because, unlike the other two ideas, it does not require keeping whole series or groups in memory.

So let's say we have (u1, u2, u3) and (v1, v2, v3). Then we would first create the join keys as (shift, position, value) triples: (1, 1, u1), (2, 1, u1), (3, 1, u1), (1, 2, u2), (2, 2, u2), (1, 3, u3), (1, 1, v1), (1, 2, v2), (2, 1, v2), (1, 3, v3), (2, 2, v3), (3, 1, v3). Then we join on the first and second fields and compute u*v with the first field (the shift) as key. Reducing on this field then lets you compute the correlation.

3. Group the elements of each subinterval with respect to their shift value, join both grouped subintervals, and then compute the correlation. This again only works if the grouped data can be kept on the heap of the task manager.
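To make the data movement of the three ideas concrete, they can be sketched in plain Python on a single machine. This is only a sketch under stated assumptions: dicts and comprehensions stand in for Flink's broadcast variables, joins and reduces (no Flink API is used), corr is taken to be the Pearson coefficient, and every function name below (pearson, broadcast_correlations, u_keys, v_keys, join_key_correlations, group_by_shift, grouped_join_correlations) is hypothetical. Ideas 1 and 3 follow the definition corr(U[0:N], V[n:N+n]) from the original question, with 0-based shifts; idea 2 follows the worked example literally, with 1-based shifts and positions, so shift s pairs u_i with v_{i+s-1} and the overlap shrinks as s grows.

```python
import math
from collections import defaultdict


def pearson(xs, ys):
    """Pearson correlation of two equally long sequences (nan if undefined)."""
    n = len(xs)
    mx, my = sum(xs) / n, sum(ys) / n
    cov = sum((x - mx) * (y - my) for x, y in zip(xs, ys))
    vx = sum((x - mx) ** 2 for x in xs)
    vy = sum((y - my) ** 2 for y in ys)
    denom = math.sqrt(vx * vy)
    return cov / denom if denom > 0 else float("nan")


# Idea 1 (sketch): u and v are "broadcast", i.e. fully in memory; each map
# call handles one shift. The comprehension stands in for a parallel map.
def broadcast_correlations(u, v, window, max_shift):
    return {s: pearson(u[0:window], v[s:s + window]) for s in range(max_shift + 1)}


# Idea 2 (sketch): (shift, position, value) join keys, 1-based as in the example.
def u_keys(u):
    n = len(u)
    return [(s, i, u[i - 1]) for i in range(1, n + 1) for s in range(1, n - i + 2)]


def v_keys(v):
    n = len(v)
    return [(s, j - s + 1, v[j - 1]) for j in range(1, n + 1) for s in range(1, j + 1)]


def join_key_correlations(u, v):
    if len(u) != len(v):
        raise ValueError("this sketch assumes equally long series")
    # "Join" on (shift, position): a dict lookup stands in for a distributed join.
    v_by_key = {(s, i): val for s, i, val in v_keys(v)}
    # "Reduce" on shift: count, sum u, sum v, sum u^2, sum v^2, sum u*v.
    acc = defaultdict(lambda: [0, 0.0, 0.0, 0.0, 0.0, 0.0])
    for s, i, x in u_keys(u):
        y = v_by_key[(s, i)]
        a = acc[s]
        a[0] += 1
        a[1] += x
        a[2] += y
        a[3] += x * x
        a[4] += y * y
        a[5] += x * y
    result = {}
    for s, (n, su, sv, suu, svv, suv) in acc.items():
        # One-pass formulas: fine for a sketch, prone to cancellation on real data.
        cov = suv - su * sv / n
        var_u = max(suu - su * su / n, 0.0)
        var_v = max(svv - sv * sv / n, 0.0)
        denom = math.sqrt(var_u * var_v)
        result[s] = cov / denom if denom > 0 else float("nan")
    return result


# Idea 3 (sketch): group each subinterval by shift, then join groups on the shift.
def group_by_shift(series, window, max_shift, shifted):
    groups = defaultdict(list)
    for pos, val in enumerate(series):  # ascending positions keep each group ordered
        for s in range(max_shift + 1):
            start = s if shifted else 0
            if start <= pos < start + window:
                groups[s].append(val)
    return groups


def grouped_join_correlations(u, v, window, max_shift):
    gu = group_by_shift(u, window, max_shift, shifted=False)  # U[0:window] per shift
    gv = group_by_shift(v, window, max_shift, shifted=True)   # V[s:s+window] per shift
    # Each joined pair holds two whole subintervals, which must fit in memory.
    return {s: pearson(gu[s], gv[s]) for s in sorted(gu.keys() & gv.keys())}
```

For example, with u = [1, 2, 3, 4, 5] and v = [5, 1, 2, 3, 4, 5], both broadcast_correlations(u, v, 5, 1) and grouped_join_correlations(u, v, 5, 1) give 0.0 at shift 0 and 1.0 at shift 1. The per-shift sums in idea 2 are what a distributed reduce would carry; ideas 1 and 3 avoid that (n^2+n)/2 shuffle but hit the memory limits mentioned above.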

On Tue, Apr 7, 2015 at 1:29 PM, Sebastian <ssc@apache.org> wrote:

How large are the individual time series?

-s

On 07.04.2015 12:42, Kostas Tzoumas wrote:

Hi everyone,

I'm forwarding a private conversation to the list with Mats' approval.

The problem is how to compute correlation between time series in Flink.

We have two time series, U and V, and need to compute 1000 correlation measures between the series, where each measure shifts one series by one more item: corr(U[0:N], V[n:N+n]) for n=0 to n=999.
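Assuming corr is the Pearson correlation coefficient and the slices are half-open (U[0:N] is U_0, ..., U_{N-1}), each measure can be written as follows. The second form shows that every shift n only needs five sums over i = 0, ..., N-1, which is what a per-shift reduce would have to accumulate.

$$
\rho(n) = \frac{\sum_{i=0}^{N-1} (U_i - \bar{U})\,(V_{i+n} - \bar{V}_n)}
{\sqrt{\sum_{i=0}^{N-1} (U_i - \bar{U})^2}\;\sqrt{\sum_{i=0}^{N-1} (V_{i+n} - \bar{V}_n)^2}},
\qquad
\bar{U} = \frac{1}{N}\sum_{i=0}^{N-1} U_i,\quad
\bar{V}_n = \frac{1}{N}\sum_{i=0}^{N-1} V_{i+n}
$$

$$
\rho(n) = \frac{N\sum_{i=0}^{N-1} U_i V_{i+n} - \sum_{i=0}^{N-1} U_i \sum_{i=0}^{N-1} V_{i+n}}
{\sqrt{N\sum_{i=0}^{N-1} U_i^2 - \Big(\sum_{i=0}^{N-1} U_i\Big)^2}\;\sqrt{N\sum_{i=0}^{N-1} V_{i+n}^2 - \Big(\sum_{i=0}^{N-1} V_{i+n}\Big)^2}}
$$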

Any ideas on how one can do that without a Cartesian product?

Best,

Kostas

---------- Forwarded message ----------

From: Mats Zachrison <mats.zachrison@ericsson.com>
Date: Tue, Mar 31, 2015 at 9:21 AM
Subject:
To: Kostas Tzoumas <kostas@data-artisans.com>, Stefan Avesand <stefan.avesand@ericsson.com>
Cc: "stephan@data-artisans.com" <stephan@data-artisans.com>

As Stefan said, what I'm trying to achieve is basically a nice way to do a correlation between two large time series. Since I'm looking for an optimal delay between the two series, I'd like to delay one of the series by x observations when doing the correlation, and step x from 1 to 1000.


Some pseudocode:

    For (x = 1 to 1000)
        Shift Series A 'x-1' steps
        Correlation[x] = Correlate(Series A, Series B)
    End For

In R, using cor() and apply(), this could look like:

    shift <- as.array(c(1:1000))
    # x = 1 is no shift; x = k correlates ColumnA from row k onward with
    # the first nrow(data) - (k - 1) rows of ColumnB
    corrAB <- apply(shift, 1, function(x) cor(data[x:nrow(data), ]$ColumnA,
                                              data[1:(nrow(data) - (x - 1)), ]$ColumnB))
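A 0-based Python sketch of the same apply() loop may help when mapping it onto another system; the names shifted_correlations, a and b are hypothetical stand-ins for the loop and for ColumnA and ColumnB, and corr is assumed to be Pearson, as R's cor() computes by default. It makes the index translation explicit: here the compared windows shrink by one row per step, unlike the fixed-length U[0:N] window in the formulation quoted above.

```python
import math


def pearson(xs, ys):
    """Pearson correlation of two equally long sequences (nan if undefined)."""
    n = len(xs)
    mx, my = sum(xs) / n, sum(ys) / n
    cov = sum((x - mx) * (y - my) for x, y in zip(xs, ys))
    denom = math.sqrt(sum((x - mx) ** 2 for x in xs) * sum((y - my) ** 2 for y in ys))
    return cov / denom if denom > 0 else float("nan")


def shifted_correlations(a, b, max_x):
    """Mirror of the R loop for x = 1 .. max_x (a and b equally long).

    R's data[x:nrow(data), ]$ColumnA            -> a[x - 1:]
    R's data[1:(nrow(data) - (x - 1)), ]$ColumnB -> b[:len(b) - (x - 1)]
    """
    n = len(a)
    return [pearson(a[x - 1:], b[:n - (x - 1)]) for x in range(1, max_x + 1)]
```

With a = [5, 1, 2, 3, 4] and b = [1, 2, 3, 4, 5], shifted_correlations(a, b, 2) gives 0.0 for x = 1 and 1.0 for x = 2, since shifting a by one row lines it up with b.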


Since this is basically 1000 independent correlation calculations, it is fairly easy to parallelize. Here is an R example using foreach() and the doParallel package:


    library(doParallel)  # also attaches foreach and parallel (makeCluster)
    cl <- makeCluster(3)
    registerDoParallel(cl)
    # .combine = c returns a numeric vector, like the apply() version above
    corrAB <- foreach(step = c(1:1000), .combine = c) %dopar% {
        cor(data[step:nrow(data), ]$ColumnA,
            data[1:(nrow(data) - (step - 1)), ]$ColumnB)
    }
    stopCluster(cl)


So I guess the question is: how do we do this in a Flink environment? Do we have to define how to parallelize the algorithm, or can the cluster take care of that for us?


And of course this is most interesting on a generic level: given a multi-core or multi-processor setup running Flink, how hard is it to take advantage of all the clock cycles? Do we have to split the algorithm and the data and distribute the processing, or can the system do much of that for us?
