flink-user mailing list archives

From: Dominique De Vito <ddv36...@gmail.com>
Subject: Re: about registering completion function for worker shutdown
Date: Tue, 18 Feb 2020 12:34:09 GMT
Hi Robert,

Thanks for your hint / reply / help.

So far I have not tested your way (maybe next), but I tried another one:

* use mapPartition
-- at the beginning, get a KafkaProducer
-- the KafkaProducerFactory class I use is lazy and caches the first
instance created, so there is reuse

* register a JVM hook for closing the KafkaProducer (sketched below)
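
In code, the pattern looks roughly like this (a minimal sketch; the
KafkaProducerFactory wrapper is my own, and the topic name and producer
properties are illustrative placeholders):

import java.util.Properties;
import org.apache.flink.api.common.functions.RichMapPartitionFunction;
import org.apache.flink.util.Collector;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

// Lazy factory: one producer per task JVM, reused across partitions,
// closed by a JVM shutdown hook when the worker exits.
final class KafkaProducerFactory {
    private static volatile KafkaProducer<String, String> instance;

    static KafkaProducer<String, String> get(Properties props) {
        if (instance == null) {
            synchronized (KafkaProducerFactory.class) {
                if (instance == null) {
                    KafkaProducer<String, String> p = new KafkaProducer<>(props);
                    // JVM hook: close() also flushes pending records.
                    Runtime.getRuntime().addShutdownHook(new Thread(p::close));
                    instance = p;
                }
            }
        }
        return instance;
    }
}

class WriteToKafka extends RichMapPartitionFunction<String, String> {
    @Override
    public void mapPartition(Iterable<String> values, Collector<String> out) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        KafkaProducer<String, String> producer = KafkaProducerFactory.get(props);
        for (String value : values) {
            producer.send(new ProducerRecord<>("my-topic", value));
            out.collect(value);
        }
    }
}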

So far I have run into some performance issues, but I don't know yet
whether they are due to this pattern or to something else.

Anyway, thanks.

Regards,
Dominique


On Fri, Jan 31, 2020 at 2:20 PM Robert Metzger <rmetzger@apache.org>
wrote:

> Hi,
>
> Flink's ProcessFunction has a close() method, which is executed on
> shutdown of the workers. (You could also use any of the Rich* functions for
> that purpose).
> If you add a ProcessFunction with the same parallelism before the
> KafkaSink, it'll be executed on the same machines as the Kafka producer.
>
> Afaik, the close() call should not take forever: the system may
> interrupt your thread if it doesn't finish closing in time (30s is the
> default for "cluster.services.shutdown-timeout").
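>
> Here's a minimal sketch of what I mean (the topic name and the producer
> properties are placeholders for your own configuration):
>
> import java.util.Properties;
> import org.apache.flink.api.common.functions.RichMapFunction;
> import org.apache.flink.configuration.Configuration;
> import org.apache.kafka.clients.producer.KafkaProducer;
> import org.apache.kafka.clients.producer.ProducerRecord;
>
> public class KafkaWritingMapper extends RichMapFunction<String, String> {
>
>     private transient KafkaProducer<String, String> producer;
>
>     @Override
>     public void open(Configuration parameters) {
>         // runs once per task on the worker, before any records arrive
>         producer = new KafkaProducer<>(kafkaProps());
>     }
>
>     @Override
>     public String map(String value) {
>         producer.send(new ProducerRecord<>("my-topic", value));
>         return value;
>     }
>
>     @Override
>     public void close() {
>         // runs on worker shutdown; keep it well under the 30s default
>         // of "cluster.services.shutdown-timeout"
>         if (producer != null) {
>             producer.close();
>         }
>     }
>
>     // placeholder: fill in your actual producer configuration
>     private static Properties kafkaProps() {
>         Properties props = new Properties();
>         props.put("bootstrap.servers", "localhost:9092");
>         props.put("key.serializer",
>                 "org.apache.kafka.common.serialization.StringSerializer");
>         props.put("value.serializer",
>                 "org.apache.kafka.common.serialization.StringSerializer");
>         return props;
>     }
> }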
>
> Best,
> Robert
>
>
> On Tue, Jan 21, 2020 at 10:02 AM Dominique De Vito <ddv36a78@gmail.com>
> wrote:
>
>> Hi,
>>
>> For a Flink batch job, some values are written to Kafka through a Producer.
>>
>> I want to register a hook for closing, at the end, the Kafka producer a
>> worker is using... a hook to be executed, of course, on the worker side.
>>
>> Is there a way to do so?
>>
>> Thanks.
>>
>> Regards,
>> Dominique
>>
