flink-user mailing list archives

From Maciej Bryński <mac...@brynski.pl>
Subject Re: Dead Letter Queue for JdbcSink
Date Wed, 14 Jul 2021 15:47:18 GMT
That's the idea. Of course, you need to wrap more functions: open, close,
notifyCheckpointComplete, snapshotState, initializeState, and
setRuntimeContext.

The problem is that if you want to catch the problematic record, you need
to set the batch size to 1, which gives very bad performance.
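The delegation idea might be sketched like this. Note that LifecycleSink and DelegatingSink below are hypothetical stand-ins for illustration, not Flink's actual SinkFunction / CheckpointedFunction interfaces, and the method set is deliberately trimmed:

```kotlin
// Hypothetical stand-in for the lifecycle hooks a Flink sink exposes;
// the real types are SinkFunction, RichSinkFunction, CheckpointedFunction, etc.
interface LifecycleSink<T> {
    fun open() {}
    fun invoke(value: T) {}
    fun snapshotState() {}
    fun initializeState() {}
    fun close() {}
}

// A wrapper must forward every lifecycle call to the inner sink, not just
// invoke(); otherwise connection setup, checkpointing, and cleanup for the
// wrapped sink silently stop happening.
open class DelegatingSink<T>(private val inner: LifecycleSink<T>) : LifecycleSink<T> {
    override fun open() = inner.open()
    override fun invoke(value: T) = inner.invoke(value)
    override fun snapshotState() = inner.snapshotState()
    override fun initializeState() = inner.initializeState()
    override fun close() = inner.close()
}
```

A subclass can then override only invoke() (for example, to add a try/catch) while every other call still reaches the wrapped sink.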

Regards,
Maciek

Wed, 14 Jul 2021 at 17:31 Rion Williams <rionmonster@gmail.com> wrote:
>
> Hi Maciej,
>
> Thanks for the quick response. I wasn't aware of the idea of using a SinkWrapper, but I'm not quite certain it would suit this specific use case, as a SinkFunction / RichSinkFunction doesn't appear to support side-outputs. Essentially, what I'd hope to accomplish is to detect when a bad record could not be written to the sink and then offload it via a side-output somewhere else.
>
> Something like this, which is a very, very naive idea:
>
> class PostgresSinkWrapper<T>(private val sink: SinkFunction<T>): RichSinkFunction<T>()
{
>     private val logger = LoggerFactory.getLogger(PostgresSinkWrapper::class.java)
>
>     override fun invoke(value: T, context: SinkFunction.Context) {
>         try {
>             sink.invoke(value, context)
>         }
>         catch (exception: Exception){
>             logger.error("Encountered a bad record, offloading to dead-letter-queue")
>             // Offload bad record to DLQ
>         }
>     }
> }
>
> But I think that's basically the gist of it. I'm just not sure how I could go about doing this aside from perhaps writing a custom process function that wraps another sink function (or just completely rewriting my own JdbcSink?)
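One way the "wrap and offload" idea might look, sketched without Flink's actual classes: the wrapper retries the inner sink a bounded number of times, then hands the record to a dead-letter callback instead of failing the job. SimpleSink, DeadLetterSink, maxRetries, and deadLetter are all made-up names for illustration; in a real job the callback would likely be a Kafka producer send, since a SinkFunction has no side-output mechanism:

```kotlin
// Hypothetical minimal sink interface standing in for Flink's SinkFunction.
fun interface SimpleSink<T> {
    fun invoke(value: T)
}

// Retries the inner sink; records that still fail after maxRetries extra
// attempts are routed to a dead-letter handler instead of throwing.
class DeadLetterSink<T>(
    private val inner: SimpleSink<T>,
    private val maxRetries: Int,
    private val deadLetter: (T, Exception) -> Unit  // e.g. a Kafka producer send
) : SimpleSink<T> {
    override fun invoke(value: T) {
        var lastError: Exception? = null
        repeat(maxRetries + 1) {
            try {
                inner.invoke(value)
                return  // write succeeded
            } catch (e: Exception) {
                lastError = e
            }
        }
        // Retries exhausted: offload the record rather than failing the job.
        deadLetter(value, lastError!!)
    }
}
```

The trade-off Maciej mentions still applies: to know exactly which record failed, the underlying JDBC write effectively has to happen one record at a time.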
>
> Thanks,
>
> Rion
>
>
>
>
>
> On Wed, Jul 14, 2021 at 9:56 AM Maciej Bryński <maciek@brynski.pl> wrote:
>>
>> Hi Rion,
>> We have implemented such a solution with Sink Wrapper.
>>
>>
>> Regards,
>> Maciek
>>
>> Wed, 14 Jul 2021 at 16:21 Rion Williams <rionmonster@gmail.com> wrote:
>> >
>> > Hi all,
>> >
>> > Recently I've been encountering an issue where some external dependency or process causes writes within my JdbcSink to fail (e.g. something is being inserted with an explicit constraint that never made its way there). I'm trying to see if there's a pattern or recommendation for handling this, similar to a dead-letter queue.
>> >
>> > Basically, if I experience a given number of failures (> max retry attempts) when writing to my JDBC destination, I'd like to take the record that was attempted and throw it into a Kafka topic or some other destination so that it can be evaluated at a later time.
>> >
>> > Are there any well-defined patterns or recommended approaches around this?
>> >
>> > Thanks,
>> >
>> > Rion
>>
>>
>>
>> --
>> Maciek Bryński



-- 
Maciek Bryński
