flink-user mailing list archives

From: simone <simone.povosca...@gmail.com>
Subject: Problem with Kafka Consumer
Date: Tue, 16 May 2017 08:44:17 GMT
Hi to all,

I have a problem with Flink and Kafka queues.

I have a Producer that puts some Rows into a data sink represented by a
Kafka topic, and a Consumer that reads from this topic and processes the
Rows in buckets of N elements using a custom trigger function:

messageStream.keyBy(0)
        .windowAll(GlobalWindows.create())
        .trigger(CountWithTimeoutTrigger.of(Time.seconds(30), N))
        .apply(new RowToQuery());
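
The trigger is basically a count trigger with a processing-time timeout;
a simplified sketch of it is below (the actual class I use may differ in
some details, this is just to show the idea):

import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.state.ReducingState;
import org.apache.flink.api.common.state.ReducingStateDescriptor;
import org.apache.flink.api.common.typeutils.base.LongSerializer;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.Window;

// Simplified sketch: fires when either maxCount elements have arrived or
// the timeout has expired since the first element of the current bucket.
public class CountWithTimeoutTrigger<W extends Window> extends Trigger<Object, W> {

    private final long maxCount;
    private final long timeoutMs;

    private final ReducingStateDescriptor<Long> countDesc =
            new ReducingStateDescriptor<>("count", new Sum(), LongSerializer.INSTANCE);

    private CountWithTimeoutTrigger(long timeoutMs, long maxCount) {
        this.timeoutMs = timeoutMs;
        this.maxCount = maxCount;
    }

    public static <W extends Window> CountWithTimeoutTrigger<W> of(Time timeout, long maxCount) {
        return new CountWithTimeoutTrigger<>(timeout.toMilliseconds(), maxCount);
    }

    @Override
    public TriggerResult onElement(Object element, long timestamp, W window, TriggerContext ctx) throws Exception {
        ReducingState<Long> count = ctx.getPartitionedState(countDesc);
        if (count.get() == null) {
            // first element of the bucket: arm the timeout timer
            ctx.registerProcessingTimeTimer(ctx.getCurrentProcessingTime() + timeoutMs);
        }
        count.add(1L);
        if (count.get() >= maxCount) {
            count.clear();
            return TriggerResult.FIRE_AND_PURGE;
        }
        return TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onProcessingTime(long time, W window, TriggerContext ctx) throws Exception {
        // timeout expired: emit whatever has been collected so far
        ctx.getPartitionedState(countDesc).clear();
        return TriggerResult.FIRE_AND_PURGE;
    }

    @Override
    public TriggerResult onEventTime(long time, W window, TriggerContext ctx) {
        return TriggerResult.CONTINUE;
    }

    @Override
    public void clear(W window, TriggerContext ctx) throws Exception {
        ctx.getPartitionedState(countDesc).clear();
    }

    private static class Sum implements ReduceFunction<Long> {
        @Override
        public Long reduce(Long a, Long b) {
            return a + b;
        }
    }
}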

The problem is that the Consumer stops consuming data once it has read
about 1000 rows.
With N = 20 the consumer processes 50 buckets, for a total of 1000 elements.
With N = 21 the consumer processes 48 buckets, for a total of 1008 elements.
With N = 68 the consumer processes 15 buckets, for a total of 1020
elements. And so on...

The same thing also happens without the custom trigger function, using a
simple CountTrigger instead:

messageStream.keyBy(0)
        .windowAll(GlobalWindows.create())
        .trigger(PurgingTrigger.of(CountTrigger.of(N)))
        .apply(new RowToQuery());
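
For completeness, the consumer source is created roughly like the sketch
below. The broker address, topic name, group id, and connector version
(0.9 here) are just placeholders, and in the real job the records are
deserialized into Rows rather than Strings:

import java.util.Properties;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

public class KafkaRowConsumerJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // standard Kafka consumer properties are passed to the Flink consumer as-is
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092"); // placeholder
        props.setProperty("group.id", "row-consumer");            // placeholder

        // placeholder topic and schema; the real job turns each record into a Row
        DataStream<String> messageStream = env.addSource(
                new FlinkKafkaConsumer09<>("rows-topic", new SimpleStringSchema(), props));

        messageStream.print();
        env.execute("Kafka consumer test");
    }
}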

How is this possible? Are there any properties to set on the Consumer so
that it processes more data?

Thanks,

Simone.

