flink-user mailing list archives

From Aljoscha Krettek <aljos...@apache.org>
Subject Re: Values are missing, probably due parallelism?
Date Thu, 21 Apr 2016 12:06:50 GMT
Hi,
no worries, I also had to read the doc to figure it out. :-)

I now see what the problem is. The .countWindowAll().apply() pattern
creates a WindowOperator with a parallelism of 1 because the "count all" only
works if one instance of the window operator sees all elements. When you
manually change the parallelism it essentially becomes a "count per
parallel instance" window operation, and the elements from the source with
parallelism 1 get distributed round-robin to the parallel instances of the
count-window operator. This means that it will take more elements emitted
from the source before each instance of the window fires. It's a bit
confusing, but Flink does not allow forcing the parallelism to 1 right now.
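
A minimal plain-Java sketch of the effect described above. It is not Flink
code and does not use the Flink API: the class and method names are made up
for illustration, and sending element i to instance i % parallelism is only
an assumed simplification of the round-robin distribution. Each simulated
instance fires once it has buffered 10 elements and then emits the multiples
of 3, as the window function in the original example does.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.stream.LongStream;

// Illustration only: a hypothetical stand-in for parallel count-window instances.
final class CountWindowSimulation {

    /**
     * Distributes the values 0..total-1 round-robin over `parallelism` buffers.
     * A buffer "fires" when it holds `windowSize` elements, emitting the
     * multiples of 3 and then clearing itself. Elements left in a partially
     * filled buffer are never emitted.
     */
    static List<Long> simulate(int total, int windowSize, int parallelism) {
        List<List<Long>> buffers = new ArrayList<>();
        for (int i = 0; i < parallelism; i++) {
            buffers.add(new ArrayList<>());
        }
        List<Long> emitted = new ArrayList<>();
        // The end argument of LongStream.range is exclusive.
        LongStream.range(0, total).forEach(value -> {
            List<Long> buffer = buffers.get((int) (value % parallelism));
            buffer.add(value);
            if (buffer.size() == windowSize) {
                for (Long v : buffer) {
                    if (v % 3 == 0) {
                        emitted.add(v);
                    }
                }
                buffer.clear();
            }
        });
        Collections.sort(emitted); // sorted only to make the results easy to compare
        return emitted;
    }

    public static void main(String[] args) {
        for (int p = 1; p <= 4; p++) {
            System.out.println("parallelism " + p + " -> " + simulate(30, 10, p));
        }
    }
}
```

With 30 elements and a window size of 10, this sketch emits all ten
multiples of 3 for parallelism 1 and 3, only 0 through 18 for parallelism 2
(each instance fills one window and is left with 5 buffered elements), and
nothing for parallelism 4 (no instance ever reaches 10 elements).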

About using the snapshot version, I would suggest you don't use it if you
don't absolutely need one of the features in there that is not yet
released. The builds are still pretty stable, however.

Cheers,
Aljoscha

On Thu, 21 Apr 2016 at 13:53 Kostya Kulagin <kkulagin@gmail.com> wrote:

> First of all, you are right about the number of elements, my bad and sorry for
> the confusion, I need to be better at calculations :)
>
> However: if I change the parallelism to, let's say, 2 in the windowing, i.e. instead
> of (of course I changed 29 to 30 as well :) )
>
> }).print();
>
> put
>
> }).setParallelism(2).print();
>
> at the very bottom - I am getting:
>
> 3> 15
> 3> 12
> 2> 9
> 2> 6
> 4> 18
> 04/21/2016 07:47:08	Sink: Unnamed(2/4) switched to FINISHED
> 04/21/2016 07:47:08	Source: Custom Source(1/1) switched to FINISHED
> 04/21/2016 07:47:08	Sink: Unnamed(4/4) switched to FINISHED
> 04/21/2016 07:47:08	Sink: Unnamed(3/4) switched to FINISHED
> 04/21/2016 07:47:08	TriggerWindow(GlobalWindows(), PurgingTrigger(CountTrigger(10)), AllWindowedStream.apply(AllWindowedStream.java:230))(2/2) switched to FINISHED
> 04/21/2016 07:47:08	TriggerWindow(GlobalWindows(), PurgingTrigger(CountTrigger(10)), AllWindowedStream.apply(AllWindowedStream.java:230))(1/2) switched to FINISHED
> 1> 3
> 1> 0
>
> With the default setting for parallelism it works fine, same as with values 3
> and 1.
>
> With 2 and with 4+ it does not work. With 4+ it simply prints nothing. I.e. it
> might be something with how the threads finish their execution?
>
> I am using the latest prod version I've found in maven: 1.0.1.
> Can snapshot versions be used in prod? I mean how well tested are those?
>
> I will try the same on master branch later today.
>
> Thanks!
> Kostya
>
>
> On Thu, Apr 21, 2016 at 6:38 AM, Aljoscha Krettek <aljoscha@apache.org>
> wrote:
>
>> Hi,
>> which version of Flink are you using? Maybe there is a bug. I've tested
>> it on the git master (1.1-SNAPSHOT) and it works fine with varying degrees
>> of parallelism if I change the source to emit 30 elements:
>> LongStream.range(0, 30).forEach(ctx::collect);
>>
>> (The second argument of LongStream.range(start, end) is exclusive)
>>
>> Cheers,
>> Aljoscha
>>
>>
>>
>> On Thu, 21 Apr 2016 at 11:44 Kostya Kulagin <kkulagin@gmail.com> wrote:
>>
>>> Actually this is not true - the source emits 30 values since it
>>> starts at 0. If I change 29 to 33 the result will be the same.
>>> I can get all values if I play with the parallelism, i.e. by setting
>>> parallelism 1 before print.
>>> Or if I change 29 to 39 (I have 4 cores).
>>> I can guess that there is something wrong with threads. BTW, in this case
>>> how are threads created and how does data flow between them?
>>> On Apr 21, 2016 4:50 AM, "Aljoscha Krettek" <aljoscha@apache.org> wrote:
>>>
>>>> Hi,
>>>> this is related to your other question about count windows. The source
>>>> emits 29 values so we only have two count-windows with 10 elements each.
>>>> The last window is never triggered.
>>>>
>>>> Cheers,
>>>> Aljoscha
>>>>
>>>> On Wed, 20 Apr 2016 at 23:52 Kostya Kulagin <kkulagin@gmail.com> wrote:
>>>>
>>>>> I think it has something to do with parallelism, and I probably do not have
>>>>> a clear understanding of how parallelism works in Flink, but in this example:
>>>>>
>>>>>     StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>>>>>     DataStreamSource<Long> source = env.addSource(new SourceFunction<Long>() {
>>>>>
>>>>>       @Override
>>>>>       public void run(SourceContext<Long> ctx) throws Exception {
>>>>>         LongStream.range(0, 29).forEach(ctx::collect);
>>>>>       }
>>>>>
>>>>>       @Override
>>>>>       public void cancel() {
>>>>>
>>>>>       }
>>>>>     });
>>>>>
>>>>>     source.countWindowAll(10).apply(new AllWindowFunction<Long, Long, GlobalWindow>() {
>>>>>       @Override
>>>>>       public void apply(GlobalWindow window, Iterable<Long> values, Collector<Long> out) throws Exception {
>>>>>         for (Long value : values) {
>>>>>           if (value % 3 == 0) {
>>>>>             out.collect(value);
>>>>>           }
>>>>>         }
>>>>>       }
>>>>>     }).print();
>>>>>
>>>>>     env.execute("yoyoyo");
>>>>>
>>>>> Why is my output like this:
>>>>>
>>>>> 4> 9
>>>>> 1> 0
>>>>> 1> 12
>>>>> 3> 6
>>>>> 3> 18
>>>>> 2> 3
>>>>> 2> 15
>>>>>
>>>>> ? I.e. where is the value 24, for example? I expect to see it. What am
>>>>> I doing wrong?
>>>>>
>>>>
>
