From: Gyula Fóra
Date: Sun, 23 Aug 2015 21:06:52 +0000
Subject: Re: Statefull computation
To: user@flink.apache.org

Hey,

I am not sure I get it: why aren't the results correct?

You don't instantly get the global top-k, but you are always updating it with the new local results.

Gyula

On Sun, 23 Aug 2015 at 22:58 Aljoscha Krettek <aljoscha@apache.org> wrote:
Hi,
I wanted to post something along the same lines, but now I don't think the approach with local top-ks and merging works. For example, suppose you want the top-4 and do the pre-processing in two parallel instances. This input data would lead to incorrect results:

Instance 1:
a 6
b 5
c 4
d 3

Instance 2:
e 10
f 9
g 8
h 7
a 6
b 5
c 4
d 3

So each parallel instance would forward its local top-4, which would lead to the end result:
e 10
f 9
g 8
h 7

Which is wrong. I think that no matter how many elements you forward, you can construct cases that lead to wrong results. (The problem seems to be that top-k is inherently global.)
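A minimal plain-Java sketch (not Flink code) that replays this example: each instance forwards its local top-4, the forwarded entries are merged, and the result is compared with a top-4 computed directly over both inputs. The example does not say what a key appearing in both instances means, so the sketch takes the combining rule as a parameter and assumes two readings: (a) each pair is one independently scored element, as with the scored vectors in the original question, so a repeated key keeps its highest score (Math::max); (b) the values are partial counts that must be summed across instances (Integer::sum). TopKExample, topK and localTopK are hypothetical names.

```java
import java.util.*;
import java.util.function.BinaryOperator;
import java.util.stream.Collectors;

/** Replays the two-instance top-4 example; all names here are illustrative, not Flink API. */
class TopKExample {
    /** Combines repeated keys with `combine`, then returns the k best keys (ties broken by key). */
    static List<String> topK(List<Map<String, Integer>> parts, int k, BinaryOperator<Integer> combine) {
        Map<String, Integer> merged = new HashMap<>();
        for (Map<String, Integer> part : parts) {
            part.forEach((key, score) -> merged.merge(key, score, combine));
        }
        return merged.entrySet().stream()
            .sorted(Map.Entry.<String, Integer>comparingByValue().reversed()
                .thenComparing(Map.Entry.<String, Integer>comparingByKey()))
            .limit(k)
            .map(Map.Entry::getKey)
            .collect(Collectors.toList());
    }

    /** What one parallel instance would forward: its own top-k entries with their scores. */
    static Map<String, Integer> localTopK(Map<String, Integer> instance, int k) {
        Map<String, Integer> forwarded = new HashMap<>();
        // Keys are unique within one instance here, so the combine function does not matter.
        for (String key : topK(List.of(instance), k, Math::max)) {
            forwarded.put(key, instance.get(key));
        }
        return forwarded;
    }
}
```

Under reading (a), merging the forwarded top-4s and ranking all data directly both give [e, f, g, h]. Under reading (b), the merged local top-4s still give [e, f, g, h], but summing first gives [a, b, e, f], because a and b then total 12 and 10.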

Might also be that I'm tired and not seeing this right... :D

For the case where your elements are partitioned by some key, you should be fine, though, as Gyula mentioned.

I'm not familiar with the Spark API; maybe you can help me out. What does updateStateByKey() do if your state is not actually partitioned by a key? Plus, I'm curious in general what Spark does with this call.

Cheers,
Aljoscha

On Sun, 23 Aug 2015 at 22:28 Gyula Fóra <gyfora@apache.org> wrote:
Hey!

What you are trying to do here is a global rolling aggregation, which is inherently a DOP 1 operation (DOP = degree of parallelism). Your observation is correct: if you want to use a simple stateful sink, you need to make sure that you set the parallelism to 1 in order to get correct results.
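A minimal sketch of the state such a simple stateful sink could hold, written in plain Java rather than against any particular Flink sink interface: a bounded top-k that records each vector's best score and evicts the lowest-ranked entry whenever more than k vectors are held. With the sink's parallelism set to 1, a single instance of this object would see every element and therefore hold the global top-k. The TopK class and its methods are hypothetical.

```java
import java.util.*;
import java.util.stream.Collectors;

/** Hypothetical bounded top-k state for a sink running with parallelism 1 (not a Flink class). */
class TopK {
    // Best first: higher score wins, ties broken alphabetically by vector.
    private static final Comparator<Map.Entry<String, Integer>> RANK =
        Map.Entry.<String, Integer>comparingByValue().reversed()
            .thenComparing(Map.Entry.<String, Integer>comparingByKey());

    private final int k;
    private final Map<String, Integer> best = new HashMap<>(); // vector -> highest score seen

    TopK(int k) { this.k = k; }

    /** Records one scored vector, keeping at most k entries. */
    void offer(String vector, int score) {
        best.merge(vector, score, Math::max);
        if (best.size() > k) {
            // Collections.max under RANK is the entry that ranks last, i.e. the weakest one.
            String worst = Collections.max(best.entrySet(), RANK).getKey();
            best.remove(worst);
        }
    }

    /** Current top-k vectors, best first. */
    List<String> ranked() {
        return best.entrySet().stream().sorted(RANK).map(Map.Entry::getKey).collect(Collectors.toList());
    }
}
```

Eviction scans the at most k + 1 held entries on every insert, which keeps the sketch short; for large k a structure ordered by score (such as a heap or a TreeMap) would avoid the linear scan.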

What you can do is keep local top-ks in a parallel operator (let's say a flatMap), periodically output the local top-k elements, and merge them in a sink with parallelism=1 to produce a global top-k.
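The two-stage approach can be sketched in plain Java, with the parallel instances and the sink simulated by ordinary objects instead of Flink operators. LocalTopK plays the role of the parallel flatMap and emits its current top-k as a snapshot; GlobalMerger plays the role of the parallelism-1 sink and keeps only the latest snapshot per instance, so a newer snapshot replaces the older one instead of piling up. All names are hypothetical. The sketch assumes each vector has one fixed score: a vector in the global top-k then has fewer than k vectors ranked above it in the whole stream, hence fewer than k above it in its own instance, so it is always among the candidates that instance forwards.

```java
import java.util.*;
import java.util.stream.Collectors;

/** Plain-Java stand-ins for the two stages; none of these names are Flink API. */
class TwoStageTopK {
    // Best first: higher score wins, ties broken alphabetically by vector.
    static final Comparator<Map.Entry<String, Integer>> RANK =
        Map.Entry.<String, Integer>comparingByValue().reversed()
            .thenComparing(Map.Entry.<String, Integer>comparingByKey());

    /** The k best entries of a vector -> score map, best first. */
    static Map<String, Integer> top(Map<String, Integer> scores, int k) {
        Map<String, Integer> out = new LinkedHashMap<>();
        scores.entrySet().stream().sorted(RANK).limit(k)
            .forEach(e -> out.put(e.getKey(), e.getValue()));
        return out;
    }

    /** Stage 1: one per parallel instance (the role of the parallel flatMap). */
    static class LocalTopK {
        private final int k;
        private final Map<String, Integer> seen = new HashMap<>();

        LocalTopK(int k) { this.k = k; }

        void offer(String vector, int score) {
            seen.merge(vector, score, Math::max); // same vector seen twice: keep its best score
            if (seen.size() > k) seen.keySet().retainAll(top(seen, k).keySet());
        }

        /** What the instance would emit downstream each period: its whole current top-k. */
        Map<String, Integer> snapshot() { return top(seen, k); }
    }

    /** Stage 2: a single instance (the role of the parallelism-1 sink). */
    static class GlobalMerger {
        private final int k;
        // Latest snapshot per local instance; a newer snapshot replaces the older one.
        private final Map<Integer, Map<String, Integer>> latest = new HashMap<>();

        GlobalMerger(int k) { this.k = k; }

        /** Stores the instance's new snapshot and returns the updated global top-k. */
        List<String> update(int instanceId, Map<String, Integer> snapshot) {
            latest.put(instanceId, snapshot);
            Map<String, Integer> union = new HashMap<>();
            latest.values().forEach(s -> s.forEach((v, score) -> union.merge(v, score, Math::max)));
            return new ArrayList<>(top(union, k).keySet());
        }
    }
}
```

For example, with k = 2, if instance 1 has seen a 6, b 5, c 4 and instance 2 has seen e 10, f 9, a 6, the merger reports [e, f]; once instance 1 also sees z 12 and re-emits its snapshot, the merger reports [z, e].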

I am not 100% sure how you implemented the same functionality in Spark, but there you probably achieved the semantics I described above.

The whole problem is much easier if you are interested in the top-k elements grouped by some key, as then you can use partitioned operator states, which will give you correct results with arbitrary parallelism.
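A plain-Java sketch of why grouping helps, assuming each element carries a group key (hypothetical; the original question has none): routing by group key sends every element of a group to the same simulated instance, so that group's top-k state is complete there and no merge step is needed, whatever the parallelism. The modulo over hashCode is a simple stand-in for key-based partitioning, not Flink's actual partitioner, and KeyedTopK is a hypothetical name, not Flink's partitioned-state API.

```java
import java.util.*;
import java.util.stream.Collectors;

/** Hypothetical per-group top-k; each group's state lives in exactly one simulated instance. */
class KeyedTopK {
    // Best first: higher score wins, ties broken alphabetically by vector.
    private static final Comparator<Map.Entry<String, Integer>> RANK =
        Map.Entry.<String, Integer>comparingByValue().reversed()
            .thenComparing(Map.Entry.<String, Integer>comparingByKey());

    private final int k;
    private final int parallelism;
    // One state map per simulated instance: group -> (vector -> best score).
    private final List<Map<String, Map<String, Integer>>> instances = new ArrayList<>();

    KeyedTopK(int k, int parallelism) {
        this.k = k;
        this.parallelism = parallelism;
        for (int i = 0; i < parallelism; i++) instances.add(new HashMap<>());
    }

    /** Simple stand-in for key partitioning: the same group always maps to the same instance. */
    private int ownerOf(String group) {
        return Math.floorMod(group.hashCode(), parallelism);
    }

    void offer(String group, String vector, int score) {
        Map<String, Integer> state =
            instances.get(ownerOf(group)).computeIfAbsent(group, g -> new HashMap<>());
        state.merge(vector, score, Math::max);
        if (state.size() > k) {
            state.remove(Collections.max(state.entrySet(), RANK).getKey()); // drop the weakest
        }
    }

    /** The exact top-k of one group, read from the only instance that holds it. */
    List<String> topK(String group) {
        return instances.get(ownerOf(group)).getOrDefault(group, Map.of()).entrySet().stream()
            .sorted(RANK).map(Map.Entry::getKey).collect(Collectors.toList());
    }
}
```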

Cheers,
Gyula

On Sun, 23 Aug 2015 at 21:40 defstat <defstat@gmail.com> wrote:
Hi. For the past few days I have been struggling to find a solution to the following problem, using Apache Flink:

I have a stream of vectors, represented by files in a local folder. After a new text file is detected using DataStream<String> text = env.readFileStream(...), I transform the input (with a flatMap) into a SingleOutputStreamOperator<Tuple2<String, Integer>, ?>, with the Integer being the score coming from a scoring function.

I want to persist a global HashMap containing the top-k vectors, ranked by their scores. I approached the problem using a stateful transformation.
1. The first problem I have is that the HashMap retains per-sink data (one HashMap for each worker thread). How can I make it a global collection?

2. Using Apache Spark, I made that possible with
JavaPairDStream<String, Integer> stateDstream = tuples.updateStateByKey(updateFunction);

and then applying transformations to stateDstream. Is there a way I can get the same functionality using Flink?
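For comparison, a plain-Java sketch of updateStateByKey semantics as described in the Spark Streaming guide (this is not Spark code): on every batch the update function is called for each key that has state or new values, with that key's new values and its previous state, and an empty result removes the key. java.util.Optional stands in for Spark's own Optional type, and KeyedStateEmulation is a hypothetical name. The state is per key, here per vector string, so any ranking across vectors has to come from the later transformations on stateDstream.

```java
import java.util.*;
import java.util.function.BiFunction;

/** Hypothetical emulation of per-key state updated once per micro-batch. */
class KeyedStateEmulation<K, V, S> {
    private final Map<K, S> state = new HashMap<>();
    private final BiFunction<List<V>, Optional<S>, Optional<S>> update;

    KeyedStateEmulation(BiFunction<List<V>, Optional<S>, Optional<S>> update) {
        this.update = update;
    }

    /** Applies one batch of (key, value) pairs and returns a copy of the new state. */
    Map<K, S> applyBatch(List<Map.Entry<K, V>> batch) {
        Map<K, List<V>> grouped = new HashMap<>();
        for (Map.Entry<K, V> e : batch) {
            grouped.computeIfAbsent(e.getKey(), key -> new ArrayList<>()).add(e.getValue());
        }
        // Every key with old state or new values is visited, even if it received no values.
        Set<K> keys = new HashSet<>(state.keySet());
        keys.addAll(grouped.keySet());
        for (K key : keys) {
            Optional<S> next =
                update.apply(grouped.getOrDefault(key, List.of()), Optional.ofNullable(state.get(key)));
            if (next.isPresent()) {
                state.put(key, next.get());
            } else {
                state.remove(key); // an empty result drops the key from the state
            }
        }
        return new HashMap<>(state);
    }
}
```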

Thanks in advance!



--
View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Statefull-computation-tp2494.html
Sent from the Apache Flink User Mailing List archive. mailing list archive at Nabble.com.