flink-user mailing list archives

From simone <simone.povosca...@gmail.com>
Subject Re: Problem with Kafka Consumer
Date Thu, 18 May 2017 08:07:45 GMT
Hi Kostas,

As suggested, I switched to version 1.3-SNAPSHOT and the project ran 
without any problems. I will keep you informed if any other issue occurs. 
Thanks again for the help.

Cheers,
Simone.

On 16/05/2017 16:36, Kostas Kloudas wrote:
> Hi Simone,
>
> Glad I could help ;)
>
> Actually it would be great if you could also try out the upcoming (not 
> yet released) 1.3 version
> and let us know if you find something that does not work as expected.
>
> We are currently testing it, as you may have noticed, 
> and every contribution on that front is more than welcome.
>
> Cheers,
> Kostas
>
>> On May 16, 2017, at 4:30 PM, simone <simone.povoscania@gmail.com> wrote:
>>
>> Hi Kostas,
>>
>> thanks for your suggestion. Indeed, replacing my custom sink with a 
>> simpler one brought out that the cause of the problem was 
>> RowToQuery, as you suggested. The sink was blocking the reads and making 
>> the Kafka pipeline stall, due to a misconfiguration of an internal 
>> client that calls an external service.
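>>
>> The stand-in for the test can be as simple as a window function that 
>> only prints the size of each bucket instead of calling the external 
>> service, roughly along these lines (an illustrative sketch, not the 
>> actual code from the job; the element type Row is an assumption):
>>
>> import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction;
>> import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;
>> import org.apache.flink.types.Row;
>> import org.apache.flink.util.Collector;
>>
>> public class PrintBucketFunction implements AllWindowFunction<Row, String, GlobalWindow> {
>>     @Override
>>     public void apply(GlobalWindow window, Iterable<Row> rows, Collector<String> out) {
>>         int count = 0;
>>         for (Row ignored : rows) {
>>             count++;
>>         }
>>         // No external call here, so nothing can block and cause back pressure;
>>         // the bucket size is just printed for inspection.
>>         System.out.println("received bucket of " + count + " rows");
>>     }
>> }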
>>
>> Thanks for your help,
>> Simone.
>>
>>
>> On 16/05/2017 14:01, Kostas Kloudas wrote:
>>> Hi Simone,
>>>
>>> I suppose that you use messageStream.keyBy(…).window(…) right? 
>>> .windowAll() is not applicable to keyedStreams.
>>>
>>> Some follow up questions are:
>>>
>>> In your logs, do you see any error messages?
>>> What does your RowToQuery() sink do? Could it be that it blocks and 
>>> the back pressure makes the whole pipeline stall?
>>> To check that, you can:
>>> 1) check the webui for backpressure metrics
>>> 2) replace your sink with a dummy one that just prints whatever it 
>>> receives
>>> 3) or even put a flatmap after reading from Kafka (before the 
>>> keyBy()) that prints the elements before sending
>>> them downstream, so that you know if the consumer keeps on reading.
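>>>
>>> For option 3, an identity flat map is enough, e.g. something like this 
>>> (a minimal sketch; it assumes the stream elements are Flink Rows and 
>>> simply logs and forwards them unchanged):
>>>
>>> import org.apache.flink.api.common.functions.FlatMapFunction;
>>> import org.apache.flink.streaming.api.datastream.DataStream;
>>> import org.apache.flink.types.Row;
>>> import org.apache.flink.util.Collector;
>>>
>>> DataStream<Row> tapped = messageStream.flatMap(new FlatMapFunction<Row, Row>() {
>>>     @Override
>>>     public void flatMap(Row value, Collector<Row> out) {
>>>         // Log every record coming out of the Kafka source...
>>>         System.out.println("consumed: " + value);
>>>         // ...and forward it unchanged to the rest of the pipeline.
>>>         out.collect(value);
>>>     }
>>> });
>>> // then continue with tapped.keyBy(0)... as in your original pipeline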
>>>
>>> Let us know the results of the above.
>>>
>>> Thanks,
>>> Kostas
>>>
>>>> On May 16, 2017, at 10:44 AM, simone <simone.povoscania@gmail.com> wrote:
>>>>
>>>> Hi to all,
>>>>
>>>> I have a problem with Flink and Kafka queues.
>>>>
>>>> I have a Producer that writes Rows into a data sink represented 
>>>> by a Kafka queue, and a Consumer that reads from this sink and 
>>>> processes the Rows in buckets of *N* elements using a custom trigger function:
>>>>
>>>> messageStream.keyBy(0)
>>>>         .windowAll(GlobalWindows.create())
>>>>         .trigger(CountWithTimeoutTrigger.of(Time.seconds(30), N))
>>>>         .apply(new RowToQuery());
>>>>
>>>> The problem is that the Consumer stops consuming data once it has 
>>>> read about 1000 rows.
>>>> With N = 20 the consumer processes 50 buckets, for a total of 1000 
>>>> elements.
>>>> With N = 21 the consumer processes 48 buckets, for a total of 1008 
>>>> elements.
>>>> With N = 68 the consumer processes 15 buckets, for a total of 1020 
>>>> elements. And so on...
>>>>
>>>> The same also happens without the custom trigger function, using a 
>>>> simple CountTrigger instead:
>>>>
>>>> messageStream.keyBy(0)
>>>>         .windowAll(GlobalWindows.create())
>>>>         .trigger(PurgingTrigger.of(CountTrigger.of(N)))
>>>>         .apply(new RowToQuery());
>>>>
>>>> How is this possible? Are there any properties on the Consumer that 
>>>> need to be set in order to process more data?
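>>>>
>>>> (For reference, CountWithTimeoutTrigger is our own class and its code is 
>>>> not included here. A trigger of this shape, firing after N elements or 
>>>> after a processing-time timeout, whichever comes first, is typically 
>>>> implemented roughly like the following sketch; it is illustrative rather 
>>>> than the exact code of the job.)
>>>>
>>>> import org.apache.flink.api.common.functions.ReduceFunction;
>>>> import org.apache.flink.api.common.state.ReducingState;
>>>> import org.apache.flink.api.common.state.ReducingStateDescriptor;
>>>> import org.apache.flink.streaming.api.windowing.time.Time;
>>>> import org.apache.flink.streaming.api.windowing.triggers.Trigger;
>>>> import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
>>>> import org.apache.flink.streaming.api.windowing.windows.Window;
>>>>
>>>> public class CountWithTimeoutTrigger<W extends Window> extends Trigger<Object, W> {
>>>>
>>>>     private final long maxCount;
>>>>     private final long timeoutMillis;
>>>>
>>>>     // Keyed, per-window counter of elements seen so far.
>>>>     private final ReducingStateDescriptor<Long> countDesc =
>>>>             new ReducingStateDescriptor<>("count", new Sum(), Long.class);
>>>>
>>>>     private CountWithTimeoutTrigger(long timeoutMillis, long maxCount) {
>>>>         this.timeoutMillis = timeoutMillis;
>>>>         this.maxCount = maxCount;
>>>>     }
>>>>
>>>>     public static <W extends Window> CountWithTimeoutTrigger<W> of(Time timeout, long maxCount) {
>>>>         return new CountWithTimeoutTrigger<>(timeout.toMilliseconds(), maxCount);
>>>>     }
>>>>
>>>>     @Override
>>>>     public TriggerResult onElement(Object element, long timestamp, W window, TriggerContext ctx) throws Exception {
>>>>         ReducingState<Long> count = ctx.getPartitionedState(countDesc);
>>>>         count.add(1L);
>>>>         if (count.get() >= maxCount) {
>>>>             count.clear();
>>>>             return TriggerResult.FIRE_AND_PURGE;
>>>>         }
>>>>         // Arm a processing-time timeout so a partially filled bucket still fires.
>>>>         ctx.registerProcessingTimeTimer(ctx.getCurrentProcessingTime() + timeoutMillis);
>>>>         return TriggerResult.CONTINUE;
>>>>     }
>>>>
>>>>     @Override
>>>>     public TriggerResult onProcessingTime(long time, W window, TriggerContext ctx) throws Exception {
>>>>         // Timeout expired: flush whatever has been collected so far.
>>>>         ctx.getPartitionedState(countDesc).clear();
>>>>         return TriggerResult.FIRE_AND_PURGE;
>>>>     }
>>>>
>>>>     @Override
>>>>     public TriggerResult onEventTime(long time, W window, TriggerContext ctx) {
>>>>         return TriggerResult.CONTINUE;
>>>>     }
>>>>
>>>>     @Override
>>>>     public void clear(W window, TriggerContext ctx) throws Exception {
>>>>         ctx.getPartitionedState(countDesc).clear();
>>>>         // A production version would also delete any pending processing-time timer here.
>>>>     }
>>>>
>>>>     private static class Sum implements ReduceFunction<Long> {
>>>>         @Override
>>>>         public Long reduce(Long a, Long b) {
>>>>             return a + b;
>>>>         }
>>>>     }
>>>> }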
>>>>
>>>> Thanks,
>>>>
>>>> Simone.
>>>>
>>>
>>
>

