Hi Shannon,

Just to clarify:

From the error trace, it looks like the messages fetched from Kafka are serialized `AmazonS3Exception`s, and you’re emitting a stream of `AmazonS3Exception` records from the FlinkKafkaConsumer.
Is this correct? If so, I think we just need to make sure that the `com.amazonaws.services.s3.model.AmazonS3Exception` class is included in the user fat jar.
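
As a quick sanity check, something along these lines could tell you whether the class is visible to a given class loader. This is only a minimal sketch: `ClassPresenceCheck` and `isClassPresent` are hypothetical names I made up for illustration, not part of Flink or the AWS SDK, and I'm assuming the thread context class loader is the one you want to inspect.

```java
public class ClassPresenceCheck {

    // Hypothetical helper: returns true if the named class can be loaded
    // by the given class loader, without running its static initializers.
    public static boolean isClassPresent(String className, ClassLoader loader) {
        try {
            Class.forName(className, false, loader);
            return true;
        } catch (ClassNotFoundException | LinkageError e) {
            // The class itself, or something it depends on, is missing.
            return false;
        }
    }

    public static void main(String[] args) {
        // Assumption: the context class loader is the relevant one to check.
        ClassLoader loader = Thread.currentThread().getContextClassLoader();
        String name = "com.amazonaws.services.s3.model.AmazonS3Exception";
        System.out.println(name + " present: " + isClassPresent(name, loader));
    }
}
```

If this reports that the class is not present when run with your fat jar on the classpath, the AWS S3 dependency is probably not being bundled into the jar.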

Also, which Flink version are you using?