flink-user mailing list archives

From Yu Yang <yuyan...@gmail.com>
Subject best practice for handling corrupted records / exceptions in custom DefaultKryoSerializer?
Date Sun, 31 May 2020 22:37:11 GMT
Hi all,

To deal with corrupted messages that occasionally leak into the data source,
we implemented a custom DefaultKryoSerializer class, shown below, that
catches exceptions. The custom serializer returns null from its read(...)
method when it encounters an exception while reading. With this
implementation, the serializer may silently drop records. One concern is
that it may drop too many records before we notice and take action. What is
the best practice for handling this?

The serializer processes one record at a time. Will reading a corrupted
record make the serializer fail to process the next valid record?

public class CustomTBaseSerializer extends TBaseSerializer {
    private static final Logger LOG =
        LoggerFactory.getLogger(CustomTBaseSerializer.class);

    @Override
    public void write(Kryo kryo, Output output, TBase tBase) {
        try {
            super.write(kryo, output, tBase);
        } catch (Throwable t) {
            LOG.error("Failed to write due to unexpected Throwable", t);
        }
    }

    @Override
    public TBase read(Kryo kryo, Input input, Class<TBase> tBaseClass) {
        try {
            return super.read(kryo, input, tBaseClass);
        } catch (Throwable t) {
            LOG.error("Failed to read from input due to unexpected Throwable", t);
            return null;
        }
    }
}
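
To make the "drops too many records" concern concrete, here is a minimal
sketch of one possible guard: count the failed reads and rethrow once a
budget is exceeded, so the job fails loudly instead of dropping without
limit. DropGuard, readOrDrop, droppedCount and the maxDropped threshold are
hypothetical names for illustration only; they are not part of Flink, Kryo
or TBaseSerializer, and this is not necessarily the recommended practice.

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Supplier;

/** Hypothetical helper (not a Flink or Kryo class): tolerates a bounded number of failed reads. */
final class DropGuard {
    private final long maxDropped;                       // assumed threshold, tune per job
    private final AtomicLong dropped = new AtomicLong(); // failed reads seen so far

    DropGuard(long maxDropped) {
        this.maxDropped = maxDropped;
    }

    /** Runs the read; returns null for a tolerated failure, rethrows once the budget is exceeded. */
    <T> T readOrDrop(Supplier<T> read) {
        try {
            return read.get();
        } catch (RuntimeException e) {
            long count = dropped.incrementAndGet();
            if (count > maxDropped) {
                throw new IllegalStateException(
                    "Dropped " + count + " records, limit is " + maxDropped, e);
            }
            return null; // the caller is expected to filter out nulls
        }
    }

    /** Number of reads that have failed so far, including the one that exceeded the budget. */
    long droppedCount() {
        return dropped.get();
    }
}
```

In the serializer above, read(...) could then delegate with something like
guard.readOrDrop(() -> super.read(kryo, input, tBaseClass)). The sketch
catches RuntimeException rather than Throwable so that errors such as
OutOfMemoryError still propagate. The count lives in one guard instance, so
it would only cover the reads that go through that instance.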

Thank you!

Regards,
-Yu
