flink-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Aljoscha Krettek <aljos...@apache.org>
Subject Re: Statefull computation
Date Mon, 24 Aug 2015 06:46:13 GMT
Hi,
In the example the result is not correct because the values for a,b,c and d
are never forwarded from instance 2 even though they would modify the
global top-k result. It works, though, if you partition by the key field
(tuple field 0, in this case) before doing the summation and local top-k. I
think.

Best,
Aljoscha

On Sun, 23 Aug 2015 at 23:07 Gyula Fóra <gyula.fora@gmail.com> wrote:

> Hey,
>
> I am not sure if I get it, why aren't the results correct?
>
> You don't instantly get the global top-k, but you are always updating it
> with the new local results.
>
> Gyula
>
> Aljoscha Krettek <aljoscha@apache.org> ezt írta (időpont: 2015. aug. 23.,
> V, 22:58):
>
>> Hi,
>> I wanted to post something along the same lines but now I don't think the
>> approach with local top-ks and merging works. For example, if you want to
>> get top-4 and you do the pre-processing in two parallel instances. This
>> input data would lead to incorrect results:
>>
>> 1. Instance:
>> a 6
>> b 5
>> c 4
>> d 3
>>
>> 2. Instance:
>> e 10
>> f 9
>> g 8
>> h 7
>> a 6
>> b 5
>> c 4
>> d 3
>>
>> So each parallel instance would forward its local top-4, which would lead
>> to the end result:
>> e 10
>> f 9
>> g 8
>> h 7
>>
>> Which is wrong. I think no matter how many elements you forward you can
>> construct cases that lead to wrong results. (The problem seems to be that
>> top-k is inherently global.)
>>
>> Might also be that I'm tired and not seeing this right... :D
>>
>> For the case where your elements are partitioned by some key you should
>> be fine, though, as Gyula mentioned.
>>
>> I'm not familiar with the Spark API, maybe you can help me out. What does
>> the updateStateByKey() do if your state is not actually partitioned by a
>> key. Plus, I'm curious in general what Spark does with this call.
>>
>> Cheers,
>> Aljoscha
>>
>> On Sun, 23 Aug 2015 at 22:28 Gyula Fóra <gyfora@apache.org> wrote:
>>
>>> Hey!
>>>
>>> What you are trying to do here is a global rolling aggregation, which is
>>> inherently a DOP 1 operation. Your observation is correct that if you want
>>> to use a simple stateful sink, you need to make sure that you set the
>>> parallelism to 1 in order to get correct results.
>>>
>>> What you can do is to keep local top-ks in a parallel operator (let's
>>> say a flatmap) and periodically output the local top-k elements and merge
>>> them in a sink with parallelism=1 to produce a global top-k.
>>>
>>> I am not 100% sure how you implemented the same functionality in spark
>>> but there you probably achieved the semantics I described above.
>>>
>>> The whole problem is much easier if you are interested in the top-k
>>> elements grouped by some key, as then you can use partitioned operator
>>> states which will give you the correct results with arbitrary parallelism.
>>>
>>> Cheers,
>>> Gyula
>>>
>>> defstat <defstat@gmail.com> ezt írta (időpont: 2015. aug. 23., V,
>>> 21:40):
>>>
>>>> Hi. I am struggling the past few days to find a solution on the
>>>> following
>>>> problem, using Apache Flink:
>>>>
>>>> I have a stream of vectors, represented by files in a local folder.
>>>> After a
>>>> new text file is located using DataStream<String> text =
>>>> env.readFileStream(...), I transform (flatMap), the Input into a
>>>> SingleOutputStreamOperator<Tuple2&lt;String, Integer>, ?>, with
the
>>>> Integer
>>>> being the score coming from a scoring function.
>>>>
>>>> I want to persist a global HashMap containing the top-k vectors, using
>>>> their
>>>> scores. I approached the problem using a statefull transformation.
>>>> 1. The first problem I have is that the HashMap retains per-sink data
>>>> (so
>>>> for each thread of workers, one HashMap of data). How can I make that a
>>>> Global collection
>>>>
>>>> 2. Using Apache Spark, I made that possible by
>>>> JavaPairDStream<String, Integer> stateDstream =
>>>> tuples.updateStateByKey(updateFunction);
>>>>
>>>> and then making transformations on the stateDstream. Is there a way I
>>>> can
>>>> get the same functionality using FLink?
>>>>
>>>> Thanks in advance!
>>>>
>>>>
>>>>
>>>> --
>>>> View this message in context:
>>>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Statefull-computation-tp2494.html
>>>> Sent from the Apache Flink User Mailing List archive. mailing list
>>>> archive at Nabble.com.
>>>>
>>>

Mime
View raw message