Hi all, 

To deal with corrupted messages that can leak into the data source once in a while, we implement a custom DefaultKryoSerializer class as below that catches exceptions. The custom serializer returns null in read(...) method when it encounters exception in reading. With this implementation, the serializer may silently drop records.  One concern is that it may drop too many records before we notice and take actions. What is the best practice to handle this?  

The serializer processes one record at a time. Will reading a corrupted record make the serialize fail to process the next valid record?

public class CustomTBaseSerializer extends TBaseSerializer {
     private static final Logger LOG = LoggerFactory.getLogger(CustomTBaseSerializer.class);
     @Override
     public void write(Kryo kryoOutput outputTBase tBase) {
         try {
             super.write(kryooutputtBase);
        } catch (Throwable t) {
             LOG.error("Failed to write due to unexpected Throwable"t);
        }
    }

     @Override
     public TBase read(Kryo kryoInput inputClass<TBase> tBaseClass) {
         try {
             return super.read(kryoinputtBaseClass);
        } catch (Throwable t) {
             LOG.error("Failed to read from input due to unexpected Throwable"t);
             return null;
        }
     }
  }

Thank you!

Regards, 
-Yu