flink-user mailing list archives

From Jary Zhen <jaryz...@gmail.com>
Subject Re: How to control the banlence by using FlinkKafkaConsumer.setStartFromTimestamp(timestamp)
Date Mon, 25 May 2020 07:03:59 GMT
Hi Dinesh, thanks for your reply.

  For example, suppose there are two topics: topic A produces 1 record per
second and topic B produces 3600 records per second. If I set the Kafka
consumer config like this:
     max.poll.records: "3600"
     max.poll.interval.ms: "1000"
then I can fetch all the records from both topics every second, in real
time.
But if I want to consume data from the previous day or earlier by using
FlinkKafkaConsumer.setStartFromTimestamp(timestamp), I will get 3600
records within one second from *topic A*, which were produced over *an hour*
in the production environment. At the same time, I will get 3600 records
within one second from *topic B*, which were produced in *one second*. So
with *EventTime* semantics, the watermark derived from topic A will always
cause the data from topic B to be treated as 'late data' in the window
operator. What I want is to get 1 record from A and 3600 records from B per
second when using FlinkKafkaConsumer.setStartFromTimestamp(timestamp), so
that I can simulate consuming data as in the real production environment.
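
To make the skew concrete, here is a minimal sketch in plain Java (no Flink
or Kafka dependency) that counts how many topic B records fall behind the
watermark. The class name ReplaySkewDemo and the method countLateB are
hypothetical, and the model is deliberately simplified: timestamps are evenly
spaced, each round processes the batch from A before the batch from B, the
watermark is the maximum timestamp seen so far with no out-of-orderness
allowance, and a record counts as late when its timestamp is below the
watermark.

```java
public class ReplaySkewDemo {

    /**
     * Simulates a unioned stream of topic A and topic B and counts the
     * topic B records whose timestamp is already behind the watermark.
     *
     * fetchA / fetchB: records taken from each topic per round (assumption).
     * rateA / rateB:   records per second at which each topic was produced.
     */
    public static long countLateB(int fetchA, int fetchB,
                                  int rateA, int rateB, int rounds) {
        long watermark = Long.MIN_VALUE; // max event timestamp seen so far (ms)
        long nextA = 0;                  // index of the next record in topic A
        long nextB = 0;                  // index of the next record in topic B
        long late = 0;

        for (int r = 0; r < rounds; r++) {
            // Assumption: the batch from topic A is processed first.
            for (int i = 0; i < fetchA; i++, nextA++) {
                long ts = nextA * 1000L / rateA; // evenly spaced event time
                watermark = Math.max(watermark, ts);
            }
            for (int i = 0; i < fetchB; i++, nextB++) {
                long ts = nextB * 1000L / rateB;
                if (ts < watermark) {
                    late++; // simplified lateness check
                }
                watermark = Math.max(watermark, ts);
            }
        }
        return late;
    }

    public static void main(String[] args) {
        // Real-time pacing: 1 record from A and 3600 from B per round.
        System.out.println("real-time pacing: "
                + countLateB(1, 3600, 1, 3600, 10));
        // Replay of past data: 3600 records from each topic per round.
        System.out.println("replay: "
                + countLateB(3600, 3600, 1, 3600, 10));
    }
}
```

Under these assumptions, real-time pacing yields no late records from B,
while fetching 3600 records from each topic per round marks every B record
as late.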


Best

On Sat, 23 May 2020 at 23:42, C DINESH <dinesh.kittu99@gmail.com> wrote:

> Hi Jary,
>
> What do you mean by step balance? Could you please provide a concrete
> example?
>
> On Fri, May 22, 2020 at 3:46 PM Jary Zhen <jaryzhen@gmail.com> wrote:
>
>> Hello everyone,
>>
>>    First, a brief introduction to the pipeline:
>>       env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
>>       consume multiple Kafka topics
>>       -> union them
>>       -> assignTimestampsAndWatermarks
>>       -> keyby
>>       -> window()  and so on …
>> This is a very common way to use Flink to process data in a production
>> environment.
>> But if I want to test the pipeline above, I need to use
>> FlinkKafkaConsumer.setStartFromTimestamp(timestamp) to consume 'past' data.
>> So my question is how to control the 'step' balance when one topic
>> produces 3 records per second while another topic produces 30000 per second.
>>
>> I don't know if I have described this clearly, so if anything is unclear,
>> please let me know.
>>
>> Thanks
>>
>>
