flink-user mailing list archives

From Till Rohrmann <trohrm...@apache.org>
Subject Re: Fwd: External Talk: Apache Flink - Speakers: Kostas Tzoumas (CEO dataArtisans), Stephan Ewen (CTO dataArtisans)
Date Tue, 07 Apr 2015 13:19:23 GMT
I don't know whether my ideas are much better than the Cartesian product
solution. As a matter of fact, at some point we have to replicate the data
to be able to compute the correlations in parallel. I had basically 3
ideas:

1. Broadcast U and V and simply compute the correlation for different
shifts in a mapper. This only works if the time series data is small enough
to be kept in the memory of a task manager.
2. Create a join key for each shift and element, join the elements and
reduce them to obtain the final result. This has a communication complexity
of (n^2+n)/2, which is asymptotically the same as the Cartesian product
solution. But this solution will probably run for arbitrarily large
correlation intervals.

So let's say we have (u1, u2, u3) and (v1, v2, v3). Then we would first
create the join keys as (shift, position, value) tuples: (1, 1, u1),
(2, 1, u1), (3, 1, u1), (1, 2, u2), (2, 2, u2), (1, 3, u3), (1, 1, v1),
(1, 2, v2), (2, 1, v2), (1, 3, v3), (2, 2, v3), (3, 1, v3). Then join on
the first and second field and compute u*v with the first field as key.
Reducing on this field then lets you compute the correlation.
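
To make the key construction concrete, here is a minimal sketch in plain
Python (not Flink API code). The function names and the max_shift parameter
are made up for illustration, and the reduce step only sums the products u*v
per shift; the other sums a full correlation needs (sums of u, v, u^2, v^2
and the pair count) could be accumulated in the same way.

```python
from collections import defaultdict

def keyed_u(u, max_shift):
    """Emit (shift, position, value) for each shift that u_i takes part in."""
    n = len(u)
    # 1-based positions to match the example: u_i stays at position i.
    return [(s, i, u[i - 1])
            for i in range(1, n + 1)
            for s in range(1, min(max_shift, n - i + 1) + 1)]

def keyed_v(v, max_shift):
    """Emit (shift, position, value); v_j moves to position j - s + 1."""
    return [(s, j - s + 1, v[j - 1])
            for j in range(1, len(v) + 1)
            for s in range(1, min(max_shift, j) + 1)]

def cross_products_by_shift(u, v, max_shift):
    """Join on (shift, position), then reduce the products u*v per shift."""
    v_index = {(s, pos): val for s, pos, val in keyed_v(v, max_shift)}
    sums = defaultdict(float)
    for s, pos, u_val in keyed_u(u, max_shift):
        if (s, pos) in v_index:           # the join on the first two fields
            sums[s] += u_val * v_index[(s, pos)]
    return dict(sums)

print(cross_products_by_shift([1, 2, 3], [4, 5, 6], max_shift=3))
```

For u = (1, 2, 3) and v = (4, 5, 6), shift 1 pairs u1*v1 + u2*v2 + u3*v3,
shift 2 pairs u1*v2 + u2*v3, and shift 3 pairs u1*v3.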

3. Group the elements of each subinterval with respect to their shift value
and join both grouped subintervals. Then compute the correlation. This
again only works if the grouped data can be kept on the heap of the task
manager.
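
For idea 1, the work done per shift inside the mapper could look roughly
like the following plain Python sketch. It assumes both series fit in memory
as lists (which is exactly the limitation mentioned above), uses a 0-based
shift as in corr(U[0:N], V[n:N+n]), and the function names are only
illustrative. In a real job, the dictionary comprehension at the end would
be the parallel map over the shift values.

```python
from math import sqrt

def pearson(xs, ys):
    """Pearson correlation of two equally long sequences."""
    n = len(xs)
    mean_x = sum(xs) / n
    mean_y = sum(ys) / n
    cov = sum((x - mean_x) * (y - mean_y) for x, y in zip(xs, ys))
    var_x = sum((x - mean_x) ** 2 for x in xs)
    var_y = sum((y - mean_y) ** 2 for y in ys)
    return cov / sqrt(var_x * var_y)

def corr_at_shift(u, v, shift):
    """Correlate u[i] with v[i + shift] over the overlapping part."""
    overlap = len(v) - shift
    return pearson(u[:overlap], v[shift:])

def correlations(u, v, shifts):
    # Each shift is independent, so this loop is what would run in parallel.
    return {s: corr_at_shift(u, v, s) for s in shifts}

u = [1, 2, 3, 4, 5, 6]
v = [9, 1, 2, 3, 4, 5]
result = correlations(u, v, range(0, 3))
best = max(result, key=result.get)
print(best, result[best])
```

Here v is u delayed by one step (with an unrelated first value), so the
shift with the highest correlation is 1.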

On Tue, Apr 7, 2015 at 1:29 PM, Sebastian <ssc@apache.org> wrote:

> How large are the individual time series?
>
> -s
>
> On 07.04.2015 12:42, Kostas Tzoumas wrote:
>
>> Hi everyone,
>>
>> I'm forwarding a private conversation to the list with Mats' approval.
>>
>> The problem is how to compute correlation between time series in Flink.
>> We have two time series, U and V, and need to compute 1000 correlation
>> measures between the series, where each measure shifts one series by one
>> more item: corr(U[0:N], V[n:N+n]) for n=0 to n=999.
>>
>> Any ideas on how one can do that without a Cartesian product?
>>
>> Best,
>> Kostas
>>
>> ---------- Forwarded message ----------
>> From: Mats Zachrison <mats.zachrison@ericsson.com>
>> Date: Tue, Mar 31, 2015 at 9:21 AM
>> Subject:
>> To: Kostas Tzoumas <kostas@data-artisans.com>, Stefan Avesand
>> <stefan.avesand@ericsson.com>
>> Cc: stephan@data-artisans.com
>>
>> As Stefan said, what I’m trying to achieve is basically a nice way to do
>> a correlation between two large time series. Since I’m looking for an
>> optimal delay between the two series, I’d like to delay one of the
>> series x observations when doing the correlation, and step x from 1 to
>> 1000.
>>
>> Some pseudo code:
>>
>>    For (x = 1 to 1000)
>>        Shift Series A ‘x-1’ steps
>>        Correlation[x] = Correlate(Series A and Series B)
>>    End For
>>
>> In R, using cor() and apply(), this could look like:
>>
>>    shift <- as.array(c(1:1000))
>>    corrAB <- apply(shift, 1, function(x)
>>        cor(data[x:nrow(data), ]$ColumnA,
>>            data[1:(nrow(data) - (x - 1)), ]$ColumnB))
>>
>> Since these are basically 1000 independent correlation calculations, it
>> is fairly easy to parallelize. Here is an R example using foreach() and
>> package doParallel:
>>
>>    library(doParallel)  # also attaches foreach and parallel
>>    cl <- makeCluster(3)
>>    registerDoParallel(cl)
>>    corrAB <- foreach(step = c(1:1000)) %dopar% {
>>          cor(data[step:nrow(data), ]$ColumnA,
>>              data[1:(nrow(data) - (step - 1)), ]$ColumnB)
>>    }
>>    stopCluster(cl)
>>
>> So I guess the question is: how to do this in a Flink environment? Do
>> we have to define how to parallelize the algorithm, or can the cluster
>> take care of that for us?
>>
>> And of course this is most interesting on a generic level: given a
>> multi-core or multi-processor setup running Flink, how hard is it to
>> take advantage of all the clock cycles? Do we have to split the
>> algorithm and data and distribute the processing, or can the system do
>> much of that for us?
