flink-user mailing list archives

From MatsZ <mats.zachri...@ericsson.com>
Subject Re: Fwd: External Talk: Apache Flink - Speakers: Kostas Tzoumas (CEO dataArtisans), Stephan Ewen (CTO dataArtisans)
Date Thu, 09 Apr 2015 10:51:57 GMT
Hi,
Thanks for the input! The data is network (NW) data, measured for about 1.5 hours
and aggregated to millisecond resolution, i.e. 5.4 M rows. Sorry, I should have
included that information to begin with.
Br /Mats



Sebastian Schelter wrote:
> For some similarity/correlation measures, it is also possible to discard 
> candidate pairs early, if a threshold for the resulting correlation is 
> given. This could help to fight the quadratic nature of the problem. 
> Looking for papers on similarity search might help.
> 
> -s
> 
> On 07.04.2015 15:19, Till Rohrmann wrote:
>> I don't know whether my ideas are much better than the cartesian product
>> solution. As a matter of fact at some point we have to replicate the
>> data to be able to compute the correlations in parallel. There are
>> basically 3 ideas I had:
>>
>> 1. Broadcast U and V and simply compute the correlation for different
>> shifts in a mapper. This only works if the time series data is small
>> enough to be kept in memory of a task manager.
>> 2. Create for each shift and element a join key, join the elements and
>> reduce them to obtain the final result. This has a communication
>> complexity of (n^2+n)/2 which is asymptotically the same as the
>> cartesian product solution. But this solution will probably run for
>> arbitrarily large correlation intervals.
>>
>> So let's say we have (u1, u2, u3) and (v1, v2, v3): Then we would first
>> create the join keys: (1, 1, u1), (2, 1, u1), (3, 1, u1), (1, 2, u2),
>> (2, 2, u2), (1, 3, u3), (1, 1, v1), (1, 2, v2), (2, 1, v2), (1, 3, v3),
>> (2, 2, v3), (3, 1, v3). Then join on the first and second field and
>> compute u*v with the first field as key. Reducing on this field lets
>> you then compute the correlation.
>>
>> 3. Group the elements of each subinterval with respect to their shift
>> value and join both grouped subintervals. Then compute the correlation.
>> This again only works if the grouped data can be kept on the heap of the
>> task manager.
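
To make idea 2 concrete, here is a minimal single-machine sketch in Python (not Flink code) of the join-key scheme from the (u1, u2, u3) / (v1, v2, v3) example. The function names are made up for illustration, both series are assumed to have the same length, and the reduce step is assumed to compute a Pearson correlation from running sums per shift.

```python
import math
from collections import defaultdict


def join_keys_u(u, max_shift):
    """Emit (shift, position, value) for every shift that u[i] takes part in."""
    n = len(u)
    out = []
    for i, value in enumerate(u, start=1):  # 1-based positions, as in the mail
        for shift in range(1, min(max_shift, n - i + 1) + 1):
            out.append((shift, i, value))
    return out


def join_keys_v(v, max_shift):
    """Emit (shift, position, value); v[j] is moved to position j - shift + 1."""
    out = []
    for j, value in enumerate(v, start=1):
        for shift in range(1, min(max_shift, j) + 1):
            out.append((shift, j - shift + 1, value))
    return out


def correlations_by_shift(u, v, max_shift):
    """Join on (shift, position), then reduce per shift to a Pearson correlation."""
    if len(u) != len(v):
        raise ValueError("this sketch assumes series of equal length")
    v_by_key = {(s, p): value for s, p, value in join_keys_v(v, max_shift)}
    # Per shift: count, sum(u), sum(v), sum(u*u), sum(v*v), sum(u*v)
    sums = defaultdict(lambda: [0, 0.0, 0.0, 0.0, 0.0, 0.0])
    for s, p, x in join_keys_u(u, max_shift):
        y = v_by_key[(s, p)]  # the "join" on the first and second field
        acc = sums[s]
        acc[0] += 1
        acc[1] += x
        acc[2] += y
        acc[3] += x * x
        acc[4] += y * y
        acc[5] += x * y
    result = {}
    for s, (n, sx, sy, sxx, syy, sxy) in sums.items():
        denom = math.sqrt((n * sxx - sx * sx) * (n * syy - sy * sy))
        # Undefined when one side has zero variance (e.g. a single pair)
        result[s] = (n * sxy - sx * sy) / denom if denom > 0 else float("nan")
    return result
```

For three elements per series, each helper emits 6 records, which is (n^2+n)/2 for n = 3 and matches the keys listed above. In a distributed setting the dictionary lookup would become a join and the running sums a grouped reduce.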
>>
>> On Tue, Apr 7, 2015 at 1:29 PM, Sebastian <ssc@...> wrote:
>>
>>     How large are the individual time series?
>>
>>     -s
>>
>>     On 07.04.2015 12:42, Kostas Tzoumas wrote:
>>
>>         Hi everyone,
>>
>>         I'm forwarding a private conversation to the list with Mats'
>>         approval.
>>
>>         The problem is how to compute correlation between time series in
>>         Flink. We have two time series, U and V, and need to compute 1000
>>         correlation measures between the series, each measure shifts one
>>         series by one more item: corr(U[0:N], V[n:N+n]) for n=0 to n=1000.
>>
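
As a point of reference, the computation described here can be written down directly on a single machine. The following Python sketch is only a baseline under stated assumptions: the helper names are invented for illustration, `window` plays the role of N, and V is assumed to hold at least `window + max_shift` values. Each shift is independent of the others, which is what makes the problem a candidate for parallel execution.

```python
import math


def pearson(xs, ys):
    """Pearson correlation of two equally long sequences."""
    n = len(xs)
    mean_x = sum(xs) / n
    mean_y = sum(ys) / n
    sxy = sum((x - mean_x) * (y - mean_y) for x, y in zip(xs, ys))
    sxx = sum((x - mean_x) ** 2 for x in xs)
    syy = sum((y - mean_y) ** 2 for y in ys)
    return sxy / math.sqrt(sxx * syy)


def shifted_correlations(u, v, window, max_shift):
    """Return corr(u[0:window], v[n:window+n]) for n = 0..max_shift."""
    if len(u) < window or len(v) < window + max_shift:
        raise ValueError("series too short for the requested window and shifts")
    head = u[:window]
    return [pearson(head, v[n:window + n]) for n in range(max_shift + 1)]
```

The shift with the largest correlation is then the estimated delay, for example `max(range(len(c)), key=c.__getitem__)` for a result list `c`.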
>>         Any ideas on how one can do that without a Cartesian product?
>>
>>         Best,
>>         Kostas
>>
>>         ---------- Forwarded message ----------
>>         From: *Mats Zachrison* <mats.zachrison@...>
>>         Date: Tue, Mar 31, 2015 at 9:21 AM
>>         Subject:
>>         To: Kostas Tzoumas <kostas@data-artisans.com>,
>>             Stefan Avesand <stefan.avesand@...>
>>         Cc: <stephan@data-artisans.com>
>>
>>         As Stefan said, what I’m trying to achieve is basically a nice way
>>         to do a correlation between two large time series. Since I’m
>>         looking for an optimal delay between the two series, I’d like to
>>         delay one of the series by x observations when doing the
>>         correlation, and step x from 1 to 1000.
>>
>>         Some pseudo code:
>>
>>             For (x = 1 to 1000)
>>                 Shift Series A ‘x-1’ steps
>>                 Correlation[x] = Correlate(Series A and Series B)
>>             End For
>>
>>         In R, using cor() and apply(), this could look like:
>>
>>             shift <- as.array(c(1:1000))
>>             corrAB <- apply(shift, 1, function(x)
>>                 cor(data[x:nrow(data), ]$ColumnA,
>>                     data[1:(nrow(data) - (x - 1)), ]$ColumnB))
>>
>>         Since these are basically 1000 independent correlation
>>         calculations, they are fairly easy to parallelize. Here is an R
>>         example using foreach() and the doParallel package:
>>
>>             library(doParallel)  # also loads foreach and parallel
>>             cl <- makeCluster(3)
>>             registerDoParallel(cl)
>>             corrAB <- foreach(step = c(1:1000)) %dopar% {
>>                 cor(data[step:nrow(data), ]$ColumnA,
>>                     data[1:(nrow(data) - (step - 1)), ]$ColumnB)
>>             }
>>             stopCluster(cl)
>>
>>         So I guess the question is: how to do this in a Flink environment?
>>         Do we have to define how to parallelize the algorithm, or can the
>>         cluster take care of that for us?
>>
>>         And of course this is most interesting on a generic level: given
>>         a multi-core or multi-processor setup running Flink, how hard is
>>         it to take advantage of all the clock cycles? Do we have to split
>>         the algorithm and the data and distribute the processing, or can
>>         the system do much of that for us?




