flink-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Nguyen Xuan Truong <truongn...@gmail.com>
Subject Re: Flink stops deploying jobs on normal iteration
Date Thu, 07 Jul 2016 18:28:29 GMT
Hi Vasia,

You are right about the topDistance, it is the dataset which has only 1
double value. I already looked at the Aggregator and I can only get the
value of an aggregator in the next iteration. However, my problem is a bit
tricky because the topDistance controls how the newSeeds is calculated. I
managed to speed up the normal iteration a bit but still curious if there
is any workround to use the native flink iteration?

Thanks,
Truong



On Thu, Jul 7, 2016 at 10:17 AM, Vasiliki Kalavri <vasilikikalavri@gmail.com
> wrote:

> Hi Truong,
>
> I guess the problem is that you want to use topDistance as a broadcast set
> inside the iteration? If I understand correctly this is a dataset with a
> single value, right? Could you maybe compute it with an aggregator instead?
>
> -Vasia.
>
> On 5 July 2016 at 21:48, Nguyen Xuan Truong <truongnx15@gmail.com> wrote:
>
>> Hi Vasia,
>>
>> Thank you very much for your explanation :). When running with small
>> maxIteration, the job graph that Flink executed was optimal. However, when
>> maxIterations was large, Flink took very long time to generate the job
>> graph. The actually time to execute the jobs was very fast but the time to
>> optimize and schedule the jobs was slow.
>>
>> Regarding your suggestion, I didn't use iterate/iterateDelta because I
>> need to access the intermediate results within an iteration (the
>> topDistance in my pseudo-code). As you said before, Flink does not support
>> that feature, so I wondered if you have a workround for interate or
>> iterateDelta?
>>
>> Thanks,
>> Truong
>>
>> On Tue, Jul 5, 2016 at 8:46 PM, Vasiliki Kalavri <
>> vasilikikalavri@gmail.com> wrote:
>>
>>> Hi Truong,
>>>
>>> I'm afraid what you're experiencing is to be expected. Currently, for
>>> loops do not perform well in Flink since there is no support for caching
>>> intermediate results yet. This has been a quite often requested feature
>>> lately, so maybe it will be added soon :)
>>> Until then, I suggest you try implementing your logic using iterate or
>>> iterateDelta.
>>>
>>> Cheers,
>>> -Vasia.
>>>
>>> On 5 July 2016 at 17:11, Nguyen Xuan Truong <truongnx15@gmail.com>
>>> wrote:
>>>
>>>> Hi,
>>>>
>>>> I have a Flink program which is similar to Kmeans algorithm. I use
>>>> normal iteration(for loop) because Flink iteration does not allow to
>>>> compute the intermediate results(in this case the topDistance) within one
>>>> iteration. The problem is that my program only runs when maxIteration is
>>>> small. When the maxIterations is big, Flink jobs inside the forloop are not
>>>> scheduled, deployed or executed. The program hangs forever without any
>>>> exception, error or log message.
>>>>
>>>> I ran the program on both local and cluster environments, having the
>>>> same issue. I tried with smaller inputs (points and seeds), having the same
>>>> issue.
>>>>
>>>> Does anybody have an idea about what is the problem? (Maybe the forloop
>>>> creates many Flink jobs?)
>>>>
>>>> Here is the pseudo-code of my program:
>>>>
>>>> DataSet[Point] points = env.readTextFile(inputPoints)
>>>> DataSet[Point] seeds = env.readTextFile(inputSeeds)
>>>> discardNumber: Int = 100
>>>> maxIterations: Int = 20 // maxIteration = 30 will hang the program and
>>>> no Flink job inside the forloop jobs is deployed)
>>>>
>>>> for(iteration <- 1 to maxIterations) {
>>>>
>>>>       val intermediateSeeds = points
>>>>         .map()
>>>>         .withBroadcastSet(seeds, "seeds")
>>>>
>>>>      //topDistance contains only only double value.
>>>>       var topDistance = intermediateSeeds
>>>>         .mapPartition()
>>>>         .first(discardNumber)
>>>>         .groupBy()
>>>>         .reduceGroup()
>>>>
>>>>       val newSeeds = intermediateSeeds
>>>>         .map()
>>>>         .groupBy(0)
>>>>         .reduce ().withBroadcastSet(topDistance, "topDistance")
>>>>         .map()
>>>>
>>>>       seeds = newSeeds
>>>> }
>>>>
>>>> val finalResult = seeds.collect()
>>>>
>>>>
>>>> Thanks,
>>>> Truong
>>>>
>>>
>>>
>>
>

Mime
View raw message