flink-user mailing list archives

From Rinat <r.shari...@cleverdata.ru>
Subject How to lock and wait until checkpointing is completed
Date Mon, 30 Oct 2017 20:03:06 GMT
Hi guys, I have one more question. Maybe someone has already implemented such a feature or found
a good technique.

I wrote an integration test (IT) that runs a Flink job which reads data from a Kafka topic and flushes it
to the file system using BucketingSink.
I implemented some custom logic that fires in notifyCheckpointComplete, and I would like to
test it, so I need to hold the job open somehow and wait until a checkpoint has completed.

The first approach I implemented was to set the checkpointing interval to 1 second
and extend the test source so that it waits for a few seconds after all test messages have been
sent to the sink.
The code looks something like this:
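import java.io.IOException;
import java.util.Collection;

import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.io.CollectionInputFormat;

// A CollectionInputFormat that sleeps once all elements have been read, so the
// source (and therefore the job) stays alive long enough for a checkpoint to run.
public class SleepingCollectionInputFormat<T> extends CollectionInputFormat<T> {

    private static final long serialVersionUID = -5957191172818298164L;

    // How long to sleep (in milliseconds) after the last element has been read.
    private final long duration;

    public SleepingCollectionInputFormat(Collection<T> dataSet, TypeSerializer<T> serializer, long duration) {
        super(dataSet, serializer);
        this.duration = duration;
    }

    @Override
    public boolean reachedEnd() throws IOException {
        boolean reached = super.reachedEnd();
        if (reached) {
            try {
                // Keep the source running so that checkpoints can still be triggered.
                Thread.sleep(duration);
            } catch (InterruptedException e) {
                // Restore the interrupt flag (e.g. on job cancellation) and stop waiting.
                Thread.currentThread().interrupt();
            }
        }
        return reached;
    }
}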
This is not good enough, because it gives no guarantee that the job is actually held open and
that a checkpoint is triggered and completed (so that notifyCheckpointComplete is called) within the sleep window.
In my opinion, a much better approach would be to send some kind of notification to the task manager
once all items from the source have been sent, telling it that it's time to perform a checkpoint.
Does anyone know how such a feature could be implemented?
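
A related idea, which swaps the task-manager notification for a wait inside the job itself, is to let the test source block at the end of run() until a checkpoint that was started after its last record has completed. The plain-Java sketch below shows only that bookkeeping. AwaitCheckpointAfterEmit, markAllEmitted, onSnapshot, onCheckpointComplete and await are hypothetical names, not Flink API; it assumes the source would forward its CheckpointedFunction#snapshotState and CheckpointListener#notifyCheckpointComplete callbacks to it and call await(...) outside the checkpoint lock.

```java
import java.util.concurrent.TimeUnit;

// Hypothetical helper (not Flink API) that a test source could embed.
public class AwaitCheckpointAfterEmit {

    private final Object lock = new Object();
    private boolean allEmitted = false;
    // ID of the first checkpoint snapshotted after the last record; -1 means none yet.
    private long checkpointAfterEmit = -1L;
    private boolean confirmed = false;

    // Call once the last record has been emitted.
    public void markAllEmitted() {
        synchronized (lock) {
            allEmitted = true;
        }
    }

    // Call when the source snapshots its state for checkpoint checkpointId.
    public void onSnapshot(long checkpointId) {
        synchronized (lock) {
            if (allEmitted && checkpointAfterEmit < 0) {
                checkpointAfterEmit = checkpointId;
            }
        }
    }

    // Call when checkpoint checkpointId is reported complete.
    public void onCheckpointComplete(long checkpointId) {
        synchronized (lock) {
            // A later checkpoint also covers all records, so accept any ID >= the recorded one.
            if (checkpointAfterEmit >= 0 && checkpointId >= checkpointAfterEmit) {
                confirmed = true;
                lock.notifyAll();
            }
        }
    }

    // Blocks until a checkpoint taken after the last record completes; false on timeout.
    public boolean await(long timeoutMillis) throws InterruptedException {
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
        synchronized (lock) {
            while (!confirmed) {
                long remainingMillis = TimeUnit.NANOSECONDS.toMillis(deadline - System.nanoTime());
                if (remainingMillis <= 0) {
                    return false;
                }
                lock.wait(remainingMillis);
            }
            return true;
        }
    }

    public static void main(String[] args) throws Exception {
        AwaitCheckpointAfterEmit gate = new AwaitCheckpointAfterEmit();
        gate.onSnapshot(1L);               // checkpoint 1 was taken before the last record
        gate.markAllEmitted();
        Thread checkpointing = new Thread(() -> {
            gate.onCheckpointComplete(1L); // ignored: does not cover the last record
            gate.onSnapshot(2L);
            gate.onCheckpointComplete(2L); // releases the waiting source
        });
        checkpointing.start();
        System.out.println("confirmed: " + gate.await(5_000));
        checkpointing.join();
    }
}
```

Note that notifications are delivered to each task independently, so the source being released does not by itself guarantee that the sink's notifyCheckpointComplete has already run; the test may still need to wait for the sink's side effect.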

Another option would be to use a CountDownLatch: count it down in notifyCheckpointComplete
and await it at the end of the source function. However, serialization makes that too complicated for me
right now.
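
One way the serialization problem is sometimes sidestepped in local tests is to keep the latch in a static field. User functions are shipped to tasks as serialized copies, so an instance field is not shared (and CountDownLatch is not Serializable anyway), but a static field is shared by every copy loaded by the same class loader in the same JVM. The plain-Java sketch below imitates that shipping with a Java serialization round trip; StaticLatchDemo, CheckpointAwareSink, CHECKPOINT_LATCH and roundTrip are hypothetical names, and the approach assumes the IT runs the job in a local/mini cluster inside the test JVM.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class StaticLatchDemo {

    // Hypothetical stand-in for a sink that implements Flink's CheckpointListener.
    static class CheckpointAwareSink implements Serializable {
        private static final long serialVersionUID = 1L;

        // Static, so it is not serialized: every copy in this JVM/class loader shares it.
        static final CountDownLatch CHECKPOINT_LATCH = new CountDownLatch(1);

        // CountDownLatch is not Serializable, so as an instance field it has to be
        // transient, and it is simply null on the deserialized copy.
        transient CountDownLatch instanceLatch = new CountDownLatch(1);

        public void notifyCheckpointComplete(long checkpointId) {
            CHECKPOINT_LATCH.countDown();
        }
    }

    // Imitates shipping a function to a task: serialize, then deserialize a copy.
    @SuppressWarnings("unchecked")
    static <T extends Serializable> T roundTrip(T obj) throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(obj);
        }
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
            return (T) in.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        CheckpointAwareSink shipped = roundTrip(new CheckpointAwareSink());

        // Simulates the task thread receiving the checkpoint-complete notification.
        Thread task = new Thread(() -> shipped.notifyCheckpointComplete(1L));
        task.start();

        // The test thread (or the source) waits on the shared static latch.
        boolean completed = CheckpointAwareSink.CHECKPOINT_LATCH.await(5, TimeUnit.SECONDS);
        task.join();
        System.out.println("checkpoint completed: " + completed);
        System.out.println("instance latch on copy: " + shipped.instanceLatch);
    }
}
```

Because a final static latch can only be counted down once, a real IT with several test methods would probably need to replace it with a fresh latch before each job is submitted.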

I'd be very grateful for your replies, answers and recommendations.

Thx!
