flink-user mailing list archives

From: simone <simone.povosca...@gmail.com>
Subject: Re: Problem with Kafka Consumer
Date: Tue, 16 May 2017 14:30:58 GMT
Hi Kostas,

thanks for your suggestion. Indeed, replacing my custom sink with a
simpler one brought out that the cause of the problem was RowToQuery, as
you suggested. The sink was blocking the reads and stalling the Kafka
pipeline, due to a misconfiguration of an internal client that calls an
external service.
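
For reference, the "simpler sink" was essentially a window function that
prints what it receives instead of calling the external service; roughly
the following (a sketch, not the exact code: PrintingWindowFunction is
just an illustrative name, and it assumes the Flink 1.2-era WindowFunction
API and a keyBy(0) pipeline over Rows):

    import org.apache.flink.api.java.tuple.Tuple;
    import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
    import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;
    import org.apache.flink.types.Row;
    import org.apache.flink.util.Collector;

    // Stand-in for RowToQuery: print each window's rows instead of querying
    // the external service, to check whether the stall comes from the sink.
    public class PrintingWindowFunction
            implements WindowFunction<Row, Row, Tuple, GlobalWindow> {
        @Override
        public void apply(Tuple key, GlobalWindow window,
                          Iterable<Row> rows, Collector<Row> out) {
            for (Row row : rows) {
                System.out.println(row);
                out.collect(row);
            }
        }
    }

With a stand-in like this the pipeline no longer stalled, which is what
pointed to the external call.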

Thanks for your help,
Simone.


On 16/05/2017 14:01, Kostas Kloudas wrote:
> Hi Simone,
>
> I suppose you actually meant messageStream.keyBy(…).window(…), right?
> .windowAll() is not applicable to keyed streams.
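>
> I.e., something along these lines (just a sketch of the corrected call,
> reusing your trigger):
>
> messageStream.keyBy(0)
>         .window(GlobalWindows.create())  // window(), not windowAll(), on a keyed stream
>         .trigger(CountWithTimeoutTrigger.of(Time.seconds(30), N))
>         .apply(new RowToQuery());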
>
> Some follow up questions are:
>
> In your logs, do you see any error messages?
> What does your RowToQuery() sink do? Could it be that it blocks, and the
> back pressure makes the whole pipeline stall?
> To check that, you can:
> 1) check the webui for backpressure metrics
> 2) replace your sink with a dummy one that just prints whatever it 
> receives
> 3) or even put a flatMap after reading from Kafka (before the keyBy())
> that prints the elements before sending them downstream, so that you
> know whether the consumer keeps on reading (see the sketch below).
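>
> For 3), something along these lines would do (an untested sketch; adjust
> the element type to whatever your pipeline actually carries):
>
> import org.apache.flink.api.common.functions.FlatMapFunction;
> import org.apache.flink.types.Row;
> import org.apache.flink.util.Collector;
>
> // Pass-through flatMap: log every element and forward it unchanged, so
> // you can tell whether the Kafka consumer is still reading.
> messageStream
>         .flatMap(new FlatMapFunction<Row, Row>() {
>             @Override
>             public void flatMap(Row value, Collector<Row> out) {
>                 System.out.println("consumed: " + value);
>                 out.collect(value);
>             }
>         })
>         .keyBy(0)
>         // ... rest of the pipeline as before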
>
> Let us know what the results of the above are.
>
> Thanks,
> Kostas
>
>> On May 16, 2017, at 10:44 AM, simone <simone.povoscania@gmail.com> wrote:
>>
>> Hi to all,
>>
>> I have a problem with Flink and Kafka queues.
>>
>> I have a Producer that puts some Rows into a data sink represented by a
>> Kafka queue, and a Consumer that reads from this sink and processes the
>> Rows in buckets of *N* elements, using a custom trigger function:
>>
>> messageStream.keyBy(0)
>>         .windowAll(GlobalWindows.create())
>>         .trigger(CountWithTimeoutTrigger.of(Time.seconds(30), N))
>>         .apply(new RowToQuery());
>>
>>
>> The problem is that the Consumer stops consuming data once it has read
>> about 1000 rows.
>> With N = 20 the consumer processes 50 buckets, for a total of 1000 elements.
>> With N = 21 the consumer processes 48 buckets, for a total of 1008 elements.
>> With N = 68 the consumer processes 15 buckets, for a total of 1020 elements.
>> And so on...
>>
>> The same also happens without the custom trigger function, with a
>> simple CountTrigger:
>>
>> messageStream.keyBy(0)
>>         .windowAll(GlobalWindows.create())
>>         .trigger(PurgingTrigger.of(CountTrigger.of(N)))
>>         .apply(new RowToQuery());
>>
>> How is this possible? Is there a property on the Consumer that should
>> be set in order to process more data?
>>
>> Thanks,
>>
>> Simone.
>>
>

