Message-ID: <5523DDC2.6090900@apache.org>
Date: Tue, 07 Apr 2015 15:38:10 +0200
From: Sebastian
Reply-To: ssc@apache.org
To: Till Rohrmann, user@flink.apache.org
Subject: Re: Fwd: External Talk: Apache Flink - Speakers: Kostas Tzoumas (CEO dataArtisans), Stephan Ewen (CTO dataArtisans)

For some similarity/correlation measures it is also possible to discard
candidate pairs early if a threshold for the resulting correlation is
given. This could help to fight the quadratic nature of the problem.
Looking for papers on similarity search might help.

-s

On 07.04.2015 15:19, Till Rohrmann wrote:
> I don't know whether my ideas are much better than the Cartesian product
> solution. As a matter of fact, at some point we have to replicate the
> data to be able to compute the correlations in parallel. There are
> basically three ideas I had:
>
> 1. Broadcast U and V and simply compute the correlation for different
> shifts in a mapper. This only works if the time series data is small
> enough to be kept in the memory of a task manager.
>
> 2.
Create for each shift and element a join key, join the elements, and
> reduce them to obtain the final result. This has a communication
> complexity of (n^2+n)/2, which is asymptotically the same as the
> Cartesian product solution. But this solution will probably run for
> arbitrarily large correlation intervals.
>
> So let's say we have (u1, u2, u3) and (v1, v2, v3). Then we would first
> create the join keys: (1, 1, u1), (2, 1, u1), (3, 1, u1), (1, 2, u2),
> (2, 2, u2), (1, 3, u3), (1, 1, v1), (1, 2, v2), (2, 1, v2), (1, 3, v3),
> (2, 2, v3), (3, 1, v3). Then join on the first and second field and
> compute u*v with the first field as key. Reducing on this field lets
> you then compute the correlation.
>
> 3. Group the elements of each subinterval with respect to their shift
> value and join both grouped subintervals. Then compute the correlation.
> This again only works if the grouped data can be kept on the heap of the
> task manager.
>
> On Tue, Apr 7, 2015 at 1:29 PM, Sebastian <ssc@apache.org> wrote:
>
>     How large are the individual time series?
>
>     -s
>
>     On 07.04.2015 12:42, Kostas Tzoumas wrote:
>
>         Hi everyone,
>
>         I'm forwarding a private conversation to the list with Mats'
>         approval.
>
>         The problem is how to compute correlations between time series
>         in Flink. We have two time series, U and V, and need to compute
>         1000 correlation measures between the series, where each measure
>         shifts one series by one more item:
>         corr(U[0:N], V[n:N+n]) for n=0 to n=1000.
>
>         Any ideas on how one can do that without a Cartesian product?
>
>         Best,
>         Kostas
>
>         ---------- Forwarded message ----------
>         From: Mats Zachrison
>         Date: Tue, Mar 31, 2015 at 9:21 AM
>         To: Kostas Tzoumas, Stefan Avesand
>         Cc: stephan@data-artisans.com
>
>         As Stefan said, what I'm trying to achieve is basically a nice
>         way to do a correlation between two large time series.
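[Till's join-key construction (idea 2 above) can be sketched in plain
Python. This is a hedged illustration of the key scheme only, not Flink
code: for each shift s, u_i is tagged (s, i) and v_j is tagged
(s, j - s + 1), so joining on the (shift, position) key pairs u_p with
v_{p+s-1}, and reducing on the shift field yields the shifted dot
products a correlation can be built from.]

```python
# Plain-Python sketch of the join-key replication scheme (idea 2).
# Not Flink code; it emulates the join and the reduce with dicts.
from collections import defaultdict

def shifted_dot_products(u, v, max_shift):
    n = len(u)
    pairs = defaultdict(dict)  # (shift, position) -> {'u': .., 'v': ..}
    # u_i keeps its position for every shift it participates in:
    # u1 -> (1,1),(2,1),(3,1); u2 -> (1,2),(2,2); u3 -> (1,3)
    for i, ui in enumerate(u, start=1):
        for s in range(1, min(max_shift, n - i + 1) + 1):
            pairs[(s, i)]['u'] = ui
    # v_j slides back by s-1 positions:
    # v1 -> (1,1); v2 -> (1,2),(2,1); v3 -> (1,3),(2,2),(3,1)
    for j, vj in enumerate(v, start=1):
        for s in range(1, min(max_shift, j) + 1):
            pairs[(s, j - s + 1)]['v'] = vj
    # "Join" on (shift, position), then "reduce" on the shift field.
    sums = defaultdict(float)
    for (s, _pos), uv in pairs.items():
        if 'u' in uv and 'v' in uv:
            sums[s] += uv['u'] * uv['v']
    return dict(sums)

print(shifted_dot_products([1, 2, 3], [4, 5, 6], 3))
# shift 1: 1*4 + 2*5 + 3*6 = 32; shift 2: 1*5 + 2*6 = 17; shift 3: 1*6 = 6
```

[The tuples emitted for (u1, u2, u3) and (v1, v2, v3) are exactly the
twelve join keys listed in the mail, and the total number of u-keys for
max_shift = n is n + (n-1) + ... + 1 = (n^2+n)/2, matching the stated
communication complexity.]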
Since I'm looking for an optimal delay between the two series,
>         I'd like to delay one of the series by x observations when
>         doing the correlation, and step x from 1 to 1000.
>
>         Some pseudo code:
>
>         For (x = 1 to 1000)
>             Shift Series A 'x-1' steps
>             Correlation[x] = Correlate(Series A and Series B)
>         End For
>
>         In R, using cor() and apply(), this could look like:
>
>         shift <- as.array(c(1:1000))
>         corrAB <- apply(shift, 1, function(x)
>             cor(data[x:nrow(data), ]$ColumnA,
>                 data[1:(nrow(data) - (x - 1)), ]$ColumnB))
>
>         Since this is basically 1000 independent correlation
>         calculations, it is fairly easy to parallelize. Here is an R
>         example using foreach() and the doParallel package:
>
>         library(doParallel)
>         cl <- makeCluster(3)
>         registerDoParallel(cl)
>         corrAB <- foreach(step = c(1:1000)) %dopar% {
>             cor(data[step:nrow(data), ]$ColumnA,
>                 data[1:(nrow(data) - (step - 1)), ]$ColumnB)
>         }
>         stopCluster(cl)
>
>         So I guess the question is: how to do this in a Flink
>         environment? Do we have to define how to parallelize the
>         algorithm, or can the cluster take care of that for us?
>
>         And of course this is most interesting on a generic level:
>         given the environment of a multi-core or multi-processor setup
>         running Flink, how hard is it to take advantage of all the
>         clock cycles? Do we have to split the algorithm and the data
>         and distribute the processing, or can the system do much of
>         that for us?
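[For reference, the shifted-correlation loop in Mats' R example can be
sketched in plain Python. This is a hedged, self-contained illustration
of the same 0-indexed slicing — corr(A[x-1:], B[:n-(x-1)]) for
x = 1..max_shift — with Pearson correlation written out directly so no
third-party library is assumed; each iteration is independent, which is
what makes the loop embarrassingly parallel.]

```python
# Sketch of the R apply() example in plain Python: 1000 independent
# shifted Pearson correlations (10 here, to keep the demo small).
import math
import random

def pearson(xs, ys):
    """Pearson correlation coefficient of two equally long sequences."""
    n = len(xs)
    mx = sum(xs) / n
    my = sum(ys) / n
    cov = sum((x - mx) * (y - my) for x, y in zip(xs, ys))
    sx = math.sqrt(sum((x - mx) ** 2 for x in xs))
    sy = math.sqrt(sum((y - my) ** 2 for y in ys))
    return cov / (sx * sy)

def shifted_correlations(a, b, max_shift):
    """corr(a[x-1:], b[:len(b)-(x-1)]) for x = 1 .. max_shift,
    mirroring cor(data[x:nrow(data),]$ColumnA,
                 data[1:(nrow(data)-(x-1)),]$ColumnB) from the mail."""
    out = []
    for x in range(1, max_shift + 1):
        out.append(pearson(a[x - 1:], b[:len(b) - (x - 1)]))
    return out

random.seed(0)
a = [random.random() for _ in range(100)]
b = [random.random() for _ in range(100)]
corrs = shifted_correlations(a, b, 10)
print(len(corrs))  # 10
```

[Since the iterations share nothing, the loop body could be handed to
`multiprocessing.Pool.map` in the same spirit as the foreach()/doParallel
version; in Flink, this corresponds to idea 1 from the thread —
broadcasting both series and computing one shift per mapper.]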