flink-user mailing list archives

From Gyula Fóra <gyula.f...@gmail.com>
Subject Re: Statefull computation
Date Sun, 23 Aug 2015 21:06:52 GMT
Hey,

I am not sure I follow; why aren't the results correct?

You don't instantly get the global top-k, but you are always updating it
with the new local results.

Gyula

Aljoscha Krettek <aljoscha@apache.org> wrote (on Sun, 23 Aug 2015, 22:58):

> Hi,
> I wanted to post something along the same lines, but now I don't think the
> approach with local top-ks and merging works. For example, say you want
> the top-4 and do the pre-processing in two parallel instances. The
> following input data would lead to incorrect results:
>
> 1. Instance:
> a 6
> b 5
> c 4
> d 3
>
> 2. Instance:
> e 10
> f 9
> g 8
> h 7
> a 6
> b 5
> c 4
> d 3
>
> So each parallel instance would forward its local top-4, which would lead
> to the end result:
> e 10
> f 9
> g 8
> h 7
>
> Which is wrong. I think that no matter how many elements you forward, you
> can construct cases that lead to wrong results. (The problem seems to be
> that top-k is inherently global.)
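
The counterexample above can be checked mechanically. The following is a minimal plain-Java sketch (no Flink API; the class and method names are illustrative, not anything from the thread) that replays the two instances' partial counts:

```java
import java.util.*;
import java.util.stream.*;

public class TopKCounterexample {

    // Partial counts seen by the two parallel instances in the example.
    static final Map<String, Integer> INST1 = Map.of("a", 6, "b", 5, "c", 4, "d", 3);
    static final Map<String, Integer> INST2 = Map.of("e", 10, "f", 9, "g", 8, "h", 7,
                                                     "a", 6, "b", 5, "c", 4, "d", 3);

    // Keys of the k highest-count entries.
    static Set<String> topK(Map<String, Integer> counts, int k) {
        return counts.entrySet().stream()
                .sorted(Map.Entry.<String, Integer>comparingByValue().reversed())
                .limit(k)
                .map(Map.Entry::getKey)
                .collect(Collectors.toSet());
    }

    // Restrict a count map to its local top-k entries (what gets forwarded).
    static Map<String, Integer> localTopK(Map<String, Integer> counts, int k) {
        Map<String, Integer> out = new HashMap<>();
        for (String key : topK(counts, k)) out.put(key, counts.get(key));
        return out;
    }

    // Sum two partial-count maps key by key.
    static Map<String, Integer> merge(Map<String, Integer> a, Map<String, Integer> b) {
        Map<String, Integer> out = new HashMap<>(a);
        b.forEach((key, v) -> out.merge(key, v, Integer::sum));
        return out;
    }

    // What a merging sink sees: only each instance's local top-4.
    static Set<String> mergedLocalTopK() {
        return topK(merge(localTopK(INST1, 4), localTopK(INST2, 4)), 4);
    }

    // The correct answer: top-4 over the summed global counts.
    static Set<String> trueGlobalTopK() {
        return topK(merge(INST1, INST2), 4);
    }

    public static void main(String[] args) {
        System.out.println(mergedLocalTopK()); // the set {e, f, g, h}
        System.out.println(trueGlobalTopK()); // the set {a, b, e, f}: a has 6 + 6 = 12
    }
}
```

Merging the two local top-4 lists keeps only e, f, g, h and drops a entirely, even though a's summed count (12) is the global maximum.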
>
> Might also be that I'm tired and not seeing this right... :D
>
> For the case where your elements are partitioned by some key you should be
> fine, though, as Gyula mentioned.
>
> I'm not familiar with the Spark API, so maybe you can help me out: what
> does updateStateByKey() do if your state is not actually partitioned by a
> key? I'm also curious in general what Spark does with this call.
>
> Cheers,
> Aljoscha
>
> On Sun, 23 Aug 2015 at 22:28 Gyula Fóra <gyfora@apache.org> wrote:
>
>> Hey!
>>
>> What you are trying to do here is a global rolling aggregation, which is
>> inherently a DOP 1 operation. Your observation is correct that if you want
>> to use a simple stateful sink, you need to make sure that you set the
>> parallelism to 1 in order to get correct results.
>>
>> What you can do is to keep local top-ks in a parallel operator (let's say
>> a flatmap) and periodically output the local top-k elements and merge them
>> in a sink with parallelism=1 to produce a global top-k.
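
The local-top-k-plus-merge dataflow described above can be sketched in plain Java, assuming the periodic snapshots carry the counts along with the keys, and that the merger keeps the latest snapshot per instance. This is not Flink API code; all names are illustrative:

```java
import java.util.*;
import java.util.stream.*;

public class LocalTopKMerge {

    // One parallel "flatMap" instance: accumulates scores, snapshots its top-k.
    static class LocalAggregator {
        final Map<String, Integer> counts = new HashMap<>();

        void add(String key, int score) {
            counts.merge(key, score, Integer::sum);
        }

        Map<String, Integer> topKSnapshot(int k) {
            return counts.entrySet().stream()
                    .sorted(Map.Entry.<String, Integer>comparingByValue().reversed())
                    .limit(k)
                    .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
        }
    }

    // The parallelism-1 sink: merges the latest snapshots into a global top-k.
    static class GlobalMerger {
        final Map<Integer, Map<String, Integer>> latestByInstance = new HashMap<>();

        List<String> update(int instanceId, Map<String, Integer> snapshot, int k) {
            latestByInstance.put(instanceId, snapshot);
            Map<String, Integer> merged = new HashMap<>();
            latestByInstance.values()
                    .forEach(s -> s.forEach((key, v) -> merged.merge(key, v, Integer::sum)));
            return merged.entrySet().stream()
                    .sorted(Map.Entry.<String, Integer>comparingByValue().reversed())
                    .limit(k)
                    .map(Map.Entry::getKey)
                    .collect(Collectors.toList());
        }
    }

    static List<String> demo() {
        LocalAggregator a = new LocalAggregator();
        a.add("x", 5); a.add("y", 3);
        LocalAggregator b = new LocalAggregator();
        b.add("z", 7); b.add("x", 4);
        GlobalMerger sink = new GlobalMerger();
        sink.update(0, a.topKSnapshot(2), 2);
        return sink.update(1, b.topKSnapshot(2), 2);
    }

    public static void main(String[] args) {
        System.out.println(demo()); // [x, z]: x = 5 + 4 = 9 beats z = 7
    }
}
```

As the rest of the thread shows, this keeps the global top-k continuously updated but is still an approximation: a key that never makes any instance's local top-k is invisible to the merger.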
>>
>> I am not 100% sure how you implemented the same functionality in Spark,
>> but you probably achieved the semantics I described above.
>>
>> The whole problem is much easier if you are interested in the top-k
>> elements grouped by some key, as then you can use partitioned operator
>> states which will give you the correct results with arbitrary parallelism.
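
A rough illustration of why the keyed case tolerates arbitrary parallelism, assuming hash partitioning by key (plain Java, illustrative names, not Flink API): every occurrence of a key lands on the same instance, so the per-key state held there is already the complete, global state for that key.

```java
import java.util.*;

public class KeyPartitioning {

    // Which parallel instance a key is routed to under hash partitioning.
    static int instanceFor(String key, int parallelism) {
        return Math.floorMod(key.hashCode(), parallelism);
    }

    // Per-instance partitioned state: instance -> (key -> count).
    static Map<Integer, Map<String, Integer>> buildState(List<String> stream, int parallelism) {
        Map<Integer, Map<String, Integer>> state = new HashMap<>();
        for (String key : stream) {
            state.computeIfAbsent(instanceFor(key, parallelism), i -> new HashMap<>())
                 .merge(key, 1, Integer::sum);
        }
        return state;
    }

    public static void main(String[] args) {
        Map<Integer, Map<String, Integer>> state =
                buildState(Arrays.asList("a", "b", "a", "c", "a", "b"), 4);
        // All three occurrences of "a" were counted on a single instance.
        System.out.println(state.get(instanceFor("a", 4)).get("a")); // 3
    }
}
```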
>>
>> Cheers,
>> Gyula
>>
defstat <defstat@gmail.com> wrote (on Sun, 23 Aug 2015, 21:40):
>>
>>> Hi. I have been struggling for the past few days to find a solution to
>>> the following problem using Apache Flink:
>>>
>>> I have a stream of vectors, represented by files in a local folder.
>>> After a new text file is picked up via DataStream<String> text =
>>> env.readFileStream(...), I transform (flatMap) the input into a
>>> SingleOutputStreamOperator<Tuple2<String, Integer>, ?>, with the Integer
>>> being the score coming from a scoring function.
>>>
>>> I want to persist a global HashMap containing the top-k vectors and
>>> their scores. I approached the problem using a stateful transformation.
>>> 1. The first problem I have is that the HashMap retains per-sink data
>>> (so for each worker thread, one HashMap of data). How can I make that a
>>> global collection?
>>>
>>> 2. Using Apache Spark, I made that possible with
>>> JavaPairDStream<String, Integer> stateDstream =
>>> tuples.updateStateByKey(updateFunction);
>>>
>>> and then applying transformations on the stateDstream. Is there a way I
>>> can get the same functionality using Flink?
>>>
>>> Thanks in advance!
>>>
>>>
>>>
>>> --
>>> View this message in context:
>>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Statefull-computation-tp2494.html
>>> Sent from the Apache Flink User Mailing List archive. mailing list
>>> archive at Nabble.com.
>>>
>>
